Unverified Commit f56174ac authored by tanreinama, committed by GitHub

add GPTSAN model (reopen) (#21291)

* add GPTSAN-Japanese

* add GPTSAN

* add GPTSAN (update for review)

* add GPTSAN

* fix typo in comment text

* add GPTSAN

* fix document and comments

* fix class name GPTSAN->GPTSan

* fix import and test for tokenizer
parent c87bbe1f
@@ -90,6 +90,7 @@ MODEL_MAPPING_NAMES = OrderedDict(
("gpt_neox", "GPTNeoXModel"),
("gpt_neox_japanese", "GPTNeoXJapaneseModel"),
("gptj", "GPTJModel"),
("gptsan-japanese", "GPTSanJapaneseForConditionalGeneration"),
("graphormer", "GraphormerModel"),
("groupvit", "GroupViTModel"),
("hubert", "HubertModel"),
@@ -216,6 +217,7 @@ MODEL_FOR_PRETRAINING_MAPPING_NAMES = OrderedDict(
("funnel", "FunnelForPreTraining"),
("gpt-sw3", "GPT2LMHeadModel"),
("gpt2", "GPT2LMHeadModel"),
("gptsan-japanese", "GPTSanJapaneseForConditionalGeneration"),
("ibert", "IBertForMaskedLM"),
("layoutlm", "LayoutLMForMaskedLM"),
("longformer", "LongformerForMaskedLM"),
@@ -286,6 +288,7 @@ MODEL_WITH_LM_HEAD_MAPPING_NAMES = OrderedDict(
("gpt_neox", "GPTNeoXForCausalLM"),
("gpt_neox_japanese", "GPTNeoXJapaneseForCausalLM"),
("gptj", "GPTJForCausalLM"),
("gptsan-japanese", "GPTSanJapaneseForConditionalGeneration"),
("ibert", "IBertForMaskedLM"),
("layoutlm", "LayoutLMForMaskedLM"),
("led", "LEDForConditionalGeneration"),
@@ -579,6 +582,7 @@ MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING_NAMES = OrderedDict(
("blenderbot-small", "BlenderbotSmallForConditionalGeneration"),
("encoder-decoder", "EncoderDecoderModel"),
("fsmt", "FSMTForConditionalGeneration"),
("gptsan-japanese", "GPTSanJapaneseForConditionalGeneration"),
("led", "LEDForConditionalGeneration"),
("longt5", "LongT5ForConditionalGeneration"),
("m2m_100", "M2M100ForConditionalGeneration"),
@@ -154,6 +154,7 @@ else:
("gpt_neox", (None, "GPTNeoXTokenizerFast" if is_tokenizers_available() else None)),
("gpt_neox_japanese", ("GPTNeoXJapaneseTokenizer", None)),
("gptj", ("GPT2Tokenizer", "GPT2TokenizerFast" if is_tokenizers_available() else None)),
("gptsan-japanese", ("GPTSanJapaneseTokenizer", None)),
("groupvit", ("CLIPTokenizer", "CLIPTokenizerFast" if is_tokenizers_available() else None)),
("herbert", ("HerbertTokenizer", "HerbertTokenizerFast" if is_tokenizers_available() else None)),
("hubert", ("Wav2Vec2CTCTokenizer", None)),
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING
from ...utils import (
OptionalDependencyNotAvailable,
_LazyModule,
is_flax_available,
is_tf_available,
is_torch_available,
)
_import_structure = {
"configuration_gptsan_japanese": ["GPTSAN_JAPANESE_PRETRAINED_CONFIG_ARCHIVE_MAP", "GPTSanJapaneseConfig"],
"tokenization_gptsan_japanese": ["GPTSanJapaneseTokenizer"],
}
try:
if not is_torch_available():
raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
pass
else:
_import_structure["modeling_gptsan_japanese"] = [
"GPTSAN_JAPANESE_PRETRAINED_MODEL_ARCHIVE_LIST",
"GPTSanJapaneseForConditionalGeneration",
"GPTSanJapaneseModel",
"GPTSanJapanesePreTrainedModel",
]
_import_structure["tokenization_gptsan_japanese"] = [
"GPTSanJapaneseTokenizer",
]
if TYPE_CHECKING:
from .configuration_gptsan_japanese import GPTSAN_JAPANESE_PRETRAINED_CONFIG_ARCHIVE_MAP, GPTSanJapaneseConfig
from .tokenization_gptsan_japanese import GPTSanJapaneseTokenizer
try:
if not is_torch_available():
raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
pass
else:
from .modeling_gptsan_japanese import (
GPTSAN_JAPANESE_PRETRAINED_MODEL_ARCHIVE_LIST,
GPTSanJapaneseForConditionalGeneration,
GPTSanJapaneseModel,
GPTSanJapanesePreTrainedModel,
)
from .tokenization_gptsan_japanese import GPTSanJapaneseTokenizer
else:
import sys
sys.modules[__name__] = _LazyModule(__name__, globals()["__file__"], _import_structure, module_spec=__spec__)
# coding=utf-8
# Copyright 2023, HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" GPTSAN-japanese model configuration"""
from ...configuration_utils import PretrainedConfig
from ...utils import logging
logger = logging.get_logger(__name__)
GPTSAN_JAPANESE_PRETRAINED_CONFIG_ARCHIVE_MAP = {
"tanreinama/GPTSAN-2.8B-spout_is_uniform": (
"https://huggingface.co/tanreinama/GPTSAN-2.8B-spout_is_uniform/resolve/main/config.json"
),
}
class GPTSanJapaneseConfig(PretrainedConfig):
r"""
This is the configuration class to store the configuration of a [`GPTSanJapaneseModel`]. It is used to instantiate
a GPTSANJapanese model according to the specified arguments, defining the model architecture. Instantiating a
configuration with the defaults will yield a similar configuration to that of the GPTSANJapanese
[tanreinama/GPTSAN-2.8B-spout_is_uniform](https://huggingface.co/tanreinama/GPTSAN-2.8B-spout_is_uniform)
architecture.
Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
documentation from [`PretrainedConfig`] for more information.
Arguments:
vocab_size (`int`, *optional*, defaults to 36000):
            Vocabulary size of the GPTSANJapanese model. Defines the number of different tokens that can be represented
            by the `input_ids` passed when calling [`GPTSanJapaneseModel`].
max_position_embeddings (`int`, *optional*, defaults to 1280):
            The maximum sequence length that this model might ever be used with.
        d_model (`int`, *optional*, defaults to 1024):
            Size of the hidden states (model dimension).
        d_ff (`int`, *optional*, defaults to 8192):
            Size of the intermediate feed forward layer in each Switch Transformer block.
        d_ext (`int`, *optional*, defaults to 4096):
            Size of the intermediate feed forward layer in each extra layer.
        d_spout (`int`, *optional*, defaults to 128):
            Size of the `spout` vector.
        num_switch_layers (`int`, *optional*, defaults to 10):
            Number of Switch Transformer layers.
        num_ext_layers (`int`, *optional*, defaults to 0):
            Number of extra (dense) Transformer layers.
num_heads (`int`, *optional*, defaults to 16):
Number of attention heads for each attention layer in the Transformer encoder.
num_experts (`int`, *optional*, defaults to 16):
Number of experts for each SwitchTransformer layer.
expert_capacity (`int`, *optional*, defaults to 128):
Number of tokens that can be stored in each expert. If set to 1, the model will behave like a regular
Transformer.
dropout_rate (`float`, *optional*, defaults to 0.0):
The ratio for all dropout layers.
        layer_norm_epsilon (`float`, *optional*, defaults to 1e-5):
The epsilon used by the layer normalization layers.
router_bias (`bool`, *optional*, defaults to `False`):
Whether to add a bias to the router.
router_jitter_noise (`float`, *optional*, defaults to 0.0):
            Amount of noise to add to the router. Set it to 0.0 during prediction, or to a small value (usually 1e-2)
            during training.
        router_dtype (`str`, *optional*, defaults to `"float32"`):
The `dtype` used for the routers. It is preferable to keep the `dtype` to `"float32"` as specified in the
*selective precision* discussion in [the paper](https://arxiv.org/abs/2101.03961).
router_ignore_padding_tokens (`bool`, *optional*, defaults to `False`):
Whether to ignore padding tokens when routing.
        output_hidden_states (`bool`, *optional*, defaults to `False`):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
output_attentions (`bool`, *optional*, defaults to `False`):
Whether or not to return the attentions tensors of all attention layers.
initializer_factor (`float`, *optional*, defaults to 0.002):
A factor for initializing all weight matrices.
        output_router_logits (`bool`, *optional*, defaults to `False`):
Whether or not to return the router logits of all experts.
use_cache (`bool`, *optional*, defaults to `True`):
            Whether or not the model should return the last key/values attentions (not used by all models).
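
    Example (an illustrative sketch of the usual configuration pattern, assuming the classes are importable from
    `transformers` via the auto mappings registered above):

    ```python
    >>> from transformers import GPTSanJapaneseConfig, GPTSanJapaneseModel

    >>> # Initializing a configuration with the default values
    >>> configuration = GPTSanJapaneseConfig()

    >>> # Initializing a model (with random weights) from the configuration
    >>> model = GPTSanJapaneseModel(configuration)

    >>> # Accessing the model configuration
    >>> configuration = model.config
    ```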
"""
model_type = "gptsan-japanese"
keys_to_ignore_at_inference = [
"past_key_values",
]
attribute_map = {
"hidden_size": "d_model",
"num_attention_heads": "num_heads",
"num_hidden_layers": "num_layers",
}
def __init__(
self,
vocab_size=36000,
max_position_embeddings=1280,
d_model=1024,
d_ff=8192,
d_ext=4096,
d_spout=128,
num_switch_layers=10,
num_ext_layers=0,
num_heads=16,
num_experts=16,
expert_capacity=128,
dropout_rate=0.0,
layer_norm_epsilon=1e-5,
router_bias=False,
router_jitter_noise=0.0,
router_dtype="float32",
router_ignore_padding_tokens=False,
output_hidden_states=False,
output_attentions=False,
initializer_factor=0.002,
output_router_logits=False,
use_cache=True,
separator_token_id=35998,
pad_token_id=35995,
eos_token_id=35999,
**kwargs,
):
self.vocab_size = vocab_size
self.max_position_embeddings = max_position_embeddings
self.d_model = d_model
self.d_ff = d_ff
self.d_ext = d_ext
self.d_spout = d_spout
self.num_switch_layers = num_switch_layers
self.num_ext_layers = num_ext_layers
self.num_layers = num_switch_layers + num_ext_layers
self.num_heads = num_heads
self.num_experts = num_experts
self.expert_capacity = expert_capacity
self.dropout_rate = dropout_rate
self.layer_norm_epsilon = layer_norm_epsilon
self.router_bias = router_bias
self.router_jitter_noise = router_jitter_noise
self.router_dtype = router_dtype
self.router_ignore_padding_tokens = router_ignore_padding_tokens
self.output_hidden_states = output_hidden_states
self.output_attentions = output_attentions
self.initializer_factor = initializer_factor
self.output_router_logits = output_router_logits
self.use_cache = use_cache
super().__init__(
separator_token_id=separator_token_id,
pad_token_id=pad_token_id,
eos_token_id=eos_token_id,
**kwargs,
)
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Convert GPTSANJapanese checkpoints from the original repository to pytorch model."""
import argparse
import json
import os
from collections import OrderedDict
import numpy as np
import tensorflow as tf
import torch
def convert_tf_gptsan_to_pt(args):
parameter_file = os.path.join(args.tf_model_dir, "parameters.json")
params = json.loads(open(parameter_file).read())
if not params:
raise ValueError(
f"It seems that the json file at {parameter_file} is empty. Make sure you have a correct json file."
)
if not args.output.endswith(".pt"):
args.output = args.output + ".pt"
new_state = OrderedDict()
with tf.device("/CPU:0"):
reader = tf.train.load_checkpoint(args.tf_model_dir)
shapes = reader.get_variable_to_shape_map()
for key_name in shapes.keys():
vnp = reader.get_tensor(key_name).astype(np.float16)
if key_name.endswith("/adam_m") or key_name.endswith("/adam_v"):
continue
if key_name.startswith("pasts/"):
if key_name.startswith("pasts/mlp"):
player = int(key_name[9])
elif key_name.startswith("pasts/out"):
player = 8
name = "model.sqout.%d.weight" % (player * 2) # enter to nn.Sequencial with Tanh, so 2 at a time
state = vnp.transpose([1, 0]).copy() # Mesh-Tensorflow is a diagonal matrix
new_state[name] = torch.tensor(state)
elif key_name.startswith("model/moe"):
player = int(key_name[9:].split("/")[0])
if key_name.endswith("/switch_gating/kernel"):
name = "model.blocks.%d.feed_forward.mlp.router.classifier.weight" % player
state = vnp.transpose([1, 0]).copy() # Mesh-Tensorflow is a diagonal matrix
new_state[name] = torch.tensor(state)
elif key_name.endswith("/softmlp/kernel"):
name = "model.blocks.%d.feed_forward.soft_bypass_mlp.weight" % player
state = vnp.transpose([1, 0]).copy() # Mesh-Tensorflow is a diagonal matrix
new_state[name] = torch.tensor(state)
elif key_name.endswith("/wo/kernel") or key_name.endswith("/wi/kernel"):
nlayer = key_name[-9:-7]
for i in range(16):
name = "model.blocks.%d.feed_forward.mlp.experts.expert_%d.%s.weight" % (player, i, nlayer)
state = (
vnp[i].transpose([1, 0]).copy()
) # In Mesh-Tensorflow, it is one array, so it is divided
new_state[name] = torch.tensor(state)
elif key_name.startswith("model/mlp"):
player = int(key_name[9:].split("/")[0])
if key_name.endswith("/p1/kernel"):
name = "model.blocks.%d.feed_forward.mlp.wi.weight" % player
state = vnp.transpose([1, 0]).copy() # Mesh-Tensorflow is a diagonal matrix
new_state[name] = torch.tensor(state)
elif key_name.endswith("/p1/bias"):
name = "model.blocks.%d.feed_forward.mlp.wi.bias" % player
state = vnp.copy() # same because it is one dimensional
new_state[name] = torch.tensor(state)
elif key_name.endswith("/p2/kernel"):
name = "model.blocks.%d.feed_forward.mlp.wo.weight" % player
state = vnp.transpose([1, 0]).copy() # Mesh-Tensorflow is a diagonal matrix
new_state[name] = torch.tensor(state)
elif key_name.endswith("/p2/bias"):
name = "model.blocks.%d.feed_forward.mlp.wo.bias" % player
state = vnp.copy() # same because it is one dimensional
new_state[name] = torch.tensor(state)
elif key_name.startswith("model/ln"):
player = int(key_name[8:].split("/")[0])
if key_name.endswith("/b"):
name = "model.blocks.%d.feed_forward.norm.bias" % player
state = vnp.copy() # same because it is one dimensional
new_state[name] = torch.tensor(state)
elif key_name.endswith("/g"):
name = "model.blocks.%d.feed_forward.norm.weight" % player
state = vnp.copy() # same because it is one dimensional
new_state[name] = torch.tensor(state)
elif key_name.startswith("model/att"):
player = int(key_name[9:].split("/")[0])
if key_name.endswith("/qkv/kernel"):
state = vnp.copy() # Compute same dimension as Mesh-tensorflow using einsum
state_q = state[:, 0, :, :]
state_k = state[:, 1, :, :]
state_v = state[:, 2, :, :]
state_q = (
state_q.reshape([state_q.shape[0], state_q.shape[1] * state_q.shape[2]])
.transpose([1, 0])
.copy()
) # Mesh-Tensorflow is a diagonal matrix
state_k = (
state_k.reshape([state_k.shape[0], state_k.shape[1] * state_k.shape[2]])
.transpose([1, 0])
.copy()
) # Mesh-Tensorflow is a diagonal matrix
state_v = (
state_v.reshape([state_v.shape[0], state_v.shape[1] * state_v.shape[2]])
.transpose([1, 0])
.copy()
) # Mesh-Tensorflow is a diagonal matrix
name = "model.blocks.%d.self_attn.self_attn.q_proj.weight" % player
new_state[name] = torch.tensor(state_q)
name = "model.blocks.%d.self_attn.self_attn.k_proj.weight" % player
new_state[name] = torch.tensor(state_k)
name = "model.blocks.%d.self_attn.self_attn.v_proj.weight" % player
new_state[name] = torch.tensor(state_v)
elif key_name.endswith("/o/kernel"):
name = "model.blocks.%d.self_attn.self_attn.out_proj.weight" % player
state = (
vnp.reshape([vnp.shape[0] * vnp.shape[1], vnp.shape[2]]).transpose([1, 0]).copy()
) # Mesh-Tensorflow is a diagonal matrix
new_state[name] = torch.tensor(state)
elif key_name.startswith("model/an"):
player = int(key_name[8:].split("/")[0])
if key_name.endswith("/b"):
name = "model.blocks.%d.self_attn.norm.bias" % player
state = vnp.copy() # same because it is one dimensional
new_state[name] = torch.tensor(state)
elif key_name.endswith("/g"):
name = "model.blocks.%d.self_attn.norm.weight" % player
state = vnp.copy() # same because it is one dimensional
new_state[name] = torch.tensor(state)
elif (
key_name.startswith("model/wte")
or key_name.startswith("model/wpe")
or key_name.startswith("model/ete")
):
nlayer = {"wte": "embed_tokens", "wpe": "position_embeddings", "ete": "extra_position_embeddings"}[
key_name[-3:]
]
name = "model.%s.weight" % nlayer
state = vnp.copy() # same in embedded
new_state[name] = torch.tensor(state)
if key_name.startswith("model/wte"):
name = "lm_head.weight"
state = vnp.copy() # same in embedded
new_state[name] = torch.tensor(state)
elif key_name.startswith("model/wob"):
name = "final_logits_bias"
state = vnp.copy() # same in embedded
state = state.reshape((1, -1))
new_state[name] = torch.tensor(state)
elif key_name == "model/dense/kernel":
name = "model.last_project.weight"
state = vnp.transpose([1, 0]).copy() # Mesh-Tensorflow is a diagonal matrix
new_state[name] = torch.tensor(state)
elif key_name == "model/dense_1/bias":
name = "model.last_project.bias"
state = vnp.copy() # same because it is one dimensional
new_state[name] = torch.tensor(state)
torch.save(new_state, args.output)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="model converter.", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("--tf_model_dir", metavar="PATH", type=str, required=True, help="import model")
parser.add_argument("--output", metavar="PATH", type=str, required=True, help="output model")
args = parser.parse_args()
convert_tf_gptsan_to_pt(args)
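
# Illustrative invocation (the script name and paths below are placeholders, only the two flags are defined above):
#   python convert_gptsan_tf_checkpoint_to_pytorch.py \
#       --tf_model_dir /path/to/tf_checkpoint_dir --output /path/to/gptsan_japanese.pt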
# coding=utf-8
# Copyright 2023 Toshiyuki Sakamoto(tanreinama) and HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" PyTorch GPTSANJapanese model."""
import copy
from typing import List, Optional, Tuple, Union
import torch
import torch.nn as nn
from ...activations import ACT2FN
from ...modeling_outputs import MoECausalLMOutputWithPast, MoEModelOutputWithPastAndCrossAttentions
from ...modeling_utils import PreTrainedModel
from ...utils import (
DUMMY_INPUTS,
DUMMY_MASK,
add_start_docstrings,
add_start_docstrings_to_model_forward,
is_torch_fx_proxy,
logging,
)
from .configuration_gptsan_japanese import GPTSanJapaneseConfig
logger = logging.get_logger(__name__)
_CONFIG_FOR_DOC = "GPTSanJapaneseConfig"
_CHECKPOINT_FOR_DOC = "Tanrei/GPTSAN-japanese"
####################################################
# This list contains the ids of the pretrained
# checkpoints provided with the model
####################################################
GPTSAN_JAPANESE_PRETRAINED_MODEL_ARCHIVE_LIST = [
"Tanrei/GPTSAN-japanese",
# See all GPTSAN-japanese models at https://huggingface.co/models?filter=gptsan-japanese
]
# Copied from transformers.models.switch_transformers.modeling_switch_transformers.router_z_loss_func
def router_z_loss_func(router_logits: torch.Tensor) -> float:
r"""
Compute the router z-loss implemented in PyTorch.
The router z-loss was introduced in [Designing Effective Sparse Expert Models](https://arxiv.org/abs/2202.08906).
It encourages router logits to remain small in an effort to improve stability.
Args:
router_logits (`float`):
Input logits of shape [batch_size, sequence_length, num_experts]
Returns:
Scalar router z-loss.
"""
num_groups, tokens_per_group, _ = router_logits.shape
log_z = torch.logsumexp(router_logits, dim=-1)
z_loss = log_z**2
return torch.sum(z_loss) / (num_groups * tokens_per_group)
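
# Worked example: for all-zero router logits of shape [1, tokens_per_group, num_experts],
# logsumexp over the experts is log(num_experts) for every token, so the z-loss equals
# log(num_experts) ** 2 (about 7.69 for 16 experts).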
# Copied from transformers.models.switch_transformers.modeling_switch_transformers.load_balancing_loss_func
def load_balancing_loss_func(router_probs: torch.Tensor, expert_indices: torch.Tensor) -> float:
r"""
Computes auxiliary load balancing loss as in Switch Transformer - implemented in Pytorch.
See Switch Transformer (https://arxiv.org/abs/2101.03961) for more details. This function implements the loss
function presented in equations (4) - (6) of the paper. It aims at penalizing cases where the routing between
experts is too unbalanced.
Args:
        router_probs (`torch.Tensor`):
            Probability assigned to each expert per token. Shape: [batch_size, sequence_length, num_experts].
        expert_indices (`torch.Tensor`):
            Indices tensor of shape [batch_size, sequence_length] identifying the selected expert for a given token.
Returns:
The auxiliary loss.
"""
num_experts = router_probs.shape[-1]
# cast the expert indices to int64, otherwise one-hot encoding will fail
if expert_indices.dtype != torch.int64:
expert_indices = expert_indices.to(torch.int64)
if len(expert_indices.shape) == 2:
expert_indices = expert_indices.unsqueeze(2)
expert_mask = torch.nn.functional.one_hot(expert_indices, num_experts)
# For a given token, determine if it was routed to a given expert.
expert_mask = torch.max(expert_mask, axis=-2).values
# cast to float32 otherwise mean will fail
expert_mask = expert_mask.to(torch.float32)
tokens_per_group_and_expert = torch.mean(expert_mask, axis=-2)
router_prob_per_group_and_expert = torch.mean(router_probs, axis=-2)
return torch.mean(tokens_per_group_and_expert * router_prob_per_group_and_expert) * (num_experts**2)
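
# Worked example: with perfectly balanced routing, each expert receives 1/num_experts of the tokens and
# 1/num_experts of the router probability mass, so the elementwise product averages to 1/num_experts**2
# and the loss evaluates to exactly 1.0.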
class GPTSanJapaneseDenseActDense(nn.Module):
"""
    FFN layer for Switch Transformer and extra layers.

    GPTSAN can mix Switch Transformer layers and normal Transformer layers. This class is used as an expert in the
    Switch Transformer layers and as the FFN in regular Transformer layers. ReLU is used in the Switch Transformer
    layers and Swish in the normal Transformer layers, so the `ext_layer` argument selects between them.
"""
def __init__(self, config: GPTSanJapaneseConfig, ext_layer=False):
super().__init__()
d_inter = config.d_ext if ext_layer else config.d_ff
self.wi = nn.Linear(config.d_model, d_inter, bias=ext_layer)
self.wo = nn.Linear(d_inter, config.d_model, bias=ext_layer)
self.dropout = nn.Identity() if ext_layer else nn.Dropout(config.dropout_rate)
self.act = ACT2FN["swish" if ext_layer else "relu"]
def forward(self, hidden_states):
r"""
Args:
hidden_states (`torch.Tensor`) :
[num_groups, tokens_per_group, hidden_dim] inputs to send to experts.
Returns:
torch.Tensor[num_groups, tokens_per_group, hidden_dim]
"""
hidden_states = self.wi(hidden_states)
hidden_states = self.act(hidden_states)
hidden_states = self.dropout(hidden_states)
hidden_states = self.wo(hidden_states)
return hidden_states
# Copied from transformers.models.switch_transformers.modeling_switch_transformers.SwitchTransformersTop1Router with SwitchTransformers->GPTSanJapanese
class GPTSanJapaneseTop1Router(nn.Module):
"""
Router using tokens choose top-1 experts assignment.
This router uses the same mechanism as in Switch Transformer (https://arxiv.org/abs/2101.03961) and V-MoE
(https://arxiv.org/abs/2106.05974): tokens choose their top experts. Items are sorted by router_probs and then
routed to their choice of expert until the expert's expert_capacity is reached. **There is no guarantee that each
token is processed by an expert**, or that each expert receives at least one token.
"""
def __init__(self, config: GPTSanJapaneseConfig):
super().__init__()
self.num_experts = config.num_experts
self.expert_capacity = config.expert_capacity
self.classifier = nn.Linear(config.hidden_size, self.num_experts, bias=config.router_bias)
self.jitter_noise = config.router_jitter_noise
self.ignore_padding_tokens = config.router_ignore_padding_tokens
self.dtype = getattr(torch, config.router_dtype)
def _compute_router_probabilities(self, hidden_states: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
r"""
Computes router probabilities from input hidden states.
Args:
hidden_states (`torch.Tensor`):
(batch_size, sequence_length, hidden_dim) from which router probabilities are computed.
Returns:
router_probabilities (`torch.Tensor`):
Tensor of shape (batch_size, sequence_length, num_experts) corresponding to the probabilities for each
token and expert. Used for routing tokens to experts.
router_logits (`torch.Tensor`):
Logits tensor of shape (batch_size, sequence_length, num_experts) corresponding to raw router logits.
This is used later for computing router z-loss.
"""
# float32 is used to ensure stability. See the discussion of "selective precision" in
# https://arxiv.org/abs/2101.03961.
# We also store the previous dtype to cast back the output to the previous dtype
self.input_dtype = hidden_states.dtype
hidden_states = hidden_states.to(self.dtype)
if self.jitter_noise > 0:
# Get the lower and upper bound of the uniform distribution
# Adapted from: https://stackoverflow.com/questions/44328530/how-to-get-a-uniform-distribution-in-a-range-r1-r2-in-pytorch
distrib_lower_bound = 1.0 - self.jitter_noise
distrib_upper_bound = 1.0 + self.jitter_noise
uniform_distrib = torch.rand(hidden_states.shape, device=hidden_states.device, dtype=self.dtype)
uniform_distrib = uniform_distrib * (distrib_lower_bound - distrib_upper_bound)
uniform_distrib = uniform_distrib + distrib_upper_bound
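            # uniform_distrib is now uniform over [1 - jitter_noise, 1 + jitter_noise]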
# Multiply the token inputs by the uniform distribution - adding some noise
hidden_states *= uniform_distrib
# Shape: [num_groups, tokens_per_group, num_experts]
self._cast_classifier()
router_logits = self.classifier(hidden_states)
# Apply Softmax and cast back to the original `dtype`
router_probabilities = nn.functional.softmax(router_logits, dim=-1, dtype=self.dtype).to(self.input_dtype)
return router_probabilities, router_logits
def _cast_classifier(self):
r"""
        `bitsandbytes` `Linear8bitLt` layers do not support manual casting. Therefore we need to check whether the
        layer is an instance of the `Linear8bitLt` class by checking for its special attributes.
"""
if not (hasattr(self.classifier, "SCB") or hasattr(self.classifier, "CB")):
self.classifier = self.classifier.to(self.dtype)
def forward(self, hidden_states: torch.Tensor) -> Tuple:
r"""
        Generic forward function for every Router class. Each Router expects the same input hidden states
        (`hidden_states`) corresponding to the hidden states for each token, and the `expert_capacity` corresponding
        to the number of tokens the Router will send to each expert (some routers can send only a few tokens to each
        expert). Each Router works as follows: given the hidden states for each token, it computes `router_probs` and
        `router_logits` from the router weights. These assign, for each token, the raw probability of being routed to
        each expert. Each Router class then has to define its own `_compute_routing_instructions`.
Args:
hidden_states (`torch.Tensor`) :
[num_groups, tokens_per_group, hidden_dim] inputs to send to experts.
Returns:
Tuple[`torch.Tensor`, `torch.Tensor`, `torch.Tensor`] Tuple containing the expert index, the router probs
and the router logits. The router probabilities and logits are required to compute the loss.
"""
router_probs, router_logits = self._compute_router_probabilities(hidden_states)
expert_index = torch.argmax(router_probs, dim=-1)
expert_index = torch.nn.functional.one_hot(expert_index, num_classes=self.num_experts)
# Mask tokens outside expert capacity. Sum over each sequence
token_priority = torch.cumsum(expert_index, dim=-2)
        # mask if the token routed to the expert will overflow the expert capacity
expert_capacity_mask = token_priority <= self.expert_capacity
expert_index = expert_index * expert_capacity_mask
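        # keep only the probability of the selected (top-1) expert as a per-token scaling
        # factor of shape [..., 1]; it is applied to the expert outputs downstream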
router_probs = torch.max(router_probs, dim=-1).values.unsqueeze(-1)
return expert_index, router_probs, router_logits
# Copied from transformers.models.switch_transformers.modeling_switch_transformers.SwitchTransformersSparseMLP with SwitchTransformers->GPTSanJapanese
class GPTSanJapaneseSparseMLP(nn.Module):
r"""
Implementation of the Switch Transformers Sparse MLP module.
"""
def __init__(self, config: GPTSanJapaneseConfig, expert_class: nn.Module = GPTSanJapaneseDenseActDense):
super().__init__()
# Step 1: Get the correct router according to its class
self.router = GPTSanJapaneseTop1Router(config)
# Step 2: Get the experts
self.experts = nn.ModuleDict()
for idx in range(config.num_experts):
self.experts[f"expert_{idx}"] = expert_class(config)
def forward(self, hidden_states):
r"""
        Hold on, this will be slightly tricky to understand. In the correct order, a MoE layer does the following:
        1- Gets the `router_mask` from the router. The shape of the mask is `(batch_size, sequence_length, num_expert)`
        and corresponds to the argmax of the `router_probs`. The probabilities are needed in the computation of the
        hidden states: they are broadcast to the hidden states values (can be interpreted as a scaling factor).
        2- Dispatches the tokens to their associated experts. We do a classic for loop over the experts and assign the
        corresponding hidden states to each expert.
"""
        # Step 1: Get the router_mask from the router as well as the probabilities
router_mask, router_probs, router_logits = self.router(hidden_states)
expert_index = torch.argmax(router_mask, dim=-1)
        # The routers introduced might not always map all the tokens to an expert, which means that some hidden states
        # can be unchanged from one layer to another. That is why the hidden states are cloned before updating only the selected ones.
next_states = hidden_states.clone()
for idx, expert in enumerate(self.experts.values()):
token_indices = router_mask[:, :, idx].bool()
next_states[token_indices] = expert(hidden_states[token_indices])
hidden_states = router_probs * next_states
return hidden_states, (router_logits, expert_index)
class GPTSanJapaneseLayerSparseFF(nn.Module):
r"""
Switch Transformers Feed Forward layer module. This is a wrapper around the Mixture of Experts module.
Parameters:
config : ([`GPTSanJapaneseConfig`]): Model configuration class with all the parameters of the model.
Initializing with a config file does not load the weights associated with the model, only the
configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
def __init__(self, config: GPTSanJapaneseConfig):
super().__init__()
self.mlp = GPTSanJapaneseSparseMLP(config)
self.soft_bypass_mlp = nn.Linear(config.d_model, config.d_model, bias=False)
self.norm = nn.LayerNorm(config.d_model, eps=config.layer_norm_epsilon)
def forward(self, hidden_states, output_router_logits):
r"""
Args:
hidden_states (`torch.Tensor`) :
[num_groups, tokens_per_group, hidden_dim] inputs to send to experts.
            output_router_logits (`bool`):
                Whether or not to output the router logits of the experts.
Returns:
torch.Tensor[num_groups, tokens_per_group, hidden_dim]
"""
forwarded_states, router_tuple = self.mlp(hidden_states)
forwarded_states += torch.tanh(self.soft_bypass_mlp(hidden_states))
output = hidden_states + self.norm(forwarded_states)
if output_router_logits and router_tuple is not None:
return output, router_tuple
else:
return output
class GPTSanJapaneseLayerDenseFF(nn.Module):
r"""
Extra Transformers Feed Forward layer module.
Parameters:
config : ([`GPTSanJapaneseConfig`]): Model configuration class with all the parameters of the model.
Initializing with a config file does not load the weights associated with the model, only the
configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
def __init__(self, config: GPTSanJapaneseConfig):
super().__init__()
# Check if it is a sparse layer, if not then it is a dense layer
self.mlp = GPTSanJapaneseDenseActDense(config, ext_layer=True)
self.norm = nn.LayerNorm(config.d_model, eps=config.layer_norm_epsilon)
def forward(self, hidden_states):
r"""
Args:
hidden_states (`torch.Tensor`) :
[num_groups, tokens_per_group, hidden_dim] inputs to send to experts.
Returns:
torch.Tensor[num_groups, tokens_per_group, hidden_dim]
"""
forwarded_states = self.mlp(hidden_states)
output = hidden_states + self.norm(forwarded_states)
return output
# Copied from transformers.models.bart.modeling_bart.BartAttention with Bart->GPTSanJapanese
class GPTSanJapaneseAttention(nn.Module):
"""Multi-headed attention from 'Attention Is All You Need' paper"""
def __init__(
self,
embed_dim: int,
num_heads: int,
dropout: float = 0.0,
is_decoder: bool = False,
bias: bool = True,
):
super().__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
self.dropout = dropout
self.head_dim = embed_dim // num_heads
if (self.head_dim * num_heads) != self.embed_dim:
raise ValueError(
f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim}"
f" and `num_heads`: {num_heads})."
)
self.scaling = self.head_dim**-0.5
self.is_decoder = is_decoder
self.k_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous()
def forward(
self,
hidden_states: torch.Tensor,
key_value_states: Optional[torch.Tensor] = None,
past_key_value: Optional[Tuple[torch.Tensor]] = None,
attention_mask: Optional[torch.Tensor] = None,
layer_head_mask: Optional[torch.Tensor] = None,
output_attentions: bool = False,
) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
"""Input shape: Batch x Time x Channel"""
# if key_value_states are provided this layer is used as a cross-attention layer
# for the decoder
is_cross_attention = key_value_states is not None
bsz, tgt_len, _ = hidden_states.size()
# get query proj
query_states = self.q_proj(hidden_states) * self.scaling
# get key, value proj
# `past_key_value[0].shape[2] == key_value_states.shape[1]`
# is checking that the `sequence_length` of the `past_key_value` is the same as
# the provided `key_value_states` to support prefix tuning
if (
is_cross_attention
and past_key_value is not None
and past_key_value[0].shape[2] == key_value_states.shape[1]
):
# reuse k,v, cross_attentions
key_states = past_key_value[0]
value_states = past_key_value[1]
elif is_cross_attention:
# cross_attentions
key_states = self._shape(self.k_proj(key_value_states), -1, bsz)
value_states = self._shape(self.v_proj(key_value_states), -1, bsz)
elif past_key_value is not None:
# reuse k, v, self_attention
key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
value_states = self._shape(self.v_proj(hidden_states), -1, bsz)
key_states = torch.cat([past_key_value[0], key_states], dim=2)
value_states = torch.cat([past_key_value[1], value_states], dim=2)
else:
# self_attention
key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
value_states = self._shape(self.v_proj(hidden_states), -1, bsz)
if self.is_decoder:
# if cross_attention save Tuple(torch.Tensor, torch.Tensor) of all cross attention key/value_states.
# Further calls to cross_attention layer can then reuse all cross-attention
# key/value_states (first "if" case)
# if uni-directional self-attention (decoder) save Tuple(torch.Tensor, torch.Tensor) of
# all previous decoder key/value_states. Further calls to uni-directional self-attention
# can concat previous decoder key/value_states to current projected key/value_states (third "elif" case)
# if encoder bi-directional self-attention `past_key_value` is always `None`
past_key_value = (key_states, value_states)
proj_shape = (bsz * self.num_heads, -1, self.head_dim)
query_states = self._shape(query_states, tgt_len, bsz).view(*proj_shape)
key_states = key_states.view(*proj_shape)
value_states = value_states.view(*proj_shape)
src_len = key_states.size(1)
attn_weights = torch.bmm(query_states, key_states.transpose(1, 2))
if attn_weights.size() != (bsz * self.num_heads, tgt_len, src_len):
raise ValueError(
f"Attention weights should be of size {(bsz * self.num_heads, tgt_len, src_len)}, but is"
f" {attn_weights.size()}"
)
if attention_mask is not None:
if attention_mask.size() != (bsz, 1, tgt_len, src_len):
raise ValueError(
f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is {attention_mask.size()}"
)
attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + attention_mask
attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)
attn_weights = nn.functional.softmax(attn_weights, dim=-1)
if layer_head_mask is not None:
if layer_head_mask.size() != (self.num_heads,):
raise ValueError(
f"Head mask for a single layer should be of size {(self.num_heads,)}, but is"
f" {layer_head_mask.size()}"
)
attn_weights = layer_head_mask.view(1, -1, 1, 1) * attn_weights.view(bsz, self.num_heads, tgt_len, src_len)
attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)
if output_attentions:
# this operation is a bit awkward, but it's required to
# make sure that attn_weights keeps its gradient.
# In order to do so, attn_weights have to be reshaped
# twice and have to be reused in the following
attn_weights_reshaped = attn_weights.view(bsz, self.num_heads, tgt_len, src_len)
attn_weights = attn_weights_reshaped.view(bsz * self.num_heads, tgt_len, src_len)
else:
attn_weights_reshaped = None
attn_probs = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training)
attn_output = torch.bmm(attn_probs, value_states)
if attn_output.size() != (bsz * self.num_heads, tgt_len, self.head_dim):
raise ValueError(
f"`attn_output` should be of size {(bsz, self.num_heads, tgt_len, self.head_dim)}, but is"
f" {attn_output.size()}"
)
attn_output = attn_output.view(bsz, self.num_heads, tgt_len, self.head_dim)
attn_output = attn_output.transpose(1, 2)
# Use the `embed_dim` from the config (stored in the class) rather than `hidden_state` because `attn_output` can be
        # partitioned across GPUs when using tensor-parallelism.
attn_output = attn_output.reshape(bsz, tgt_len, self.embed_dim)
attn_output = self.out_proj(attn_output)
return attn_output, attn_weights_reshaped, past_key_value
class GPTSanJapaneseLayerSelfAttention(nn.Module):
"""
Self Attention and Normalization Unit
"""
def __init__(self, config, has_relative_attention_bias=False):
super().__init__()
self.self_attn = GPTSanJapaneseAttention(
embed_dim=config.d_model,
num_heads=config.num_heads,
is_decoder=True,
bias=has_relative_attention_bias,
)
self.norm = nn.LayerNorm(config.d_model, eps=config.layer_norm_epsilon)
def forward(
self,
hidden_states: Optional[Tuple[torch.FloatTensor]],
past_key_value: Optional[Tuple[torch.Tensor]] = None,
attention_mask: Optional[torch.FloatTensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = False,
output_attentions: Optional[bool] = False,
) -> Tuple[Union[torch.Tensor, Tuple[torch.Tensor]], ...]:
r"""
Self-attention and normalize block.
Args:
hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention
if the model is configured as a decoder.
past_key_values (`tuple(tuple(torch.FloatTensor))` of length `config.n_layers` with each tuple having 4 tensors of shape `(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`):
Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up
decoding. If `past_key_values` are used, the user can optionally input only the last
`decoder_input_ids` (those that don't have their past key value states given to this model) of shape
`(batch_size, 1)` instead of all `decoder_input_ids` of shape `(batch_size, sequence_length)`.
attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used
in the cross-attention if the model is configured as a decoder. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
            head_mask (`numpy.ndarray` of shape `({0})`, *optional*):
Mask to nullify selected heads of the attention modules. Mask values selected in `[0, 1]`:
- 1 indicates the head is **not masked**,
- 0 indicates the head is **masked**.
use_cache (`bool`, *optional*):
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding
(see `past_key_values`).
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under
returned tensors for more detail.
Returns:
Tuple[torch.Tensor[num_groups, tokens_per_group, hidden_dim],...]
"""
# Self Attention
# decoder uni-directional self-attention cached key/values tuple is at positions 1,2
self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None
# add present self-attn cache to positions 1,2 of present_key_value tuple
atten_out = self.self_attn(
hidden_states=hidden_states,
past_key_value=self_attn_past_key_value,
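            # convert the 0/1 padding mask to an additive mask: 1 -> 0.0 (attend), 0 -> dtype min (ignore)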
attention_mask=(1 - attention_mask) * torch.finfo(hidden_states.dtype).min,
layer_head_mask=head_mask,
output_attentions=output_attentions,
)
if output_attentions:
attn_weights = (atten_out[1],)
else:
attn_weights = ()
attention_output = atten_out[0]
hidden = hidden_states + self.norm(attention_output)
if use_cache:
outputs = (hidden, atten_out[2]) # hidden, present, (attentions)
else:
outputs = (hidden,) # hidden, (attentions)
return outputs + attn_weights
class GPTSanJapaneseBlock(nn.Module):
"""
Self Attention and FFN Unit
"""
def __init__(self, config, ext_layer=False):
super().__init__()
self.self_attn = GPTSanJapaneseLayerSelfAttention(config)
self.feed_forward = GPTSanJapaneseLayerDenseFF(config) if ext_layer else GPTSanJapaneseLayerSparseFF(config)
def forward(
self,
hidden_states: Optional[Tuple[torch.FloatTensor]],
past_key_value: Optional[Tuple[torch.Tensor]] = None,
attention_mask: Optional[torch.FloatTensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = False,
output_attentions: Optional[bool] = False,
output_router_tuple: Optional[bool] = False,
) -> Tuple[Union[torch.Tensor, Tuple[torch.Tensor]], ...]:
r"""
GPTSAN transformer block.
Args:
hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention
if the model is configured as a decoder.
past_key_values (`tuple(tuple(torch.FloatTensor))` of length `config.n_layers` with each tuple having 4 tensors of shape `(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`):
Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up
decoding. If `past_key_values` are used, the user can optionally input only the last
`decoder_input_ids` (those that don't have their past key value states given to this model) of shape
`(batch_size, 1)` instead of all `decoder_input_ids` of shape `(batch_size, sequence_length)`.
attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used
in the cross-attention if the model is configured as a decoder. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
            head_mask (`numpy.ndarray` of shape `({0})`, *optional*):
Mask to nullify selected heads of the attention modules. Mask values selected in `[0, 1]`:
- 1 indicates the head is **not masked**,
- 0 indicates the head is **masked**.
use_cache (`bool`, *optional*):
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding
(see `past_key_values`).
            output_attentions (`bool`, *optional*):
                Whether or not to output attention probabilities.
            output_router_tuple (`bool`, *optional*):
                Whether or not to output the router logits and selected expert indices.
Returns:
Tuple[torch.Tensor[num_groups, tokens_per_group, hidden_dim],...]
"""
atten_out = self.self_attn(
hidden_states=hidden_states,
past_key_value=past_key_value,
attention_mask=attention_mask,
head_mask=head_mask,
use_cache=use_cache,
output_attentions=output_attentions,
)
attention_output = atten_out[0]
if isinstance(self.feed_forward, GPTSanJapaneseLayerSparseFF):
sparse_out = self.feed_forward(attention_output, output_router_tuple)
if output_router_tuple:
hidden, router_tuple = sparse_out
else:
hidden = sparse_out
else:
hidden = self.feed_forward(attention_output)
outputs = (hidden,) + atten_out[1:]
if isinstance(self.feed_forward, GPTSanJapaneseLayerSparseFF) and output_router_tuple:
outputs += (router_tuple,)
return outputs
class GPTSanJapanesePreTrainedModel(PreTrainedModel):
"""
An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
models.
"""
config_class = GPTSanJapaneseConfig
base_model_prefix = "gptsan_japanese"
supports_gradient_checkpointing = False
_no_split_modules = ["GPTSanJapaneseBlock"]
@property
def dummy_inputs(self):
input_ids = torch.tensor(DUMMY_INPUTS)
input_mask = torch.tensor(DUMMY_MASK)
dummy_inputs = {
"input_ids": input_ids,
"attention_mask": input_mask,
}
return dummy_inputs
def _init_weights(self, module):
"""Initialize the weights"""
factor = self.config.initializer_factor # Used for testing weights initialization
if isinstance(module, nn.LayerNorm):
module.weight.data.fill_(factor * 1.0)
module.bias.data.zero_()
elif isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=factor * ((self.config.d_model) ** -0.5))
if hasattr(module, "bias") and module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=factor * 1.0)
elif isinstance(module, GPTSanJapaneseModel):
# Mesh TensorFlow embeddings initialization
# See https://github.com/tensorflow/mesh/blob/fa19d69eafc9a482aff0b59ddd96b025c0cb207d/mesh_tensorflow/layers.py#L1624
module.embed_tokens.weight.data.normal_(mean=0.0, std=factor * 1.0)
module.position_embeddings.weight.data.normal_(mean=0.0, std=factor * 1.0)
if hasattr(module, "extra_position_embeddings") and module.extra_position_embeddings is not None:
module.extra_position_embeddings.weight.data.normal_(mean=0.0, std=factor * 1.0)
elif isinstance(module, (GPTSanJapaneseModel, GPTSanJapaneseForConditionalGeneration)):
# Mesh TensorFlow embeddings initialization
# See https://github.com/tensorflow/mesh/blob/fa19d69eafc9a482aff0b59ddd96b025c0cb207d/mesh_tensorflow/layers.py#L1624
module.final_logits_bias.data.normal_(mean=0.0, std=factor * 1.0)
if hasattr(module, "lm_head") and not self.config.tie_word_embeddings:
module.lm_head.weight.data.normal_(mean=0.0, std=factor * 1.0)
elif isinstance(module, GPTSanJapaneseDenseActDense):
# Mesh TensorFlow FF initialization
# See https://github.com/tensorflow/mesh/blob/master/mesh_tensorflow/transformer/transformer_layers.py#L56
# and https://github.com/tensorflow/mesh/blob/fa19d69eafc9a482aff0b59ddd96b025c0cb207d/mesh_tensorflow/layers.py#L89
module.wi.weight.data.normal_(mean=0.0, std=factor * ((self.config.d_model) ** -0.5))
if hasattr(module.wi, "bias") and module.wi.bias is not None:
module.wi.bias.data.zero_()
module.wo.weight.data.normal_(mean=0.0, std=factor * ((self.config.d_ff) ** -0.5))
if hasattr(module.wo, "bias") and module.wo.bias is not None:
module.wo.bias.data.zero_()
elif isinstance(module, GPTSanJapaneseAttention):
# Multi-headed attention
d_model = self.config.d_model
key_value_proj_dim = self.config.d_model
n_heads = self.config.num_heads
module.k_proj.weight.data.normal_(mean=0.0, std=factor * ((d_model * key_value_proj_dim) ** -0.5))
module.v_proj.weight.data.normal_(mean=0.0, std=factor * ((d_model * key_value_proj_dim) ** -0.5))
module.q_proj.weight.data.normal_(mean=0.0, std=factor * ((d_model * key_value_proj_dim) ** -0.5))
module.out_proj.weight.data.normal_(mean=0.0, std=factor * ((n_heads * key_value_proj_dim) ** -0.5))
elif isinstance(module, GPTSanJapaneseSparseMLP):
# Mesh TensorFlow attention initialization to avoid scaling before softmax
# See https://github.com/tensorflow/mesh/blob/fa19d69eafc9a482aff0b59ddd96b025c0cb207d/mesh_tensorflow/transformer/attention.py#L136
d_model = self.config.d_model
key_value_proj_dim = self.config.d_model
n_heads = self.config.num_heads
module.router.classifier.weight.data.normal_(mean=0.0, std=factor * 1)
for idx in range(self.config.num_experts):
module.experts[f"expert_{idx}"].wi.weight.data.normal_(mean=0.0, std=factor * (d_model**-0.5))
module.experts[f"expert_{idx}"].wo.weight.data.normal_(mean=0.0, std=factor * (d_model**-0.5))
def _set_gradient_checkpointing(self, module, value=False):
if isinstance(module, (GPTSanJapaneseAttention,)):
module.gradient_checkpointing = value
# Copied from transformers.models.t5.modeling_t5.T5PreTrainedModel._shift_right
def _shift_right(self, input_ids):
decoder_start_token_id = self.config.decoder_start_token_id
pad_token_id = self.config.pad_token_id
assert decoder_start_token_id is not None, (
"self.model.config.decoder_start_token_id has to be defined. In T5 it is usually set to the pad_token_id."
" See T5 docs for more information"
)
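        # e.g. input_ids [[x1, x2, x3]] becomes [[decoder_start_token_id, x1, x2]]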
# shift inputs to the right
if is_torch_fx_proxy(input_ids):
# Item assignment is not supported natively for proxies.
shifted_input_ids = torch.full(input_ids.shape[:-1] + (1,), decoder_start_token_id)
shifted_input_ids = torch.cat([shifted_input_ids, input_ids[..., :-1]], dim=-1)
else:
shifted_input_ids = input_ids.new_zeros(input_ids.shape)
shifted_input_ids[..., 1:] = input_ids[..., :-1].clone()
shifted_input_ids[..., 0] = decoder_start_token_id
assert pad_token_id is not None, "self.model.config.pad_token_id has to be defined."
# replace possible -100 values in labels by `pad_token_id`
shifted_input_ids.masked_fill_(shifted_input_ids == -100, pad_token_id)
return shifted_input_ids
GPTSAN_JAPANESE_START_DOCSTRING = r"""
    The [GPTSAN-japanese](https://github.com/tanreinama/GPTSAN) model is a general-purpose Switch Transformer based
    Japanese language model.
This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage
and behavior.
Parameters:
config ([`GPTSanJapaneseConfig`]): Model configuration class with all the parameters of the model.
Initializing with a config file does not load the weights associated with the model, only the
configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
GPTSAN_JAPANESE_INPUTS_DOCSTRING = r"""
Args:
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
Indices of input sequence tokens in the vocabulary. GPTSAN-japanese is a model that generates sentence
continuations or predicts tokens at mask positions. Special tokens required for inputs to the model are
automatically appended.
attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
token_type_ids (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*):
An input that masks the Prefix part in the Prefix-LM input. Mask values selected in `[0, 1]`:
- 1 for tokens that are **prefix** input,
- 0 for tokens that are **not-prefix** input.
spout (`torch.Tensor` of shape `(batch_size, config.d_spout)`):
This vector is transformed through an 8-layer FFN and can be used instead of `past_key_values`.
past_key_values (`tuple(tuple(torch.FloatTensor))` of length `config.n_layers` with each tuple having 4 tensors of shape `(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`):
Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding.
If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that
don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all
`decoder_input_ids` of shape `(batch_size, sequence_length)`.
        head_mask (`torch.FloatTensor` of shape `(num_heads,)` or `(num_layers, num_heads)`, *optional*):
            Mask to nullify selected heads of the self-attention modules. Mask values selected in `[0, 1]`:
            - 1 indicates the head is **not masked**,
            - 0 indicates the head is **masked**.
use_cache (`bool`, *optional*):
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see
`past_key_values`).
inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
model's internal embedding lookup matrix.
decoder_inputs_embeds (`torch.FloatTensor` of shape `(batch_size, target_sequence_length, hidden_size)`, *optional*):
Optionally, instead of passing `decoder_input_ids` you can choose to directly pass an embedded
representation. If `past_key_values` is used, optionally only the last `decoder_inputs_embeds` have to be
input (see `past_key_values`). This is useful if you want more control over how to convert
`decoder_input_ids` indices into associated vectors than the model's internal embedding lookup matrix.
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
router_logits (`tuple(torch.FloatTensor)`, *optional*, returned when `output_router_logits=True` is passed or when `config.add_router_probs=True`):
Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, sequence_length, num_experts)`.
Router logits of the decoder model, useful to compute the auxiliary loss for Mixture of Experts models.
"""
@add_start_docstrings(
"The bare GPTSAN-japanese Model transformer outputting raw hidden-states without any specific head on top.",
GPTSAN_JAPANESE_START_DOCSTRING,
)
class GPTSanJapaneseModel(GPTSanJapanesePreTrainedModel):
def __init__(self, config: GPTSanJapaneseConfig):
super().__init__(config)
self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.d_model)
self.config = copy.deepcopy(config)
self.embed_tokens = nn.Embedding(config.vocab_size, config.d_model)
self.last_project = nn.Linear(config.d_model, config.d_model, bias=True)
self.act = ACT2FN["swish"]
self.blocks = torch.nn.ModuleList([])
for _ in range(config.num_switch_layers):
self.blocks.append(GPTSanJapaneseBlock(config))
for _ in range(config.num_ext_layers):
self.blocks.append(GPTSanJapaneseBlock(config, ext_layer=True))
if config.num_ext_layers > 0:
self.extra_position_embeddings = nn.Embedding(config.max_position_embeddings, config.d_model)
if config.d_spout:
spouts = []
for _ in range(8):
spouts.append(nn.Linear(config.d_spout, config.d_spout, bias=False))
spouts.append(nn.Tanh())
spouts.append(nn.Linear(config.d_spout, config.num_layers * 2 * config.d_model, bias=False))
self.spout = nn.Sequential(*spouts)
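            # the spout network projects (batch, d_spout) -> (batch, num_layers * 2 * d_model);
            # forward() reshapes this into per-layer key/value states used in place of past_key_values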
self.post_init()
def get_input_embeddings(self):
return self.embed_tokens
def set_input_embeddings(self, new_embeddings):
self.embed_tokens = new_embeddings
@add_start_docstrings_to_model_forward(GPTSAN_JAPANESE_INPUTS_DOCSTRING)
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
token_type_ids: Optional[torch.FloatTensor] = None,
spout: Optional[torch.FloatTensor] = None,
past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
head_mask: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = False,
inputs_embeds: Optional[torch.FloatTensor] = None,
decoder_inputs_embeds: Optional[torch.FloatTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
output_router_logits: Optional[bool] = None,
num_precontext: Optional[torch.LongTensor] = None,
) -> Union[MoEModelOutputWithPastAndCrossAttentions, Tuple[torch.FloatTensor]]:
r"""
num_precontext (`torch.LongTensor` of shape `(batch_size, 1)`):
length of `hybrid` input tokens in the input. Tokens up to this length attend to both the front and the
back like BERT; tokens after that attend only to the front like GPT. See also:
https://github.com/tanreinama/GPTSAN/blob/main/report/model.md
Returns:
`MoEModelOutputWithPastAndCrossAttentions` if `return_dict` is `True`, otherwise a plain `tuple`.
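Example:
A minimal sketch (assumes the `Tanrei/GPTSAN-japanese` checkpoint; `num_precontext=2` marks the first two
tokens as bidirectional, BERT-like context):
```python
>>> import torch
>>> from transformers import GPTSanJapaneseModel, GPTSanJapaneseTokenizer
>>> tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
>>> model = GPTSanJapaneseModel.from_pretrained("Tanrei/GPTSAN-japanese")
>>> x_token = tokenizer("織田信長は、", return_tensors="pt")
>>> outputs = model(input_ids=x_token.input_ids, num_precontext=torch.tensor([[2]]))
>>> outputs.last_hidden_state.shape  # (batch_size, sequence_length, d_model)
```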
"""
return_dict = return_dict if return_dict is not None else self.config.return_dict
device = self.position_embeddings.weight.device
if input_ids is None:
input_ids = torch.zeros([1, 1]).int().to(device)  # dummy input when input_ids is None
num_pasts_contexts = 0
num_batch = input_ids.shape[0]
pasts_or_spout_value = None
if past_key_values is not None:
num_pasts_contexts = past_key_values[0][0].shape[2]
elif self.config.d_spout and spout is not None:
# `spout` is a special input vector specific to GPTSAN
# This controls the output by projecting embedded information such as the class of sentences during training.
# It should be passed instead of the first past_key_value.
# See the original GPTSAN repository for details
num_pasts_contexts += 1
# If there is an attention_mask, increase first one for spout
if self.config.d_spout and spout is not None and attention_mask is not None:
attention_mask_with_spout = torch.ones(num_batch, attention_mask.shape[1] + 1, device=device)
attention_mask_with_spout[:, 1:] -= 1 - attention_mask # 1st token should be spout
attention_mask = attention_mask_with_spout # update attention_mask
if num_precontext is not None:
# `num_precontext` is the number of tokens that refer to each other in prefix-lm
# created per batch, so dimension of num_precontext should be [batch, 1]
if not (
len(num_precontext.shape) == 2 and num_precontext.shape[1] == 1
):  # num_precontext should be [batch, 1]
raise ValueError("num_precontext should be [batch, 1] size.")
num_precontext = torch.reshape(num_precontext, [-1])
else:
num_precontext = torch.zeros([num_batch]).int().to(device)
num_input_contexts = input_ids.shape[1]
num_output_contexts = num_input_contexts + num_pasts_contexts
hidden_states = self.embed_tokens(input_ids)
if past_key_values is not None:
pasts_or_spout_value = past_key_values
elif self.config.d_spout and spout is not None:
# Project the GPTSAN `spout` vector into the same shape as past_key_values
pasts_or_spout_value = self.spout(spout) # projecting `spout` vector
pasts_or_spout_value = torch.reshape(
pasts_or_spout_value,
[
num_batch,
self.config.num_layers,
2,
self.config.num_heads,
num_pasts_contexts,
self.config.d_model // self.config.num_heads,
],
)
pasts_or_spout_value = torch.split(pasts_or_spout_value, [1] * self.config.num_layers, dim=1)
# make same shape as past_key_values
pasts_or_spout_value = tuple(
tuple([b.squeeze(1) for b in torch.split(a.squeeze(1), [1, 1], dim=1)]) for a in pasts_or_spout_value
)
else:
pasts_or_spout_value = [None] * self.config.num_layers
# Token position considering spout and pasts
token_position = torch.arange(num_input_contexts).to(device) + num_pasts_contexts
if attention_mask is None:
attention_mask = torch.ones(num_batch, num_input_contexts, device=device)
# positions used to gather position_embeddings
gather_position = (
(
torch.zeros((num_batch, self.config.d_model, num_input_contexts)).to(device)
+ token_position.unsqueeze(0)
)
.transpose(1, 2)
.long()
)
# When padding with padding_side="left", zeros line up on the left side of attention_mask, so position_embeddings are shifted accordingly
gather_position -= (1 - attention_mask).argmin(dim=-1).unsqueeze(1).unsqueeze(2)
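# keep positions within [num_pasts_contexts, max_position_embeddings - 1]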
gather_position = torch.clip(gather_position, num_pasts_contexts, self.config.max_position_embeddings - 1)
# attention_mask is applied per batch
for i in range(num_batch):
hidden_states[i] += torch.gather(self.position_embeddings.weight, dim=0, index=gather_position[i])
# Create a causal mask that lets the prefix input length of the Prefix-LM vary
causal_mask = (
torch.tril(torch.ones((num_output_contexts, num_output_contexts), dtype=torch.uint8))
.view(1, 1, num_output_contexts, num_output_contexts)
.to(device)
)
prefix_lm_mask = causal_mask[:, :, -num_input_contexts:, :]
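# columns whose token_type_ids == 1 (the prefix) become visible to every query position, on top of the causal pattern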
if token_type_ids is not None:
token_type_ids = token_type_ids.unsqueeze(1).unsqueeze(2)
prefix_lm_mask = ((prefix_lm_mask + token_type_ids) > 0).float()
# Merge prefix_lm_mask and attention_mask
extended_attention_mask = prefix_lm_mask * attention_mask.unsqueeze(1).unsqueeze(2)
# Prepare head mask if needed
if head_mask is not None:
head_mask = self.get_head_mask(
head_mask, self.config.num_switch_layers + self.config.num_ext_layers
) # n_layer x batch x n_heads x N x N
# outputs
present_key_value_states = tuple() if self.config.use_cache or use_cache else None
all_hidden_states = () if self.config.output_hidden_states or output_hidden_states else None
all_attentions = () if self.config.output_attentions or output_attentions else None
all_router_probs = () if self.config.output_router_logits or output_router_logits else None
for layer, past in enumerate(pasts_or_spout_value):
if layer == self.config.num_switch_layers:
if self.config.num_ext_layers > 0:
# extra_position_embeddings are additional position embeddings that exist only when the model is
# extended with code from the original GPTSAN repository; they are not used in the default model.
# They are created when an additional layer is added and only that part of the model is trained.
# Therefore, convert_gptsan_tf_checkpoint_to_pytorch.py handles them when converting and loading models created in the original GPTSAN repository.
for i in range(num_batch):
hidden_states[i] += torch.gather(
self.extra_position_embeddings.weight, dim=0, index=gather_position[i]
)
output_router_tuple = (
self.config.output_router_logits or output_router_logits
) and layer < self.config.num_switch_layers
block_output = self.blocks[layer](
hidden_states=hidden_states,
past_key_value=past,
attention_mask=extended_attention_mask,
head_mask=head_mask,
use_cache=self.config.use_cache or use_cache,
output_attentions=self.config.output_attentions or output_attentions,
output_router_tuple=output_router_tuple,
)
outpos = 0
hidden_states = block_output[outpos]
if self.config.output_hidden_states or output_hidden_states:
all_hidden_states += (hidden_states,)
if self.config.use_cache or use_cache:
outpos += 1
present = block_output[outpos]
present_key_value_states += (present,)
if self.config.output_attentions or output_attentions:
outpos += 1
attention_probs = block_output[outpos]
all_attentions += (attention_probs,)
if output_router_tuple:
outpos += 1
router_tuple = block_output[outpos]
all_router_probs += (router_tuple[0],)
hidden_states = self.last_project(hidden_states)
hidden_states = self.act(hidden_states)
if self.config.output_hidden_states or output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
if not return_dict:
return tuple(
v
for v in [
hidden_states,
present_key_value_states,
all_hidden_states,
all_attentions,
all_router_probs,
]
if v is not None
)
return MoEModelOutputWithPastAndCrossAttentions(
last_hidden_state=hidden_states,
past_key_values=present_key_value_states,
hidden_states=all_hidden_states,
attentions=all_attentions,
router_probs=all_router_probs,
)
@add_start_docstrings(
"The bare GPTSAN-japanese Model with a language modeling head.",
GPTSAN_JAPANESE_START_DOCSTRING,
)
class GPTSanJapaneseForConditionalGeneration(GPTSanJapanesePreTrainedModel):
_keys_to_ignore_on_load_missing = [r"lm_head.weight"]
def __init__(self, config: GPTSanJapaneseConfig):
super().__init__(config)
self.model = GPTSanJapaneseModel(config)
self.register_buffer("final_logits_bias", torch.zeros([1, config.vocab_size]))
self.lm_head = nn.Linear(config.d_model, config.vocab_size, bias=False)
if not self.config.torchscript:
self.lm_head.weight = self.model.embed_tokens.weight
@add_start_docstrings_to_model_forward(GPTSAN_JAPANESE_INPUTS_DOCSTRING)
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
token_type_ids: Optional[torch.FloatTensor] = None,
spout: Optional[torch.FloatTensor] = None,
past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
head_mask: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = False,
inputs_embeds: Optional[torch.FloatTensor] = None,
decoder_inputs_embeds: Optional[torch.FloatTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
output_router_logits: Optional[bool] = None,
labels: Optional[torch.LongTensor] = None,
) -> Union[Tuple[torch.FloatTensor], MoECausalLMOutputWithPast]:
r"""
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the language modeling loss. Indices should be in `[-100, 0, ...,
config.vocab_size - 1]`. All labels set to `-100` are ignored (masked); the loss is only computed for
labels in `[0, ..., config.vocab_size - 1]`.
Returns:
`MoECausalLMOutputWithPast` if `return_dict` is `True`, otherwise a plain `tuple`.
Example:
Text generation with the regular LM model
```python
>>> from transformers import AutoModel, AutoTokenizer, trainer_utils
>>> device = "cuda"
>>> model = AutoModel.from_pretrained("Tanrei/GPTSAN-japanese").to(device)
>>> tokenizer = AutoTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
>>> x_token = tokenizer("織田信長は、", return_tensors="pt")
>>> trainer_utils.set_seed(30)
>>> input_ids = x_token.input_ids.to(device)
>>> gen_token = model.generate(input_ids, max_new_tokens=50)
>>> tokenizer.decode(gen_token[0])
"織田信長は、政治・軍事の中枢まで掌握した政治家であり、日本史上類を見ない驚異的な軍事侵攻を続け..."
```
Text generation with the Prefix-LM model
```python
>>> from transformers import AutoModel, AutoTokenizer, trainer_utils
>>> device = "cuda"
>>> model = AutoModel.from_pretrained("Tanrei/GPTSAN-japanese").to(device)
>>> tokenizer = AutoTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
>>> x_token = tokenizer("", prefix_text="織田信長は、", return_tensors="pt")
>>> trainer_utils.set_seed(30)
>>> input_ids = x_token.input_ids.to(device)
>>> token_type_ids = x_token.token_type_ids.to(device)
>>> gen_token = model.generate(input_ids, token_type_ids=token_type_ids, max_new_tokens=50)
>>> tokenizer.decode(gen_token[0])
"織田信長は、政治・外交で数々の戦果を上げるが、1568年からは、いわゆる本能寺の変で細川晴元に暗殺される..."
```
Simultaneous text generation and masked language modeling
```python
>>> from transformers import AutoModel, AutoTokenizer, trainer_utils
>>> device = "cuda"
>>> model = AutoModel.from_pretrained("Tanrei/GPTSAN-japanese").to(device)
>>> tokenizer = AutoTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
>>> masked_sentence = "武田信玄は、<|inputmask|>時代ファンならぜひ押さえ<|inputmask|>きたい名将の一人。"
>>> x_token = tokenizer("", prefix_text=masked_sentence, return_tensors="pt")
>>> trainer_utils.set_seed(30)
>>> input_ids = x_token.input_ids.to(device)
>>> token_type_ids = x_token.token_type_ids.to(device)
>>> out_lm_token = model.generate(input_ids, token_type_ids=token_type_ids, max_new_tokens=50)
>>> out_mlm_token = model(input_ids, token_type_ids=token_type_ids).logits.argmax(axis=-1)
>>> tokenizer.decode(out_mlm_token[0])
"武田信玄は、戦国時代ファンならぜひ押さえておきたい名将の一人。"
>>> tokenizer.decode(out_lm_token[0][input_ids.shape[1] :])
"武田氏の三代に渡った武田家のひとり\n甲斐市に住む、日本史上最大の戦国大名。..."
```"""
SEG_TOKEN = self.config.separator_token_id
use_cache = use_cache or self.config.use_cache
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
model_return_dict = True
num_precontext = None
if input_ids is not None:
num_batch = input_ids.shape[0]
num_precontext = torch.zeros([num_batch]).int().to(input_ids.device)
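# the prefix length of each row is the column index of its separator token
# (rows without a separator keep 0; this assumes at most one separator per row)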
where_separators = torch.where(input_ids == SEG_TOKEN)
num_precontext[where_separators[0]] += where_separators[1]
num_precontext = num_precontext.unsqueeze(1)
outputs = self.model(
input_ids,
attention_mask,
token_type_ids,
spout,
past_key_values,
head_mask,
use_cache,
inputs_embeds,
decoder_inputs_embeds,
output_attentions,
output_hidden_states,
model_return_dict,
output_router_logits,
num_precontext,
)
lm_logits = self.lm_head(outputs[0])
if lm_logits.shape[-1] == self.final_logits_bias.shape[-1]:
lm_logits = lm_logits + self.final_logits_bias
loss = None
z_loss = None
router_probs = None
aux_loss = None
if labels is not None:
loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
if output_router_logits:
# Compute the router loss (z_loss + auxiliary loss) for each router in the encoder and decoder
router_logits, expert_indexes = self._unpack_router_logits(outputs.router_probs)
z_loss = router_z_loss_func(router_logits)
router_probs = nn.Softmax(dim=-1)(router_logits)
aux_loss = load_balancing_loss_func(router_probs, expert_indexes)
loss = loss_fct(lm_logits.view(-1, lm_logits.size(-1)), labels.view(-1))
if not return_dict:
return tuple(
v
for v in [
loss,
lm_logits,
outputs.past_key_values,
outputs.hidden_states,
outputs.router_probs,
z_loss,
aux_loss,
]
if v is not None
)
return MoECausalLMOutputWithPast(
loss=loss,
logits=lm_logits,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
router_logits=outputs.router_probs,
z_loss=z_loss,
aux_loss=aux_loss,
)
def prepare_inputs_for_generation(
self,
input_ids: torch.LongTensor,
attention_mask: torch.FloatTensor,
token_type_ids: Optional[torch.FloatTensor] = None,
spout: Optional[Union[List, torch.FloatTensor]] = None,
past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
**kwargs,
):
if isinstance(spout, list):
spout = torch.tensor(spout).float()
if input_ids is not None:
spout = spout.to(input_ids.device)
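# during incremental decoding, feed only the newest token (and its token_type_id);
# earlier positions are recovered from past_key_values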
if past_key_values is not None:
return {
"input_ids": input_ids[:, -1:] if input_ids is not None else None,
"attention_mask": attention_mask,
"token_type_ids": token_type_ids[:, -1:] if token_type_ids is not None else None,
"spout": spout,
"past_key_values": past_key_values,
}
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"token_type_ids": token_type_ids,
"spout": spout,
"past_key_values": None,
}
# Copied from transformers.models.switch_transformers.modeling_switch_transformers.SwitchTransformersForConditionalGeneration.prepare_decoder_input_ids_from_labels with SwitchTransformers->GPTSanJapanese
def prepare_decoder_input_ids_from_labels(self, labels: torch.Tensor):
return self._shift_right(labels)
# Copied from transformers.models.mbart.modeling_mbart.MBartForConditionalGeneration.resize_token_embeddings with MBart->GPTSanJapanese
def resize_token_embeddings(self, new_num_tokens: int) -> nn.Embedding:
new_embeddings = super().resize_token_embeddings(new_num_tokens)
self._resize_final_logits_bias(new_num_tokens)
return new_embeddings
# Copied from transformers.models.mbart.modeling_mbart.MBartForConditionalGeneration._resize_final_logits_bias with MBart->GPTSanJapanese
def _resize_final_logits_bias(self, new_num_tokens: int) -> None:
old_num_tokens = self.final_logits_bias.shape[-1]
if new_num_tokens <= old_num_tokens:
new_bias = self.final_logits_bias[:, :new_num_tokens]
else:
extra_bias = torch.zeros((1, new_num_tokens - old_num_tokens), device=self.final_logits_bias.device)
new_bias = torch.cat([self.final_logits_bias, extra_bias], dim=1)
self.register_buffer("final_logits_bias", new_bias)
def get_input_embeddings(self):
return self.model.get_input_embeddings()
def set_input_embeddings(self, new_embeddings):
self.model.set_input_embeddings(new_embeddings)
# Copied from transformers.models.switch_transformers.modeling_switch_transformers.SwitchTransformersForConditionalGeneration.set_output_embeddings with SwitchTransformers->GPTSanJapanese
def set_output_embeddings(self, new_embeddings):
self.lm_head = new_embeddings
# Copied from transformers.models.switch_transformers.modeling_switch_transformers.SwitchTransformersForConditionalGeneration.get_output_embeddings with SwitchTransformers->GPTSanJapanese
def get_output_embeddings(self):
return self.lm_head
# Copied from transformers.models.switch_transformers.modeling_switch_transformers.SwitchTransformersForConditionalGeneration._unpack_router_logits with SwitchTransformers->GPTSanJapanese
def _unpack_router_logits(self, router_outputs):
total_router_logits = []
total_expert_indexes = []
for router_output in router_outputs:
if router_output[0] is not None:
router_logits, expert_indexes = router_output
total_router_logits.append(router_logits)
total_expert_indexes.append(expert_indexes)
return torch.cat(total_router_logits, dim=1), torch.cat(total_expert_indexes, dim=1)
# coding=utf-8
# Copyright 2023 HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tokenization classes for GPTSANJapanese."""
import collections
import json
import os
import re
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
import numpy as np
from ...tokenization_utils import PreTrainedTokenizer
from ...tokenization_utils_base import (
BatchEncoding,
PreTokenizedInput,
PreTokenizedInputPair,
TextInput,
TextInputPair,
TruncationStrategy,
)
from ...utils import PaddingStrategy, logging
if TYPE_CHECKING:
from transformers.pipelines.conversational import Conversation
logger = logging.get_logger(__name__)
VOCAB_FILES_NAMES = {"vocab_file": "vocab.txt", "emoji_file": "emoji.json"}
PRETRAINED_VOCAB_FILES_MAP = {
"vocab_file": {
"Tanrei/GPTSAN-japanese": "https://huggingface.co/Tanrei/GPTSAN-japanese/blob/main/vocab.txt",
},
"emoji_file": {
"Tanrei/GPTSAN-japanese": "https://huggingface.co/Tanrei/GPTSAN-japanese/blob/main/emoji.json",
},
}
PRETRAINED_POSITIONAL_EMBEDDINGS_SIZES = {
"Tanrei/GPTSAN-japanese": 1280,
}
# Copied from transformers.models.gpt_neox_japanese.tokenization_gpt_neox_japanese.load_vocab_and_emoji
def load_vocab_and_emoji(vocab_file, emoji_file):
"""Loads a vocabulary file and emoji file into a dictionary."""
with open(emoji_file, "r", encoding="utf-8") as f:
emoji = json.loads(f.read())
vocab = collections.OrderedDict()
raw_vocab = collections.OrderedDict()
ids_to_tokens = collections.OrderedDict()
with open(vocab_file, "r", encoding="utf-8") as f:
token = f.readlines()
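# each vocabulary line may hold several comma-separated notation variants that share a single token id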
token = [[t.rstrip("\n")] if (t == "," or "," not in t) else t.rstrip("\n").split(",") for t in token]
for idx, b in enumerate(token):
ids_to_tokens[idx] = b
raw_vocab[",".join(b)] = idx
for wd in b:
vocab[wd] = idx
return vocab, raw_vocab, ids_to_tokens, emoji
class GPTSanJapaneseTokenizer(PreTrainedTokenizer):
"""
This tokenizer is based on GPTNeoXJapaneseTokenizer and has the following modifications
- Decodes byte0~byte255 tokens correctly
- Adds bagofword token handling
- Returns token_type_ids for the Prefix-LM model
The bagofword token represents a repetition of the previous token and is converted into 3 consecutive tokens when
decoding. In addition, the original Japanese special Sub-Word-Encoding has been released in this repository
(https://github.com/tanreinama/Japanese-BPEEncoder_V2). The token_type_ids is a mask indicating the prefix input
positions of the Prefix-LM model. To specify a prefix position, pass a prefix input as prefix_text, or pass the
prefix part and the part after it as a text pair of batch input.
Example:
```python
>>> from transformers import GPTSanJapaneseTokenizer
>>> tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
>>> # You can confirm both 慶応 and 慶應 are encoded to 17750
>>> tokenizer("吾輩は猫である🐯。実は慶応(慶應)大学出身")["input_ids"]
[34347, 31459, 30647, 31448, 25, 30659, 35729, 35676, 32417, 30647, 17750, 35589, 17750, 35590, 321, 1281]
>>> # Both 慶応 and 慶應 are decoded to 慶応
>>> tokenizer.decode(tokenizer("吾輩は猫である🐯。実は慶応(慶應)大学出身")["input_ids"])
'吾輩は猫である🐯。実は慶応(慶応)大学出身'
```
Example for Prefix-LM:
```python
>>> from transformers import GPTSanJapaneseTokenizer
>>> tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
>>> tokenizer("実は慶応(慶應)大学出身", prefix_text="吾輩は猫である🐯。")["input_ids"]
[35993, 34347, 31459, 30647, 31448, 25, 30659, 35729, 35676, 35998, 32417, 30647, 17750, 35589, 17750, 35590, 321, 1281]
>>> # Mask for Prefix-LM inputs
>>> tokenizer("実は慶応(慶應)大学出身", prefix_text="吾輩は猫である🐯。")["token_type_ids"]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```
Example for batch encode:
```python
>>> from transformers import GPTSanJapaneseTokenizer
>>> tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
>>> tokenizer([["武田信玄", "は、"], ["織田信長", "の配下の、"]], padding=True)["input_ids"]
[[35993, 8640, 25948, 35998, 30647, 35675, 35999, 35999], [35993, 10382, 9868, 35998, 30646, 9459, 30646, 35675]]
>>> # Mask for Prefix-LM inputs
>>> tokenizer([["武田信玄", "は、"], ["織田信長", "の配下の、"]], padding=True)["token_type_ids"]
[[1, 1, 1, 0, 0, 0, 0, 0], [1, 1, 1, 0, 0, 0, 0, 0]]
>>> # Mask for padding
>>> tokenizer([["武田信玄", "は、"], ["織田信長", "の配下の、"]], padding=True)["attention_mask"]
[[1, 1, 1, 1, 1, 1, 0, 0], [1, 1, 1, 1, 1, 1, 1, 1]]
```
Args:
vocab_file (`str`):
File containing the vocabulary.
emoji_file (`str`):
File containing the emoji.
unk_token (`str`, *optional*, defaults to `"<|nottoken|>"`):
The token used for unknown characters.
pad_token (`str`, *optional*, defaults to `"<|separator|>"`):
The token used for padding.
bos_token (`str`, *optional*, defaults to `"<|startoftext|>"`):
The beginning of sequence token.
eos_token (`str`, *optional*, defaults to `"<|endoftext|>"`):
The end of sequence token.
sep_token (`str`, *optional*, defaults to `"<|segmenter|>"`):
A special token that separates the prefix part from the general input part.
do_clean_text (`bool`, *optional*, defaults to `False`):
Whether or not to clean text for URL, EMAIL, TEL, Japanese DATE and Japanese PRICE.
"""
vocab_files_names = VOCAB_FILES_NAMES
pretrained_vocab_files_map = PRETRAINED_VOCAB_FILES_MAP
max_model_input_sizes = PRETRAINED_POSITIONAL_EMBEDDINGS_SIZES
model_input_names = ["input_ids", "attention_mask", "token_type_ids"]
def __init__(
self,
vocab_file,
emoji_file,
unk_token="<|nottoken|>",
pad_token="<|separator|>",
bos_token="<|startoftext|>",
eos_token="<|endoftext|>",
sep_token="<|segmenter|>",
do_clean_text=False,
**kwargs,
):
super().__init__(
unk_token=unk_token,
pad_token=pad_token,
bos_token=bos_token,
eos_token=eos_token,
sep_token=sep_token,
do_clean_text=do_clean_text,
**kwargs,
)
if not os.path.isfile(vocab_file):
raise ValueError(
f"Can't find a vocabulary file at path '{vocab_file}'. To load the vocabulary from a Google pretrained"
" model use `tokenizer = GPTSanJapaneseTokenizer.from_pretrained(PRETRAINED_MODEL_NAME)`"
)
if not os.path.isfile(emoji_file):
raise ValueError(
f"Can't find a emoji file at path '{emoji_file}'. To load the emoji information from a Google"
" pretrained model use `tokenizer = GPTSanJapaneseTokenizer.from_pretrained(PRETRAINED_MODEL_NAME)`"
)
self.do_clean_text = do_clean_text
self.vocab, self.raw_vocab, self.ids_to_tokens, self.emoji = load_vocab_and_emoji(vocab_file, emoji_file)
self.subword_tokenizer = SubWordJapaneseTokenizer(
vocab=self.vocab, ids_to_tokens=self.ids_to_tokens, emoji=self.emoji
)
@property
# Copied from tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizer.vocab_size
def vocab_size(self):
# self.vocab contains notation variants unique to Japanese, so it holds more entries than the raw vocabulary
return len(self.raw_vocab)
# Copied from tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizer.get_vocab
def get_vocab(self):
return dict(self.raw_vocab, **self.added_tokens_encoder)
# Copied from tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizer._tokenize
def _tokenize(self, text):
return self.subword_tokenizer.tokenize(text, clean=self.do_clean_text)
# Copied from tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizer._convert_token_to_id
def _convert_token_to_id(self, token):
"""Converts a token (str) in an id using the vocab."""
return self.vocab.get(token, self.vocab.get(self.unk_token))
# Copied from tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizer._convert_id_to_token
def _convert_id_to_token(self, index):
"""Converts an index (integer) in a token (str) using the vocab."""
return self.subword_tokenizer.convert_id_to_token(index)
def convert_tokens_to_string(self, tokens):
"""Converts a sequence of tokens (string) in a single string."""
words = []
byte_tokens = []
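# consecutive <|byteNN|> tokens are buffered and decoded together as a single UTF-8 sequence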
for word in tokens:
if word[:6] == "<|byte" and word[-2:] == "|>":
byte_tokens.append(int(word[6:-2]))
else:
if len(byte_tokens) > 0:
words.append(bytearray(byte_tokens).decode("utf-8", errors="replace"))
byte_tokens = []
if word[:7] == "<|emoji" and word[-2:] == "|>":
words.append(self.emoji["emoji_inv"][word])
elif word == "<SP>":
words.append(" ")
elif word == "<BR>":
words.append("\n")
elif word == "<TAB>":
words.append("\t")
elif word == "<BLOCK>":
words.append("▀")
elif word == "<KIGOU>":
words.append("ǀ")
elif word == "<U2000U2BFF>":
words.append("‖")
elif word == "<|bagoftoken|>":
if len(words) > 0:
words.append(words[-1])
words.append(words[-1])
words.append(words[-1])
elif word.startswith("<|") and word.endswith("|>"):
words.append("")
else:
words.append(word)
if len(byte_tokens) > 0:
words.append(bytearray(byte_tokens).decode("utf-8", errors="replace"))
text = "".join(words)
return text
# Copied from tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizer._build_conversation_input_ids
def _build_conversation_input_ids(self, conversation: "Conversation") -> List[int]:
"""This corresponds to DialoGPT variants of models."""
input_ids = []
for is_user, text in conversation.iter_texts():
input_ids.extend(self.encode(text, add_special_tokens=False) + [self.eos_token_id])
if len(input_ids) > self.model_max_length:
input_ids = input_ids[-self.model_max_length :]
return input_ids
# Copied from tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizer.save_vocabulary
def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]:
index = 0
if os.path.isdir(save_directory):
vocab_file = os.path.join(
save_directory, (filename_prefix + "-" if filename_prefix else "") + VOCAB_FILES_NAMES["vocab_file"]
)
emoji_file = os.path.join(
save_directory, (filename_prefix + "-" if filename_prefix else "") + VOCAB_FILES_NAMES["emoji_file"]
)
else:
vocab_file = (
(filename_prefix + "-" if filename_prefix else "") + save_directory + VOCAB_FILES_NAMES["vocab_file"]
)
emoji_file = (
(filename_prefix + "-" if filename_prefix else "") + save_directory + VOCAB_FILES_NAMES["emoji_file"]
)
with open(vocab_file, "w", encoding="utf-8") as writer:
for token_index, token in self.ids_to_tokens.items():
if index != token_index:
logger.warning(
f"Saving vocabulary to {vocab_file}: vocabulary indices are not consecutive."
" Please check that the vocabulary is not corrupted!"
)
index = token_index
writer.write(",".join(token) + "\n")
index += 1
with open(emoji_file, "w", encoding="utf-8") as writer:
json.dump(self.emoji, writer)
return vocab_file, emoji_file
def create_token_type_ids_from_sequences(
self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None
) -> List[int]:
# docstyle-ignore
"""
The tokenizer returns token_type_ids as separators between the Prefix part and the rest.
token_type_ids is 1 for the Prefix part and 0 for the remaining tokens.
Example:
```python
>>> x_token = tokenizer("アイウエ")
>>> # input_ids: | SOT | SEG | ア | イ | ウ | エ |
>>> # token_type_ids: | 1 | 0 | 0 | 0 | 0 | 0 |
>>> x_token = tokenizer("", prefix_text="アイウエ")
>>> # input_ids: | SOT | ア | イ | ウ | エ | SEG |
>>> # token_type_ids: | 1 | 1 | 1 | 1 | 1 | 0 |
>>> x_token = tokenizer("ウエ", prefix_text="アイ")
>>> # input_ids: | SOT | ア | イ | SEG | ウ | エ |
>>> # token_type_ids: | 1 | 1 | 1 | 0 | 0 | 0 |
```"""
prefix_len = 0
if self.sep_token in self.vocab:
segid = self.vocab[self.sep_token]
if segid in token_ids_0:
prefix_len = token_ids_0.index(segid)
if token_ids_1 is None:
total_len = len(token_ids_0)
else:
total_len = len(token_ids_0 + token_ids_1)
return prefix_len * [1] + (total_len - prefix_len) * [0]
def prepare_for_tokenization(self, text, prefix_text=None, add_sep_token=None, **kwargs):
# GPTSAN inserts extra SEP tokens in Prefix-LM in addition to SOT for text generation.
# SOT at the beginning of the text, and SEP as the separator between the prefix part and the rest.
if add_sep_token is None:
add_sep_token = self.sep_token not in text  # insert a SEP token unless one is already present in the text
prepared = self.bos_token if self.bos_token in self.vocab else ""
prepared += prefix_text if prefix_text is not None else ""
if add_sep_token:
prepared += self.sep_token if self.sep_token in self.vocab else ""
prepared += text
return (prepared, kwargs)
def _batch_encode_plus(
self,
batch_text_or_text_pairs: Union[
List[TextInput], List[TextInputPair], List[PreTokenizedInput], List[PreTokenizedInputPair]
],
add_special_tokens: bool = True,
padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
truncation_strategy: TruncationStrategy = TruncationStrategy.DO_NOT_TRUNCATE,
max_length: Optional[int] = None,
stride: int = 0,
is_split_into_words: bool = False,
pad_to_multiple_of: Optional[int] = None,
return_tensors: Optional[str] = None,
return_token_type_ids: Optional[bool] = None,
return_attention_mask: Optional[bool] = None,
return_overflowing_tokens: bool = False,
return_special_tokens_mask: bool = False,
return_offsets_mapping: bool = False,
return_length: bool = False,
verbose: bool = True,
) -> BatchEncoding:
# This tokenizer converts input text pairs into Prefix input and subsequent input
if isinstance(batch_text_or_text_pairs[0], (tuple, list)):
# As a single text with an explicit un-prefix position
batch_prefix_texts = []
for pref, txt in batch_text_or_text_pairs:
batch_prefix_texts.append(pref + self.sep_token + txt)
batch_text_or_text_pairs = batch_prefix_texts
return super()._batch_encode_plus(
batch_text_or_text_pairs,
add_special_tokens,
padding_strategy,
truncation_strategy,
max_length,
stride,
is_split_into_words,
pad_to_multiple_of,
return_tensors,
return_token_type_ids,
return_attention_mask,
return_overflowing_tokens,
return_special_tokens_mask,
return_offsets_mapping,
return_length,
verbose,
)
class SubWordJapaneseTokenizer(object):
"""
This tokenizer is based on GPTNeoXJapaneseTokenizer and has the following modifications
- Decodes byte0~byte255 tokens correctly
- Adds bagofword token handling
See https://github.com/tanreinama/Japanese-BPEEncoder_V2. This tokenizer class is under the MIT License
according to the original repository.
MIT License
Copyright (c) 2020 tanreinama
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of
the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
# Copied from tokenization_gpt_neox_japanese.SubWordJapaneseTokenizer.__init__
def __init__(self, vocab, ids_to_tokens, emoji):
self.vocab = vocab  # same as `swe` in the original repository
self.ids_to_tokens = ids_to_tokens  # same as `bpe` in the original repository
self.emoji = emoji
self.maxlen = np.max([len(w) for w in self.vocab.keys()])
self.content_repatter1 = re.compile(r"(https?|ftp)(:\/\/[-_\.!~*\'()a-zA-Z0-9;\/?:\@&=\+$,%#]+)")
self.content_repatter2 = re.compile(r"[A-Za-z0-9\._+]*@[\-_0-9A-Za-z]+(\.[A-Za-z]+)*")
self.content_repatter3 = re.compile(r"[\(]{0,1}[0-9]{2,4}[\)\-\(]{0,1}[0-9]{2,4}[\)\-]{0,1}[0-9]{3,4}")
self.content_repatter4 = re.compile(
r"([12]\d{3}[/\-年])*(0?[1-9]|1[0-2])[/\-月]((0?[1-9]|[12][0-9]|3[01])日?)*(\d{1,2}|:|\d{1,2}時|\d{1,2}分|\(日\)|\(月\)|\(火\)|\(水\)|\(木\)|\(金\)|\(土\)|㈰|㈪|㈫|㈬|㈭|㈮|㈯)*"
)
self.content_repatter5 = re.compile(
r"(明治|大正|昭和|平成|令和|㍾|㍽|㍼|㍻|\u32ff)\d{1,2}年(0?[1-9]|1[0-2])月(0?[1-9]|[12][0-9]|3[01])日(\d{1,2}|:|\d{1,2}時|\d{1,2}分|\(日\)|\(月\)|\(火\)|\(水\)|\(木\)|\(金\)|\(土\)|㈰|㈪|㈫|㈬|㈭|㈮|㈯)*"
)
self.content_repatter6 = re.compile(
r"((0|[1-9]\d*|[1-9]\d{0,2}(,\d{3})+)*億)*((0|[1-9]\d*|[1-9]\d{0,2}(,\d{3})+)*万)*((0|[1-9]\d*|[1-9]\d{0,2}(,\d{3})+)*千)*(0|[1-9]\d*|[1-9]\d{0,2}(,\d{3})+)*(千円|万円|千万円|円|千ドル|万ドル|千万ドル|ドル|千ユーロ|万ユーロ|千万ユーロ|ユーロ)+(\(税込\)|\(税抜\)|\+tax)*"
)
keisen = "─━│┃┄┅┆┇┈┉┊┋┌┍┎┏┐┑┒┓└┕┖┗┘┙┚┛├┝┞┟┠┡┢┣┤┥┦┧┨┩┪┫┬┭┮┯┰┱┲┳┴┵┶┷┸┹┺┻┼┽┾┿╀╁╂╃╄╅╆╇╈╉╊╋╌╍╎╏═║╒╓╔╕╖╗╘╙╚╛╜╝╞╟╠╡╢╣╤╥╦╧╨╩╪╫╬╭╮╯╰╱╲╳╴╵╶╷╸╹╺╻╼╽╾╿"
blocks = "▀▁▂▃▄▅▆▇█▉▊▋▌▍▎▏▐░▒▓▔▕▖▗▘▙▚▛▜▝▞▟"
self.content_trans1 = str.maketrans({k: "<BLOCK>" for k in keisen + blocks})
# Copied from tokenization_gpt_neox_japanese.SubWordJapaneseTokenizer.__len__
def __len__(self):
return len(self.ids_to_tokens)
# Copied from tokenization_gpt_neox_japanese.SubWordJapaneseTokenizer.clean_text
def clean_text(self, content):
content = self.content_repatter1.sub("<URL>", content)
content = self.content_repatter2.sub("<EMAIL>", content)
content = self.content_repatter3.sub("<TEL>", content)
content = self.content_repatter4.sub("<DATE>", content)
content = self.content_repatter5.sub("<DATE>", content)
content = self.content_repatter6.sub("<PRICE>", content)
content = content.translate(self.content_trans1)
while "<BLOCK><BLOCK>" in content:
content = content.replace("<BLOCK><BLOCK>", "<BLOCK>")
return content
# Copied from tokenization_gpt_neox_japanese.SubWordJapaneseTokenizer.tokenize
def tokenize(self, text, clean=False):
text = text.replace(" ", "<SP>")
text = text.replace(" ", "<SP>")
text = text.replace("\r\n", "<BR>")
text = text.replace("\n", "<BR>")
text = text.replace("\r", "<BR>")
text = text.replace("\t", "<TAB>")
text = text.replace("—", "ー")
text = text.replace("−", "ー")
for k, v in self.emoji["emoji"].items():
if k in text:
text = text.replace(k, v)
if clean:
text = self.clean_text(text)
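# check_simbol matches single characters whose UTF-8 encoding is 2 bytes within a few symbol/diacritic
# ranges (mapped to <KIGOU>); checku2e matches single 3-byte characters in roughly U+2000-U+2BFF
# (mapped to <U2000U2BFF>)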
def check_simbol(x):
e = x.encode()
if len(x) == 1 and len(e) == 2:
c = (int(e[0]) << 8) + int(e[1])
if (
(c >= 0xC2A1 and c <= 0xC2BF)
or (c >= 0xC780 and c <= 0xC783)
or (c >= 0xCAB9 and c <= 0xCBBF)
or (c >= 0xCC80 and c <= 0xCDA2)
):
return True
return False
def checku2e(x):
e = x.encode()
if len(x) == 1 and len(e) == 3:
c = (int(e[0]) << 16) + (int(e[1]) << 8) + int(e[2])
if c >= 0xE28080 and c <= 0xE2B07F:
return True
return False
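# Greedy longest-match tokenization: at each position, try substrings from longest to shortest
# (special <...> tokens may span up to maxlen + 1 characters, plain text up to 3 characters),
# adopt the candidate with the smallest token id, and fall back to byte tokens for unmatched characters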
pos = 0
result = []
while pos < len(text):
end = min(len(text), pos + self.maxlen + 1) if text[pos] == "<" else pos + 3
candidates = [] # (token_id, token, pos)
for e in range(end, pos, -1):
wd = text[pos:e]
if wd in self.vocab:
if wd[0] == "<" and len(wd) > 2:
candidates = [(self.vocab[wd], wd, e)]
break
else:
candidates.append((self.vocab[wd], wd, e))
if len(candidates) > 0:
# the smallest token_id is adopted
_, wd, e = sorted(candidates, key=lambda x: x[0])[0]
result.append(wd)
pos = e
else:
end = pos + 1
wd = text[pos:end]
if check_simbol(wd):
result.append("<KIGOU>")
elif checku2e(wd):
result.append("<U2000U2BFF>")
else:
for i in wd.encode("utf-8"):
result.append("<|byte%d|>" % i)
pos = end
return result
def convert_id_to_token(self, index):
return self.ids_to_tokens[index][0]
......@@ -3085,6 +3085,30 @@ class GPTJPreTrainedModel(metaclass=DummyObject):
requires_backends(self, ["torch"])
GPTSAN_JAPANESE_PRETRAINED_MODEL_ARCHIVE_LIST = None
class GPTSanJapaneseForConditionalGeneration(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class GPTSanJapaneseModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class GPTSanJapanesePreTrainedModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
GRAPHORMER_PRETRAINED_MODEL_ARCHIVE_LIST = None
......
# coding=utf-8
# Copyright 2023 Toshiyuki Sakamoto(tanreinama) and HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
from transformers import (
GPTSanJapaneseConfig,
GPTSanJapaneseForConditionalGeneration,
GPTSanJapaneseModel,
GPTSanJapaneseTokenizer,
is_torch_available,
)
from transformers.generation import GenerationConfig
from transformers.testing_utils import require_torch, slow, tooslow, torch_device
from ...generation.test_utils import GenerationTesterMixin
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, ids_tensor
class GPTSanJapaneseTester:
def __init__(
self,
parent,
vocab_size=36000,
batch_size=13,
num_contexts=7,
# For common tests
is_training=True,
hidden_size=32,
ext_size=42,
num_hidden_layers=5,
num_ext_layers=2,
num_attention_heads=4,
num_experts=2,
d_ff=32,
d_ext=80,
d_spout=33,
dropout_rate=0.0,
layer_norm_epsilon=1e-6,
expert_capacity=100,
router_jitter_noise=0.0,
):
self.vocab_size = vocab_size
self.parent = parent
self.batch_size = batch_size
self.num_contexts = num_contexts
# For common tests
self.seq_length = self.num_contexts
self.is_training = is_training
self.hidden_size = hidden_size
self.num_ext_layers = num_ext_layers
self.ext_size = ext_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.num_experts = num_experts
self.d_ff = d_ff
self.d_ext = d_ext
self.d_spout = d_spout
self.dropout_rate = dropout_rate
self.layer_norm_epsilon = layer_norm_epsilon
self.expert_capacity = expert_capacity
self.router_jitter_noise = router_jitter_noise
def get_large_model_config(self):
return GPTSanJapaneseConfig.from_pretrained("Tanrei/GPTSAN-japanese")
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
config = self.get_config()
return (config, input_ids)
def prepare_config_and_inputs_for_common(self):
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
config = self.get_config()
return (config, {"input_ids": input_ids})
def get_config(self):
return GPTSanJapaneseConfig(
vocab_size=36000,
num_contexts=self.seq_length,
d_model=self.hidden_size,
d_ff=self.d_ff,
d_ext=self.d_ext,
d_spout=self.d_spout,
num_switch_layers=self.num_hidden_layers - self.num_ext_layers,
num_ext_layers=self.num_ext_layers,
num_heads=self.num_attention_heads,
num_experts=self.num_experts,
expert_capacity=self.expert_capacity,
dropout_rate=self.dropout_rate,
layer_norm_epsilon=self.layer_norm_epsilon,
router_jitter_noise=self.router_jitter_noise,
)
def create_and_check_model(
self,
config,
input_ids,
):
model = GPTSanJapaneseForConditionalGeneration(config=config)
model.to(torch_device)
model.eval()
result = model(
input_ids=input_ids,
)
self.parent.assertIsNotNone(result)
@require_torch
class GPTSanJapaneseTest(ModelTesterMixin, unittest.TestCase):
all_model_classes = (GPTSanJapaneseModel,) if is_torch_available() else ()
fx_compatible = False
is_encoder_decoder = False
test_pruning = False
test_headmasking = False
test_cpu_offload = False
test_disk_offload = False
test_save_load_fast_init_to_base = False
test_training = False
# The small GPTSAN_JAPANESE model needs higher percentages for CPU/MP tests
model_split_percents = [0.8, 0.9]
def setUp(self):
self.model_tester = GPTSanJapaneseTester(self)
self.config_tester = ConfigTester(self, config_class=GPTSanJapaneseConfig, d_model=37)
def test_config(self):
GPTSanJapaneseConfig()
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
@require_torch
class GPTSanJapaneseForConditionalGenerationTest(ModelTesterMixin, GenerationTesterMixin, unittest.TestCase):
all_model_classes = (GPTSanJapaneseForConditionalGeneration,) if is_torch_available() else ()
fx_compatible = False
is_encoder_decoder = False
test_pruning = False
test_headmasking = False
test_cpu_offload = False
test_disk_offload = False
# The small GPTSAN_JAPANESE model needs higher percentages for CPU/MP tests
model_split_percents = [0.8, 0.9]
def setUp(self):
self.model_tester = GPTSanJapaneseTester(self)
self.config_tester = ConfigTester(self, config_class=GPTSanJapaneseConfig, d_model=37)
def test_config(self):
GPTSanJapaneseConfig()
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
@slow
def test_logits(self):
model = GPTSanJapaneseForConditionalGeneration.from_pretrained("Tanrei/GPTSAN-japanese")
tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
input_ids = tokenizer.encode("武田信玄は", return_tensors="pt")
outputs = model(input_ids)
output_logits = outputs.logits.detach().cpu().numpy()
# Output of the original model created with mesh-tensorflow
target = [
# fmt: off
[-12.037839889526367, -12.433061599731445, -14.333840370178223, -12.450345993041992, -11.1661376953125,
-11.930137634277344, -10.659740447998047, -12.909574508666992, -13.241043090820312, -13.398579597473145,
-11.107524871826172, -12.3685941696167, -22.97943115234375, -10.481067657470703, -12.484030723571777,
-12.807360649108887, -14.769700050354004, -12.233579635620117, -13.428145408630371, -22.624177932739258],
[-7.511149883270264, -8.281851768493652, -7.943127155303955, -7.55021333694458, -6.49869966506958,
-7.586796283721924, -6.978085994720459, -7.839145183563232, -8.21964168548584, -8.695091247558594,
-6.706910610198975, -6.6585798263549805, -19.565698623657227, -5.353842735290527, -8.350686073303223,
-8.039388656616211, -10.856569290161133, -7.75154447555542, -8.819022178649902, -19.51532745361328],
[-9.73066234588623, -10.223922729492188, -9.932981491088867, -11.857836723327637, -7.662626266479492,
-11.13529109954834, -7.765097618103027, -11.472923278808594, -9.543149948120117, -11.905633926391602,
-9.366164207458496, -11.5734281539917, -23.699003219604492, -9.429590225219727, -10.42839241027832,
-10.585240364074707, -10.94771957397461, -11.095416069030762, -10.390240669250488, -23.769372940063477],
[-9.728265762329102, -9.859712600708008, -10.09729290008545, -9.678522109985352, -6.879519939422607,
-9.68487548828125, -4.2803425788879395, -10.018914222717285, -9.308445930480957, -10.63394546508789,
-8.083646774291992, -9.06301498413086, -21.904266357421875, -8.90160846710205, -8.841876029968262,
-11.856719970703125, -12.079398155212402, -11.233753204345703, -10.177338600158691, -21.87256622314453],
[-9.669764518737793, -9.614198684692383, -9.814510345458984, -9.996501922607422, -11.375690460205078,
-10.113405227661133, -10.546867370605469, -10.04369068145752, -10.907809257507324, -10.504216194152832,
-11.129199028015137, -10.151124000549316, -21.96586799621582, -9.086349487304688, -11.730339050292969,
-10.460667610168457, -10.298049926757812, -10.784148216247559, -10.840693473815918, -22.03152847290039],
# fmt: on
]
target = np.array(target).flatten()
predict = output_logits[0, :, :20].flatten()
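# relative-tolerance comparison against the reference logits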
def check(a, b, epsilon=5e-4):
return abs(a - b) < epsilon * max(abs(a), abs(b))
self.assertTrue(np.all([check(target[i], predict[i]) for i in range(len(target))]))
@slow
def test_batch_generation(self):
model = GPTSanJapaneseForConditionalGeneration.from_pretrained("Tanrei/GPTSAN-japanese")
tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
model.to(torch_device)
# make generation deterministic (top_k=1)
generation_config = GenerationConfig.from_pretrained("Tanrei/GPTSAN-japanese")
generation_config.top_k = 1
# use different length sentences to test batching
sentences = [
"甲斐なら武田と言うほど",
"織田信長は、",
]
tokenizer.padding_side = "left"
inputs = tokenizer(sentences, return_tensors="pt", padding=True)
input_ids = inputs["input_ids"].to(torch_device)
self.assertNotEqual(inputs["attention_mask"][0].numpy().tolist(), inputs["attention_mask"][1].numpy().tolist())
outputs = model.generate(
input_ids=input_ids,
attention_mask=inputs["attention_mask"].to(torch_device),
max_new_tokens=3,
generation_config=generation_config,
)
inputs_non_padded = tokenizer(sentences[0], return_tensors="pt").input_ids.to(torch_device)
output_non_padded = model.generate(
input_ids=inputs_non_padded, max_new_tokens=3, generation_config=generation_config
)
inputs_padded = tokenizer(sentences[1], return_tensors="pt").input_ids.to(torch_device)
output_padded = model.generate(input_ids=inputs_padded, max_new_tokens=3, generation_config=generation_config)
self.assertNotEqual(inputs_non_padded.shape, inputs_padded.shape)
batch_out_sentence = tokenizer.batch_decode(outputs, skip_special_tokens=True)
non_padded_sentence = tokenizer.decode(output_non_padded[0], skip_special_tokens=True)
padded_sentence = tokenizer.decode(output_padded[0], skip_special_tokens=True)
expected_output_sentence = [
"甲斐なら武田と言うほど甲斐の武田",
"織田信長は、このような",
]
self.assertListEqual(expected_output_sentence, batch_out_sentence)
self.assertListEqual(batch_out_sentence, [non_padded_sentence, padded_sentence])
@tooslow
def test_sample(self):
model = GPTSanJapaneseForConditionalGeneration.from_pretrained("Tanrei/GPTSAN-japanese")
tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
# Output of the original model created with mesh-tensorflow
target = [
("武田信玄は", 35675),
("武田信玄は、", 45),
("武田信玄は、この", 29),
("武田信玄は、このよう", 30642),
("武田信玄は、このような", 35680),
("武田信玄は、このような「", 8640),
("武田信玄は、このような「武田", 31617),
("武田信玄は、このような「武田家", 30646),
("武田信玄は、このような「武田家の", 31617),
("武田信玄は、このような「武田家の家", 31381),
]
for input, output in target:
input_ids = tokenizer.encode(input, return_tensors="pt")
outputs = model(input_ids)
output_logits = outputs.logits.detach().cpu().numpy()[0]
output_id = np.argmax(output_logits[-1])
self.assertEqual(output_id, output)
@slow
def test_spout_generation(self):
model = GPTSanJapaneseForConditionalGeneration.from_pretrained("Tanrei/GPTSAN-japanese")
tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
model.to(torch_device)
# make generation deterministic (top_k=1)
generation_config = GenerationConfig.from_pretrained("Tanrei/GPTSAN-japanese")
generation_config.top_k = 1
input_text = "武田信玄は、"
input_ids = tokenizer(input_text, return_tensors="pt").input_ids.to(torch_device)
input_ids_batch = tokenizer([input_text, input_text], return_tensors="pt").input_ids.to(torch_device)
# spout from uniform and one-hot
spouts = [
# fmt: off
[0.87882208, 0.38426396, 0.33220248, 0.43890406, 0.16562252,
0.04803985, 0.211572 , 0.23188473, 0.37153068, 0.7836377 ,
0.02160172, 0.38761719, 0.75290772, 0.90198857, 0.34365777,
0.64168169, 0.44318471, 0.14575746, 0.92562881, 0.40812148,
0.29019122, 0.88861599, 0.65524846, 0.43563456, 0.38177187,
0.70832965, 0.81527892, 0.68832812, 0.38833192, 0.4561522 ,
0.14828817, 0.47248213, 0.54357335, 0.82009566, 0.1338884 ,
0.02755417, 0.19764677, 0.2422084 , 0.04757674, 0.65409606,
0.0824589 , 0.03304383, 0.94387689, 0.98764509, 0.82433901,
0.27646741, 0.64907493, 0.76009406, 0.30087915, 0.17904689,
0.41601714, 0.67046398, 0.10422822, 0.08447374, 0.07354344,
0.61423565, 0.70284866, 0.7532333 , 0.1972038 , 0.29575659,
0.90583886, 0.29265307, 0.50000175, 0.70407655, 0.889363 ,
0.81904418, 0.66829128, 0.64468815, 0.56563723, 0.85601875,
0.94924672, 0.00166762, 0.25220643, 0.74540219, 0.67993247,
0.1549675 , 0.39385352, 0.92153607, 0.63745931, 0.27759043,
0.84702295, 0.65904271, 0.58676614, 0.8666936 , 0.39607438,
0.79954983, 0.42220697, 0.39650381, 0.7849864 , 0.56150201,
0.15678925, 0.14746032, 0.34542114, 0.47026783, 0.11956489,
0.25421435, 0.33788901, 0.68934842, 0.36424685, 0.71737898,
0.38983449, 0.94393779, 0.39575588, 0.36616553, 0.87104665,
0.64630203, 0.22516905, 0.88270804, 0.15031338, 0.75144345,
0.46459025, 0.85396454, 0.86355643, 0.65139851, 0.70266061,
0.30241389, 0.81056497, 0.88865969, 0.38773807, 0.70635849,
0.90718459, 0.43245789, 0.28000654, 0.45935562, 0.08773519,
0.9552151 , 0.93901511, 0.22489288], # uniform
[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0.],
# fmt: on
]
output1 = model.generate(
input_ids=input_ids,
spout=spouts[0],
max_new_tokens=20,
generation_config=generation_config,
)
output2 = model.generate(
input_ids=input_ids,
spout=spouts[1],
max_new_tokens=20,
generation_config=generation_config,
)
output3 = model.generate(
input_ids=input_ids_batch,
spout=spouts,
max_new_tokens=20,
generation_config=generation_config,
)
out1_sentence = tokenizer.decode(output1[0])
out2_sentence = tokenizer.decode(output2[0])
batch_out_sentence = tokenizer.batch_decode(output3)
expected_output_sentence = [
"武田信玄は、武田氏の滅亡後、武田氏の居城であった甲斐武田氏の居城である",
"武田信玄は、武田家の滅亡を防ぐため、武田家の家臣である武田信虎を討",
]
self.assertListEqual(expected_output_sentence, batch_out_sentence)
self.assertListEqual(batch_out_sentence, [out1_sentence, out2_sentence])
@slow
def test_prefix_lm_generation(self):
model = GPTSanJapaneseForConditionalGeneration.from_pretrained("Tanrei/GPTSAN-japanese")
tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
model.to(torch_device)
# make generation deterministic (top_k=1)
generation_config = GenerationConfig.from_pretrained("Tanrei/GPTSAN-japanese")
generation_config.top_k = 1
prefix_text_1 = "武田信玄"
prefix_text_2 = "織田信長"
input_text_1 = "は、"
input_text_2 = "が、"
input_tok_1 = tokenizer(input_text_1, prefix_text=prefix_text_1, return_tensors="pt")
input_tok_2 = tokenizer(input_text_2, prefix_text=prefix_text_2, return_tensors="pt")
input_tok_3 = tokenizer([[prefix_text_1, input_text_1], [prefix_text_2, input_text_2]], return_tensors="pt")
output1 = model.generate(
input_ids=input_tok_1.input_ids.to(torch_device),
token_type_ids=input_tok_1.token_type_ids.to(torch_device),
max_new_tokens=20,
generation_config=generation_config,
)
output2 = model.generate(
input_ids=input_tok_2.input_ids.to(torch_device),
token_type_ids=input_tok_2.token_type_ids.to(torch_device),
max_new_tokens=20,
generation_config=generation_config,
)
output3 = model.generate(
input_ids=input_tok_3.input_ids.to(torch_device),
token_type_ids=input_tok_3.token_type_ids.to(torch_device),
attention_mask=input_tok_3.attention_mask.to(torch_device),
max_new_tokens=20,
generation_config=generation_config,
)
out1_sentence = tokenizer.decode(output1[0])
out2_sentence = tokenizer.decode(output2[0])
batch_out_sentence = tokenizer.batch_decode(output3)
expected_output_sentence = [
"武田信玄は、武田氏の祖である武田信虎を、その子・武田信友を擁して",
"織田信長が、織田信長の妻・お市の方を妻として迎えたという逸話が残",
]
self.assertListEqual(expected_output_sentence, batch_out_sentence)
self.assertListEqual(batch_out_sentence, [out1_sentence, out2_sentence])
# coding=utf-8
# Copyright 2023 Toshiyuki Sakamoto(tanreinama) and HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import unittest
from transformers.models.gptsan_japanese.tokenization_gptsan_japanese import (
VOCAB_FILES_NAMES,
GPTSanJapaneseTokenizer,
)
from transformers.testing_utils import require_tokenizers, slow
from ...test_tokenization_common import TokenizerTesterMixin
@require_tokenizers
class GPTSanJapaneseTokenizationTest(TokenizerTesterMixin, unittest.TestCase):
tokenizer_class = GPTSanJapaneseTokenizer
test_rust_tokenizer = False
from_pretrained_kwargs = {"do_clean_text": False, "add_prefix_space": False}
def setUp(self):
super().setUp()
# fmt: off
vocab_tokens = ["こん", "こんに", "にちは", "ばんは", "世界,㔺界", "、", "。", "<BR>", "<SP>", "<TAB>", "<URL>", "<EMAIL>", "<TEL>", "<DATE>", "<PRICE>", "<BLOCK>", "<KIGOU>", "<U2000U2BFF>", "<|emoji1|>", "<unk>", "<|bagoftoken|>", "<|endoftext|>"]
# fmt: on
emoji_tokens = {"emoji": {"\ud83d\ude00": "<|emoji1|>"}, "emoji_inv": {"<|emoji1|>": "\ud83d\ude00"}} # 😀
self.special_tokens_map = {"unk_token": "<unk>"}
self.vocab_file = os.path.join(self.tmpdirname, VOCAB_FILES_NAMES["vocab_file"])
self.emoji_file = os.path.join(self.tmpdirname, VOCAB_FILES_NAMES["emoji_file"])
with open(self.vocab_file, "w", encoding="utf-8") as vocab_writer:
vocab_writer.write("".join([x + "\n" for x in vocab_tokens]))
with open(self.emoji_file, "w") as emoji_writer:
emoji_writer.write(json.dumps(emoji_tokens))
def get_tokenizer(self, **kwargs):
kwargs.update(self.special_tokens_map)
return GPTSanJapaneseTokenizer.from_pretrained(self.tmpdirname, **kwargs)
# Copied from tests.models.gpt_neox_japanese.test_tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizationTest.get_input_output_texts
def get_input_output_texts(self, tokenizer):
input_text = "こんにちは、世界。 \nこんばんは、㔺界。😀"
output_text = "こんにちは、世界。 \nこんばんは、世界。😀"
return input_text, output_text
# Copied from tests.models.gpt_neox_japanese.test_tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizationTest.get_clean_sequence
def get_clean_sequence(self, tokenizer):
input_text, output_text = self.get_input_output_texts(tokenizer)
ids = tokenizer.encode(output_text, add_special_tokens=False)
text = tokenizer.decode(ids, clean_up_tokenization_spaces=False)
return text, ids
# Copied from tests.models.gpt_neox_japanese.test_tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizationTest.test_pretokenized_inputs
def test_pretokenized_inputs(self):
pass # TODO add if relevant
# Copied from tests.models.gpt_neox_japanese.test_tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizationTest.test_maximum_encoding_length_pair_input
def test_maximum_encoding_length_pair_input(self):
pass # TODO add if relevant
# Copied from tests.models.gpt_neox_japanese.test_tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizationTest.test_maximum_encoding_length_single_input
def test_maximum_encoding_length_single_input(self):
pass # TODO add if relevant
# Copied from tests.models.gpt_neox_japanese.test_tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizationTest.test_full_tokenizer
def test_full_tokenizer(self):
tokenizer = self.get_tokenizer()
# Testing tokenization
input_text = "こんにちは、世界。 こんばんは、㔺界。"
expected_token = ["こん", "にちは", "、", "世界", "。", "<SP>", "こん", "ばんは", "、", "㔺界", "。"]
tokens = tokenizer.tokenize(input_text)
self.assertListEqual(tokens, expected_token)
# Testing conversion to ids without special tokens
expected_ids = [0, 2, 5, 4, 6, 8, 0, 3, 5, 4, 6]
input_ids = tokenizer.convert_tokens_to_ids(tokens)
self.assertListEqual(input_ids, expected_ids)
# Testing conversion to ids with special tokens
input_tokens = tokens + [tokenizer.unk_token]
expected_ids = [0, 2, 5, 4, 6, 8, 0, 3, 5, 4, 6, 19]
input_ids = tokenizer.convert_tokens_to_ids(input_tokens)
self.assertListEqual(input_ids, expected_ids)
def test_token_bagging(self):
tokenizer = self.get_tokenizer()
# Testing tokenization
input_text = "こんにちは、<|bagoftoken|>世界。こんばんは、<|bagoftoken|>㔺界。"
expected_text = "こんにちは、、、、世界。こんばんは、、、、世界。"
tokens = tokenizer.encode(input_text)
output_text = tokenizer.decode(tokens)
self.assertEqual(output_text, expected_text)
@slow
def test_prefix_input(self):
tokenizer = self.tokenizer_class.from_pretrained("Tanrei/GPTSAN-japanese")
# Testing tokenization
prefix_text = "こんにちは、世界。"
input_text = "こんばんは、㔺界。😀"
expected_text = "こんにちは、世界。こんばんは、世界。😀"
tokens_1 = tokenizer.encode(prefix_text + input_text)
tokens_2 = tokenizer.encode("", prefix_text=prefix_text + input_text)
tokens_3 = tokenizer.encode(input_text, prefix_text=prefix_text)
output_text_1 = tokenizer.decode(tokens_1)
output_text_2 = tokenizer.decode(tokens_2)
output_text_3 = tokenizer.decode(tokens_3)
self.assertEqual(output_text_1, expected_text)
self.assertEqual(output_text_2, expected_text)
self.assertEqual(output_text_3, expected_text)
@slow
def test_token_type_ids(self):
tokenizer = self.tokenizer_class.from_pretrained("Tanrei/GPTSAN-japanese")
# Testing tokenization
prefix_text = "こんにちは、世界。"
input_text = "こんばんは、㔺界。😀"
len_prefix = len(tokenizer.encode(prefix_text)) - 2
len_text = len(tokenizer.encode(input_text)) - 2
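        # encode() adds two special tokens (the leading start token and the SEG separator), hence the -2.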
expected_mask_1 = [1] + [0] * (len_prefix + len_text + 1)
expected_mask_2 = [1] * (len_prefix + len_text + 1) + [0]
expected_mask_3 = [1] + [1] * (len_prefix) + [0] * (len_text + 1)
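        # token_type_ids mark the prefix span (start token included) with 1; the SEG separator and the body are 0.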
type_id_1 = tokenizer(prefix_text + input_text).token_type_ids
type_id_2 = tokenizer("", prefix_text=prefix_text + input_text).token_type_ids
type_id_3 = tokenizer(input_text, prefix_text=prefix_text).token_type_ids
self.assertListEqual(type_id_1, expected_mask_1)
self.assertListEqual(type_id_2, expected_mask_2)
self.assertListEqual(type_id_3, expected_mask_3)
@slow
def test_prefix_tokens(self):
tokenizer = self.tokenizer_class.from_pretrained("Tanrei/GPTSAN-japanese")
x_token_1 = tokenizer.encode("あンいワ")
x_token_2 = tokenizer.encode("", prefix_text="あンいワ")
x_token_3 = tokenizer.encode("いワ", prefix_text="あン")
self.assertEqual(tokenizer.decode(x_token_1), tokenizer.decode(x_token_2))
self.assertEqual(tokenizer.decode(x_token_1), tokenizer.decode(x_token_3))
self.assertNotEqual(x_token_1, x_token_2)
self.assertNotEqual(x_token_1, x_token_3)
self.assertEqual(x_token_1[1], x_token_2[-1]) # SEG token
self.assertEqual(x_token_1[1], x_token_3[3]) # SEG token
@slow
def test_batch_encode(self):
tokenizer = self.tokenizer_class.from_pretrained("Tanrei/GPTSAN-japanese")
input_pairs = [["武田信玄", "は、"], ["織田信長", "の配下の、"]]
x_token = tokenizer(input_pairs, padding=True)
x_token_2 = tokenizer.batch_encode_plus(input_pairs, padding=True)
# fmt: off
expected_outputs = [[35993, 8640, 25948, 35998, 30647, 35675, 35999, 35999], [35993, 10382, 9868, 35998, 30646, 9459, 30646, 35675]]
expected_typeids = [[1, 1, 1, 0, 0, 0, 0, 0], [1, 1, 1, 0, 0, 0, 0, 0]]
expected_attmask = [[1, 1, 1, 1, 1, 1, 0, 0], [1, 1, 1, 1, 1, 1, 1, 1]]
# fmt: on
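        # From the ids above, 35993 appears to be the start token, 35998 the SEG separator, and 35999 the padding id used to right-pad the shorter pair.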
self.assertListEqual(x_token.input_ids, expected_outputs)
self.assertListEqual(x_token.token_type_ids, expected_typeids)
self.assertListEqual(x_token.attention_mask, expected_attmask)
self.assertListEqual(x_token_2.input_ids, expected_outputs)
self.assertListEqual(x_token_2.token_type_ids, expected_typeids)
self.assertListEqual(x_token_2.attention_mask, expected_attmask)
# Copied from tests.models.gpt_neox_japanese.test_tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizationTest.test_conversion_reversible
def test_conversion_reversible(self):
        # Intentionally skipped: the tokenizer canonicalizes Japanese character variants (e.g. 㔺 -> 世), so conversion is not reversible.
pass
# Copied from tests.models.gpt_neox_japanese.test_tokenization_gpt_neox_japanese.GPTNeoXJapaneseTokenizationTest.test_padding_different_model_input_name
def test_padding_different_model_input_name(self):
# tokenizer has no padding token
pass
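# A minimal illustrative sketch (not part of the test suite; assumes the
# Tanrei/GPTSAN-japanese checkpoint is reachable) of the prefix/body split the
# tests above exercise:
#
#     tokenizer = GPTSanJapaneseTokenizer.from_pretrained("Tanrei/GPTSAN-japanese")
#     enc = tokenizer("こんばんは、", prefix_text="こんにちは、")
#     enc.token_type_ids  # 1 over the prefix span, 0 over the SEG token and body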
......@@ -198,6 +198,7 @@ IGNORE_NON_AUTO_CONFIGURED = PRIVATE_MODELS.copy() + [
"CLIPSegVisionModel",
"CLIPSegTextModel",
"EsmForProteinFolding",
"GPTSanJapaneseModel",
"TimeSeriesTransformerForPrediction",
"JukeboxVQVAE",
"JukeboxPrior",
......
......@@ -92,6 +92,7 @@ src/transformers/models/glpn/modeling_glpn.py
src/transformers/models/gpt2/configuration_gpt2.py
src/transformers/models/gpt2/modeling_gpt2.py
src/transformers/models/gptj/modeling_gptj.py
src/transformers/models/gptsan_japanese/modeling_gptsan_japanese.py
src/transformers/models/gpt_neo/configuration_gpt_neo.py
src/transformers/models/gpt_neox/configuration_gpt_neox.py
src/transformers/models/gpt_neox_japanese/configuration_gpt_neox_japanese.py
......