"docs/vscode:/vscode.git/clone" did not exist on "e330961da5d632c2133daa711d87ede73f9b54db"
Commit 24db6dab authored by Rayyyyy's avatar Rayyyyy
Browse files

first add

parents
Pipeline #850 failed with stages
in 0 seconds
import torch
from torch import nn, Tensor
from typing import Iterable, Dict, Callable
from ..SentenceTransformer import SentenceTransformer
import logging
logger = logging.getLogger(__name__)
class SoftmaxLoss(nn.Module):
def __init__(
self,
model: SentenceTransformer,
sentence_embedding_dimension: int,
num_labels: int,
concatenation_sent_rep: bool = True,
concatenation_sent_difference: bool = True,
concatenation_sent_multiplication: bool = False,
loss_fct: Callable = nn.CrossEntropyLoss(),
):
"""
This loss was used in our SBERT publication (https://arxiv.org/abs/1908.10084) to train the SentenceTransformer
model on NLI data. It adds a softmax classifier on top of the output of two transformer networks.
:class:`MultipleNegativesRankingLoss` is an alternative loss function that often yields better results,
as per https://arxiv.org/abs/2004.09813.
:param model: SentenceTransformer model
:param sentence_embedding_dimension: Dimension of your sentence embeddings
:param num_labels: Number of different labels
:param concatenation_sent_rep: Concatenate vectors u,v for the softmax classifier?
:param concatenation_sent_difference: Add abs(u-v) for the softmax classifier?
:param concatenation_sent_multiplication: Add u*v for the softmax classifier?
:param loss_fct: Optional: Custom pytorch loss function. If not set, uses nn.CrossEntropyLoss()
References:
- Sentence-BERT: Sentence Embeddings using Siamese BERT-Networks: https://arxiv.org/abs/1908.10084
- `Training Examples > Natural Language Inference <../../examples/training/nli/README.html>`_
Requirements:
1. sentence pairs with a class label
Inputs:
+---------------------------------------+--------+
| Texts | Labels |
+=======================================+========+
| (sentence_A, sentence_B) pairs | class |
+---------------------------------------+--------+
Example:
::
from sentence_transformers import SentenceTransformer, SentencesDataset, losses
from sentence_transformers.readers import InputExample
from torch.utils.data import DataLoader
model = SentenceTransformer('distilbert-base-nli-mean-tokens')
train_examples = [
InputExample(texts=['First pair, sent A', 'First pair, sent B'], label=0),
InputExample(texts=['Second pair, sent A', 'Second pair, sent B'], label=1),
InputExample(texts=['Third pair, sent A', 'Third pair, sent B'], label=0),
InputExample(texts=['Fourth pair, sent A', 'Fourth pair, sent B'], label=2),
]
train_batch_size = 2
train_dataset = SentencesDataset(train_examples, model)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=train_batch_size)
train_loss = losses.SoftmaxLoss(
model=model,
sentence_embedding_dimension=model.get_sentence_embedding_dimension(),
num_labels=len(set(x.label for x in train_examples))
)
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super(SoftmaxLoss, self).__init__()
self.model = model
self.num_labels = num_labels
self.concatenation_sent_rep = concatenation_sent_rep
self.concatenation_sent_difference = concatenation_sent_difference
self.concatenation_sent_multiplication = concatenation_sent_multiplication
num_vectors_concatenated = 0
if concatenation_sent_rep:
num_vectors_concatenated += 2
if concatenation_sent_difference:
num_vectors_concatenated += 1
if concatenation_sent_multiplication:
num_vectors_concatenated += 1
logger.info("Softmax loss: #Vectors concatenated: {}".format(num_vectors_concatenated))
self.classifier = nn.Linear(
num_vectors_concatenated * sentence_embedding_dimension, num_labels, device=model.device
)
self.loss_fct = loss_fct
def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
reps = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
rep_a, rep_b = reps
vectors_concat = []
if self.concatenation_sent_rep:
vectors_concat.append(rep_a)
vectors_concat.append(rep_b)
if self.concatenation_sent_difference:
vectors_concat.append(torch.abs(rep_a - rep_b))
if self.concatenation_sent_multiplication:
vectors_concat.append(rep_a * rep_b)
features = torch.cat(vectors_concat, 1)
output = self.classifier(features)
if labels is not None:
loss = self.loss_fct(output, labels.view(-1))
return loss
else:
return reps, output
from torch import nn, Tensor
from typing import Iterable, Dict
import torch.nn.functional as F
from enum import Enum
from ..SentenceTransformer import SentenceTransformer
class TripletDistanceMetric(Enum):
"""
The metric for the triplet loss
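A minimal usage sketch (assuming `model` is an already-loaded SentenceTransformer; the margin value is illustrative)::
from sentence_transformers import losses
train_loss = losses.TripletLoss(model=model, distance_metric=losses.TripletDistanceMetric.COSINE, triplet_margin=0.5)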
"""
COSINE = lambda x, y: 1 - F.cosine_similarity(x, y)
EUCLIDEAN = lambda x, y: F.pairwise_distance(x, y, p=2)
MANHATTAN = lambda x, y: F.pairwise_distance(x, y, p=1)
class TripletLoss(nn.Module):
def __init__(
self, model: SentenceTransformer, distance_metric=TripletDistanceMetric.EUCLIDEAN, triplet_margin: float = 5
):
"""
This class implements triplet loss. Given a triplet of (anchor, positive, negative),
the loss minimizes the distance between anchor and positive while it maximizes the distance
between anchor and negative. It computes the following loss function:
``loss = max(||anchor - positive|| - ||anchor - negative|| + margin, 0)``.
The margin is an important hyperparameter and needs to be tuned for the task at hand.
:param model: SentenceTransformerModel
:param distance_metric: Function to compute distance between two embeddings. The class TripletDistanceMetric
contains common distance metrics that can be used.
:param triplet_margin: The negative should be at least this much further away from the anchor than the positive.
References:
- For further details, see: https://en.wikipedia.org/wiki/Triplet_loss
Requirements:
1. (anchor, positive, negative) triplets
Inputs:
+---------------------------------------+--------+
| Texts | Labels |
+=======================================+========+
| (anchor, positive, negative) triplets | none |
+---------------------------------------+--------+
Example:
::
from sentence_transformers import SentenceTransformer, SentencesDataset, losses
from sentence_transformers.readers import InputExample
from torch.utils.data import DataLoader
model = SentenceTransformer('distilbert-base-nli-mean-tokens')
train_examples = [
InputExample(texts=['Anchor 1', 'Positive 1', 'Negative 1']),
InputExample(texts=['Anchor 2', 'Positive 2', 'Negative 2']),
]
train_batch_size = 1
train_dataset = SentencesDataset(train_examples, model)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=train_batch_size)
train_loss = losses.TripletLoss(model=model)
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super(TripletLoss, self).__init__()
self.model = model
self.distance_metric = distance_metric
self.triplet_margin = triplet_margin
def get_config_dict(self):
distance_metric_name = self.distance_metric.__name__
for name, value in vars(TripletDistanceMetric).items():
if value == self.distance_metric:
distance_metric_name = "TripletDistanceMetric.{}".format(name)
break
return {"distance_metric": distance_metric_name, "triplet_margin": self.triplet_margin}
def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
reps = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
rep_anchor, rep_pos, rep_neg = reps
distance_pos = self.distance_metric(rep_anchor, rep_pos)
distance_neg = self.distance_metric(rep_anchor, rep_neg)
losses = F.relu(distance_pos - distance_neg + self.triplet_margin)
return losses.mean()
from .AdaptiveLayerLoss import AdaptiveLayerLoss
from .CosineSimilarityLoss import CosineSimilarityLoss
from .SoftmaxLoss import SoftmaxLoss
from .MultipleNegativesRankingLoss import MultipleNegativesRankingLoss
from .MultipleNegativesSymmetricRankingLoss import MultipleNegativesSymmetricRankingLoss
from .TripletLoss import TripletDistanceMetric, TripletLoss
from .MarginMSELoss import MarginMSELoss
from .MatryoshkaLoss import MatryoshkaLoss
from .Matryoshka2dLoss import Matryoshka2dLoss
from .MSELoss import MSELoss
from .CachedMultipleNegativesRankingLoss import CachedMultipleNegativesRankingLoss
from .ContrastiveLoss import SiameseDistanceMetric, ContrastiveLoss
from .ContrastiveTensionLoss import (
ContrastiveTensionLoss,
ContrastiveTensionLossInBatchNegatives,
ContrastiveTensionDataLoader,
)
from .CoSENTLoss import CoSENTLoss
from .AnglELoss import AnglELoss
from .OnlineContrastiveLoss import OnlineContrastiveLoss
from .MegaBatchMarginLoss import MegaBatchMarginLoss
from .DenoisingAutoEncoderLoss import DenoisingAutoEncoderLoss
from .GISTEmbedLoss import GISTEmbedLoss
# Triplet losses
from .BatchHardTripletLoss import BatchHardTripletLoss, BatchHardTripletLossDistanceFunction
from .BatchHardSoftMarginTripletLoss import BatchHardSoftMarginTripletLoss
from .BatchSemiHardTripletLoss import BatchSemiHardTripletLoss
from .BatchAllTripletLoss import BatchAllTripletLoss
__all__ = [
"AdaptiveLayerLoss",
"CosineSimilarityLoss",
"SoftmaxLoss",
"MultipleNegativesRankingLoss",
"MultipleNegativesSymmetricRankingLoss",
"TripletLoss",
"TripletDistanceMetric",
"MarginMSELoss",
"MatryoshkaLoss",
"Matryoshka2dLoss",
"MSELoss",
"ContrastiveLoss",
"SiameseDistanceMetric",
"CachedMultipleNegativesRankingLoss",
"ContrastiveTensionLoss",
"ContrastiveTensionLossInBatchNegatives",
"ContrastiveTensionDataLoader",
"CoSENTLoss",
"AnglELoss",
"OnlineContrastiveLoss",
"MegaBatchMarginLoss",
"DenoisingAutoEncoderLoss",
"GISTEmbedLoss",
"BatchHardTripletLoss",
"BatchHardTripletLossDistanceFunction",
"BatchHardSoftMarginTripletLoss",
"BatchSemiHardTripletLoss",
"BatchAllTripletLoss",
]
import logging
from .util import fullname
class ModelCardTemplate:
__TAGS__ = ["sentence-transformers", "feature-extraction", "sentence-similarity"]
__DEFAULT_VARS__ = {
"{PIPELINE_TAG}": "sentence-similarity",
"{MODEL_DESCRIPTION}": "<!--- Describe your model here -->",
"{TRAINING_SECTION}": "",
"{USAGE_TRANSFORMERS_SECTION}": "",
"{EVALUATION}": "<!--- Describe how your model was evaluated -->",
"{CITING}": "<!--- Describe where people can find more information -->",
}
__MODEL_CARD__ = """
---
library_name: sentence-transformers
pipeline_tag: {PIPELINE_TAG}
tags:
{TAGS}
{DATASETS}
---
# {MODEL_NAME}
This is a [sentence-transformers](https://www.SBERT.net) model: It maps sentences & paragraphs to a {NUM_DIMENSIONS} dimensional dense vector space and can be used for tasks like clustering or semantic search.
{MODEL_DESCRIPTION}
## Usage (Sentence-Transformers)
Using this model becomes easy when you have [sentence-transformers](https://www.SBERT.net) installed:
```
pip install -U sentence-transformers
```
Then you can use the model like this:
```python
from sentence_transformers import SentenceTransformer
sentences = ["This is an example sentence", "Each sentence is converted"]
model = SentenceTransformer('{MODEL_NAME}')
embeddings = model.encode(sentences)
print(embeddings)
```
{USAGE_TRANSFORMERS_SECTION}
## Evaluation Results
{EVALUATION}
For an automated evaluation of this model, see the *Sentence Embeddings Benchmark*: [https://seb.sbert.net](https://seb.sbert.net?model_name={MODEL_NAME})
{TRAINING_SECTION}
## Full Model Architecture
```
{FULL_MODEL_STR}
```
## Citing & Authors
{CITING}
"""
__TRAINING_SECTION__ = """
## Training
The model was trained with the parameters:
{LOSS_FUNCTIONS}
Parameters of the fit()-Method:
```
{FIT_PARAMETERS}
```
"""
__USAGE_TRANSFORMERS__ = """\n
## Usage (HuggingFace Transformers)
Without [sentence-transformers](https://www.SBERT.net), you can use the model like this: First, you pass your input through the transformer model, then you have to apply the right pooling operation on top of the contextualized word embeddings.
```python
from transformers import AutoTokenizer, AutoModel
import torch
{POOLING_FUNCTION}
# Sentences we want sentence embeddings for
sentences = ['This is an example sentence', 'Each sentence is converted']
# Load model from HuggingFace Hub
tokenizer = AutoTokenizer.from_pretrained('{MODEL_NAME}')
model = AutoModel.from_pretrained('{MODEL_NAME}')
# Tokenize sentences
encoded_input = tokenizer(sentences, padding=True, truncation=True, return_tensors='pt')
# Compute token embeddings
with torch.no_grad():
model_output = model(**encoded_input)
# Perform pooling. In this case, {POOLING_MODE} pooling.
sentence_embeddings = {POOLING_FUNCTION_NAME}(model_output, encoded_input['attention_mask'])
print("Sentence embeddings:")
print(sentence_embeddings)
```
"""
@staticmethod
def model_card_get_pooling_function(pooling_mode):
if pooling_mode == "max":
return (
"max_pooling",
"""
# Max Pooling - Take the max value over time for every dimension.
def max_pooling(model_output, attention_mask):
token_embeddings = model_output[0] #First element of model_output contains all token embeddings
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
token_embeddings[input_mask_expanded == 0] = -1e9 # Set padding tokens to large negative value
return torch.max(token_embeddings, 1)[0]
""",
)
elif pooling_mode == "mean":
return (
"mean_pooling",
"""
#Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask):
token_embeddings = model_output[0] #First element of model_output contains all token embeddings
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
""",
)
elif pooling_mode == "cls":
return (
"cls_pooling",
"""
def cls_pooling(model_output, attention_mask):
return model_output[0][:,0]
""",
)
@staticmethod
def get_train_objective_info(dataloader, loss):
try:
if hasattr(dataloader, "get_config_dict"):
loader_params = dataloader.get_config_dict()
else:
loader_params = {}
loader_params["batch_size"] = dataloader.batch_size if hasattr(dataloader, "batch_size") else "unknown"
if hasattr(dataloader, "sampler"):
loader_params["sampler"] = fullname(dataloader.sampler)
if hasattr(dataloader, "batch_sampler"):
loader_params["batch_sampler"] = fullname(dataloader.batch_sampler)
dataloader_str = """**DataLoader**:\n\n`{}` of length {} with parameters:
```
{}
```""".format(fullname(dataloader), len(dataloader), loader_params)
loss_str = "**Loss**:\n\n`{}` {}".format(
fullname(loss),
"""with parameters:
```
{}
```""".format(loss.get_config_dict())
if hasattr(loss, "get_config_dict")
else "",
)
return [dataloader_str, loss_str]
except Exception as e:
logging.warning("Exception when creating get_train_objective_info: {}".format(str(e)))
return ""
from torch import Tensor
from torch import nn
import os
import json
from ..util import import_from_string
from collections import OrderedDict
from typing import List, Dict, Union, Tuple
class Asym(nn.Sequential):
def __init__(self, sub_modules: Dict[str, List[nn.Module]], allow_empty_key: bool = True):
"""
This model allows the creation of asymmetric SentenceTransformer models that apply different modules depending on the specified input key.
In the example below, we create two different Dense models for 'query' and 'doc'. Text passed as {'query': 'My query'} will
be passed through the first Dense model, and text passed as {'doc': 'My document'} will use the other Dense model.
Note that when you call encode(), only inputs of the same type can be encoded; mixed types cannot be encoded.
Example::
word_embedding_model = models.Transformer(model_name)
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
asym_model = models.Asym({'query': [models.Dense(word_embedding_model.get_word_embedding_dimension(), 128)], 'doc': [models.Dense(word_embedding_model.get_word_embedding_dimension(), 128)]})
model = SentenceTransformer(modules=[word_embedding_model, pooling_model, asym_model])
model.encode([{'query': 'Q1'}, {'query': 'Q2'}])
model.encode([{'doc': 'Doc1'}, {'doc': 'Doc2'}])
#You can train it with InputExample like this. Note, that the order must always be the same:
train_example = InputExample(texts=[{'query': 'Train query'}, {'doc': 'Document'}], label=1)
:param sub_modules: Dict in the format str -> List[models]. The models in the specified list will be applied for input marked with the respective key.
:param allow_empty_key: If true, inputs without a key can be processed. If false, an exception will be thrown if no key is specified.
"""
self.sub_modules = sub_modules
self.allow_empty_key = allow_empty_key
ordered_dict = OrderedDict()
for name, models in sub_modules.items():
if not isinstance(models, List):
models = [models]
for idx, model in enumerate(models):
ordered_dict[name + "-" + str(idx)] = model
super(Asym, self).__init__(ordered_dict)
def forward(self, features: Dict[str, Tensor]):
if "text_keys" in features and len(features["text_keys"]) > 0:
text_key = features["text_keys"][0]
for model in self.sub_modules[text_key]:
features = model(features)
elif not self.allow_empty_key:
raise ValueError("Input did not specify any keys and allow_empty_key is False")
return features
def get_sentence_embedding_dimension(self) -> int:
for name in self.sub_modules:
if hasattr(self.sub_modules[name][0], "get_sentence_embedding_dimension"):
return self.sub_modules[name][0].get_sentence_embedding_dimension()
return None
def save(self, output_path):
model_lookup = {}
model_types = {}
model_structure = {}
for name, models in self.sub_modules.items():
model_structure[name] = []
for model in models:
model_id = str(id(model)) + "_" + type(model).__name__
model_lookup[model_id] = model
model_types[model_id] = type(model).__module__
model_structure[name].append(model_id)
for model_id, model in model_lookup.items():
model_path = os.path.join(output_path, str(model_id))
os.makedirs(model_path, exist_ok=True)
model.save(model_path)
with open(os.path.join(output_path, "config.json"), "w", encoding="utf8") as fOut:
json.dump(
{
"types": model_types,
"structure": model_structure,
"parameters": {"allow_empty_key": self.allow_empty_key},
},
fOut,
indent=2,
)
def tokenize(self, texts: Union[List[str], List[Tuple[str, str]]], **kwargs):
"""
Tokenizes a text and maps tokens to token-ids
"""
if not isinstance(texts[0], dict):
raise AttributeError("Asym. model requires that texts are passed as dicts: {'key': 'text'}")
module_key = None
for lookup in texts:
text_key, text = next(iter(lookup.items()))
if module_key is None:
module_key = text_key
assert text_key == module_key # Mixed batches are not allowed
return self.sub_modules[module_key][0].tokenize(texts, **kwargs)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
modules = {}
for model_id, model_type in config["types"].items():
module_class = import_from_string(model_type)
module = module_class.load(os.path.join(input_path, model_id))
modules[model_id] = module
model_structure = {}
for key_name, models_list in config["structure"].items():
model_structure[key_name] = []
for model_id in models_list:
model_structure[key_name].append(modules[model_id])
model = Asym(model_structure, **config["parameters"])
return model
import torch
from torch import Tensor
from torch import nn
from typing import List, Dict
import os
import json
import logging
import numpy as np
from .tokenizer import WhitespaceTokenizer
logger = logging.getLogger(__name__)
class BoW(nn.Module):
"""Implements a Bag-of-Words (BoW) model to derive sentence embeddings.
A weighting can be added to allow the generation of tf-idf vectors. The output vector has the size of the vocab.
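A minimal usage sketch (the vocabulary and weights below are illustrative)::
from sentence_transformers import SentenceTransformer, models
vocab = ["this", "is", "a", "tiny", "vocabulary"]
bow = models.BoW(vocab=vocab, word_weights={"tiny": 2.0}, unknown_word_weight=1.0)
model = SentenceTransformer(modules=[bow])
embeddings = model.encode(["this is a tiny example"])  # shape: (1, len(vocab))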
"""
def __init__(
self,
vocab: List[str],
word_weights: Dict[str, float] = {},
unknown_word_weight: float = 1,
cumulative_term_frequency: bool = True,
):
super(BoW, self).__init__()
vocab = list(set(vocab)) # Ensure vocab is unique
self.config_keys = ["vocab", "word_weights", "unknown_word_weight", "cumulative_term_frequency"]
self.vocab = vocab
self.word_weights = word_weights
self.unknown_word_weight = unknown_word_weight
self.cumulative_term_frequency = cumulative_term_frequency
# Maps wordIdx -> word weight
self.weights = []
num_unknown_words = 0
for word in vocab:
weight = unknown_word_weight
if word in word_weights:
weight = word_weights[word]
elif word.lower() in word_weights:
weight = word_weights[word.lower()]
else:
num_unknown_words += 1
self.weights.append(weight)
logger.info(
"{} out of {} words without a weighting value. Set weight to {}".format(
num_unknown_words, len(vocab), unknown_word_weight
)
)
self.tokenizer = WhitespaceTokenizer(vocab, stop_words=set(), do_lower_case=False)
self.sentence_embedding_dimension = len(vocab)
def forward(self, features: Dict[str, Tensor]):
# Nothing to do, everything is done in get_sentence_features
return features
def tokenize(self, texts: List[str], **kwargs) -> List[int]:
tokenized = [self.tokenizer.tokenize(text, **kwargs) for text in texts]
return self.get_sentence_features(tokenized)
def get_sentence_embedding_dimension(self):
return self.sentence_embedding_dimension
def get_sentence_features(self, tokenized_texts: List[List[int]], pad_seq_length: int = 0):
vectors = []
for tokens in tokenized_texts:
vector = np.zeros(self.get_sentence_embedding_dimension(), dtype=np.float32)
for token in tokens:
if self.cumulative_term_frequency:
vector[token] += self.weights[token]
else:
vector[token] = self.weights[token]
vectors.append(vector)
return {"sentence_embedding": torch.tensor(vectors, dtype=torch.float)}
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
return BoW(**config)
from typing import Union
from torch import nn
import transformers
import torch
from PIL import Image
class CLIPModel(nn.Module):
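"""Wraps a Hugging Face CLIP model so that both images and texts can be encoded into the same vector space.
A minimal usage sketch (the checkpoint and image path are illustrative)::
from sentence_transformers import SentenceTransformer
from PIL import Image
model = SentenceTransformer('clip-ViT-B-32')
img_emb = model.encode(Image.open('example.jpg'))
text_emb = model.encode(['A photo of a cat', 'A photo of a dog'])
"""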
def __init__(self, model_name: str = "openai/clip-vit-base-patch32", processor_name=None):
super(CLIPModel, self).__init__()
if processor_name is None:
processor_name = model_name
self.model = transformers.CLIPModel.from_pretrained(model_name)
self.processor = transformers.CLIPProcessor.from_pretrained(processor_name)
def __repr__(self):
return "CLIPModel()"
def forward(self, features):
image_embeds = []
text_embeds = []
if "pixel_values" in features:
vision_outputs = self.model.vision_model(pixel_values=features["pixel_values"])
image_embeds = self.model.visual_projection(vision_outputs[1])
if "input_ids" in features:
text_outputs = self.model.text_model(
input_ids=features.get("input_ids"),
attention_mask=features.get("attention_mask", None),
position_ids=features.get("position_ids", None),
output_attentions=features.get("output_attentions", None),
output_hidden_states=features.get("output_hidden_states", None),
)
text_embeds = self.model.text_projection(text_outputs[1])
sentence_embedding = []
image_features = iter(image_embeds)
text_features = iter(text_embeds)
for idx, input_type in enumerate(features["image_text_info"]):
if input_type == 0:
sentence_embedding.append(next(image_features))
else:
sentence_embedding.append(next(text_features))
features["sentence_embedding"] = torch.stack(sentence_embedding).float()
return features
def tokenize(self, texts, padding: Union[str, bool] = True):
images = []
texts_values = []
image_text_info = []
for idx, data in enumerate(texts):
if isinstance(data, Image.Image): # An Image
images.append(data)
image_text_info.append(0)
else: # A text
texts_values.append(data)
image_text_info.append(1)
if len(texts_values) == 0:
texts_values = None
if len(images) == 0:
images = None
inputs = self.processor(text=texts_values, images=images, return_tensors="pt", padding=padding)
inputs["image_text_info"] = image_text_info
return inputs
def save(self, output_path: str):
self.model.save_pretrained(output_path)
self.processor.save_pretrained(output_path)
@staticmethod
def load(input_path: str):
return CLIPModel(model_name=input_path)
import torch
from torch import nn
from typing import List
import os
import json
class CNN(nn.Module):
"""CNN-layer with multiple kernel-sizes over the word embeddings"""
def __init__(
self,
in_word_embedding_dimension: int,
out_channels: int = 256,
kernel_sizes: List[int] = [1, 3, 5],
stride_sizes: List[int] = None,
):
nn.Module.__init__(self)
self.config_keys = ["in_word_embedding_dimension", "out_channels", "kernel_sizes"]
self.in_word_embedding_dimension = in_word_embedding_dimension
self.out_channels = out_channels
self.kernel_sizes = kernel_sizes
self.embeddings_dimension = out_channels * len(kernel_sizes)
self.convs = nn.ModuleList()
in_channels = in_word_embedding_dimension
if stride_sizes is None:
stride_sizes = [1] * len(kernel_sizes)
for kernel_size, stride in zip(kernel_sizes, stride_sizes):
padding_size = int((kernel_size - 1) / 2)
conv = nn.Conv1d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding_size,
)
self.convs.append(conv)
def forward(self, features):
token_embeddings = features["token_embeddings"]
token_embeddings = token_embeddings.transpose(1, -1)
vectors = [conv(token_embeddings) for conv in self.convs]
out = torch.cat(vectors, 1).transpose(1, -1)
features.update({"token_embeddings": out})
return features
def get_word_embedding_dimension(self) -> int:
return self.embeddings_dimension
def tokenize(self, text: str, **kwargs) -> List[int]:
raise NotImplementedError()
def save(self, output_path: str):
with open(os.path.join(output_path, "cnn_config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
@staticmethod
def load(input_path: str):
with open(os.path.join(input_path, "cnn_config.json"), "r") as fIn:
config = json.load(fIn)
weights = torch.load(os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"))
model = CNN(**config)
model.load_state_dict(weights)
return model
import torch
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
from ..util import fullname, import_from_string
class Dense(nn.Module):
"""Feed-forward function with activiation function.
This layer takes a fixed-sized sentence embedding and passes it through a feed-forward layer. Can be used to generate deep averaging networks (DAN).
:param in_features: Size of the input dimension
:param out_features: Output size
:param bias: Add a bias vector
:param activation_function: Pytorch activation function applied on output
:param init_weight: Initial value for the matrix of the linear layer
:param init_bias: Initial value for the bias of the linear layer
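A minimal usage sketch (the base model and output dimension are illustrative)::
from sentence_transformers import SentenceTransformer, models
from torch import nn
word_embedding_model = models.Transformer('distilbert-base-uncased')
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
dense_model = models.Dense(in_features=pooling_model.get_sentence_embedding_dimension(), out_features=256, activation_function=nn.Tanh())
model = SentenceTransformer(modules=[word_embedding_model, pooling_model, dense_model])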
"""
def __init__(
self,
in_features: int,
out_features: int,
bias: bool = True,
activation_function=nn.Tanh(),
init_weight: Tensor = None,
init_bias: Tensor = None,
):
super(Dense, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.bias = bias
self.activation_function = activation_function
self.linear = nn.Linear(in_features, out_features, bias=bias)
if init_weight is not None:
self.linear.weight = nn.Parameter(init_weight)
if init_bias is not None:
self.linear.bias = nn.Parameter(init_bias)
def forward(self, features: Dict[str, Tensor]):
features.update({"sentence_embedding": self.activation_function(self.linear(features["sentence_embedding"]))})
return features
def get_sentence_embedding_dimension(self) -> int:
return self.out_features
def get_config_dict(self):
return {
"in_features": self.in_features,
"out_features": self.out_features,
"bias": self.bias,
"activation_function": fullname(self.activation_function),
}
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
def __repr__(self):
return "Dense({})".format(self.get_config_dict())
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
config["activation_function"] = import_from_string(config["activation_function"])()
model = Dense(**config)
model.load_state_dict(
torch.load(os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"))
)
return model
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
class Dropout(nn.Module):
"""Dropout layer.
:param dropout: Sets a dropout value for dense layer.
"""
def __init__(self, dropout: float = 0.2):
super(Dropout, self).__init__()
self.dropout = dropout
self.dropout_layer = nn.Dropout(self.dropout)
def forward(self, features: Dict[str, Tensor]):
features.update({"sentence_embedding": self.dropout_layer(features["sentence_embedding"])})
return features
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump({"dropout": self.dropout}, fOut)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
model = Dropout(**config)
return model
import torch
from torch import nn
from typing import List
import os
import json
class LSTM(nn.Module):
"""
Bidirectional LSTM running over word embeddings.
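A minimal usage sketch (the embeddings file and hidden size are illustrative)::
from sentence_transformers import SentenceTransformer, models
word_embedding_model = models.WordEmbeddings.from_text_file('glove.6B.300d.txt.gz')
lstm = models.LSTM(word_embedding_dimension=word_embedding_model.get_word_embedding_dimension(), hidden_dim=1024)
pooling_model = models.Pooling(lstm.get_word_embedding_dimension(), pooling_mode='mean')
model = SentenceTransformer(modules=[word_embedding_model, lstm, pooling_model])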
"""
def __init__(
self,
word_embedding_dimension: int,
hidden_dim: int,
num_layers: int = 1,
dropout: float = 0,
bidirectional: bool = True,
):
nn.Module.__init__(self)
self.config_keys = ["word_embedding_dimension", "hidden_dim", "num_layers", "dropout", "bidirectional"]
self.word_embedding_dimension = word_embedding_dimension
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.dropout = dropout
self.bidirectional = bidirectional
self.embeddings_dimension = hidden_dim
if self.bidirectional:
self.embeddings_dimension *= 2
self.encoder = nn.LSTM(
word_embedding_dimension,
hidden_dim,
num_layers=num_layers,
dropout=dropout,
bidirectional=bidirectional,
batch_first=True,
)
def forward(self, features):
token_embeddings = features["token_embeddings"]
sentence_lengths = torch.clamp(features["sentence_lengths"], min=1)
packed = nn.utils.rnn.pack_padded_sequence(
token_embeddings, sentence_lengths.cpu(), batch_first=True, enforce_sorted=False
)
packed = self.encoder(packed)
unpack = nn.utils.rnn.pad_packed_sequence(packed[0], batch_first=True)[0]
features.update({"token_embeddings": unpack})
return features
def get_word_embedding_dimension(self) -> int:
return self.embeddings_dimension
def tokenize(self, text: str, **kwargs) -> List[int]:
raise NotImplementedError()
def save(self, output_path: str):
with open(os.path.join(output_path, "lstm_config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
@staticmethod
def load(input_path: str):
with open(os.path.join(input_path, "lstm_config.json"), "r") as fIn:
config = json.load(fIn)
weights = torch.load(os.path.join(input_path, "pytorch_model.bin"))
model = LSTM(**config)
model.load_state_dict(weights)
return model
import torch
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
class LayerNorm(nn.Module):
def __init__(self, dimension: int):
super(LayerNorm, self).__init__()
self.dimension = dimension
self.norm = nn.LayerNorm(dimension)
def forward(self, features: Dict[str, Tensor]):
features["sentence_embedding"] = self.norm(features["sentence_embedding"])
return features
def get_sentence_embedding_dimension(self):
return self.dimension
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump({"dimension": self.dimension}, fOut, indent=2)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
model = LayerNorm(**config)
model.load_state_dict(
torch.load(os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"))
)
return model
from torch import Tensor
from torch import nn
from typing import Dict
import torch.nn.functional as F
class Normalize(nn.Module):
"""
This layer normalizes embeddings to unit length
"""
def __init__(self):
super(Normalize, self).__init__()
def forward(self, features: Dict[str, Tensor]):
features.update({"sentence_embedding": F.normalize(features["sentence_embedding"], p=2, dim=1)})
return features
def save(self, output_path):
pass
@staticmethod
def load(input_path):
return Normalize()
import torch
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
class Pooling(nn.Module):
"""Performs pooling (max or mean) on the token embeddings.
Using pooling, it generates from a variable sized sentence a fixed sized sentence embedding. This layer also allows
to use the CLS token if it is returned by the underlying word embedding model. You can concatenate multiple poolings
together.
:param word_embedding_dimension: Dimensions for the word embeddings
:param pooling_mode: Either "cls", "lasttoken", "max", "mean", "mean_sqrt_len_tokens", or "weightedmean". If set, overwrites the other pooling_mode_* settings
:param pooling_mode_cls_token: Use the first token (CLS token) as text representations
:param pooling_mode_max_tokens: Use max in each dimension over all tokens.
:param pooling_mode_mean_tokens: Perform mean-pooling
:param pooling_mode_mean_sqrt_len_tokens: Perform mean-pooling, but divide by sqrt(input_length).
:param pooling_mode_weightedmean_tokens: Perform (position) weighted mean pooling. See `SGPT: GPT Sentence Embeddings for Semantic Search <https://arxiv.org/abs/2202.08904>`_.
:param pooling_mode_lasttoken: Perform last token pooling. See `SGPT: GPT Sentence Embeddings for Semantic Search <https://arxiv.org/abs/2202.08904>`_ and `Text and Code Embeddings by Contrastive Pre-Training <https://arxiv.org/abs/2201.10005>`_.
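A minimal usage sketch (the base model is illustrative)::
from sentence_transformers import SentenceTransformer, models
word_embedding_model = models.Transformer('distilbert-base-uncased')
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension(), pooling_mode='mean')
model = SentenceTransformer(modules=[word_embedding_model, pooling_model])
print(pooling_model.get_sentence_embedding_dimension())  # 768 for distilbert-base-uncased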
"""
POOLING_MODES = (
"cls",
"lasttoken",
"max",
"mean",
"mean_sqrt_len_tokens",
"weightedmean",
)
def __init__(
self,
word_embedding_dimension: int,
pooling_mode: str = None,
pooling_mode_cls_token: bool = False,
pooling_mode_max_tokens: bool = False,
pooling_mode_mean_tokens: bool = True,
pooling_mode_mean_sqrt_len_tokens: bool = False,
pooling_mode_weightedmean_tokens: bool = False,
pooling_mode_lasttoken: bool = False,
include_prompt=True,
) -> None:
super(Pooling, self).__init__()
self.config_keys = [
"word_embedding_dimension",
"pooling_mode_cls_token",
"pooling_mode_mean_tokens",
"pooling_mode_max_tokens",
"pooling_mode_mean_sqrt_len_tokens",
"pooling_mode_weightedmean_tokens",
"pooling_mode_lasttoken",
"include_prompt",
]
if pooling_mode is not None: # Set pooling mode by string
pooling_mode = pooling_mode.lower()
if pooling_mode not in self.POOLING_MODES:
raise ValueError(
f"Set invalid pooling mode: {pooling_mode}. Valid pooling modes are: {self.POOLING_MODES}."
)
pooling_mode_cls_token = pooling_mode == "cls"
pooling_mode_max_tokens = pooling_mode == "max"
pooling_mode_mean_tokens = pooling_mode == "mean"
pooling_mode_mean_sqrt_len_tokens = pooling_mode == "mean_sqrt_len_tokens"
pooling_mode_weightedmean_tokens = pooling_mode == "weightedmean"
pooling_mode_lasttoken = pooling_mode == "lasttoken"
self.word_embedding_dimension = word_embedding_dimension
self.pooling_mode_cls_token = pooling_mode_cls_token
self.pooling_mode_mean_tokens = pooling_mode_mean_tokens
self.pooling_mode_max_tokens = pooling_mode_max_tokens
self.pooling_mode_mean_sqrt_len_tokens = pooling_mode_mean_sqrt_len_tokens
self.pooling_mode_weightedmean_tokens = pooling_mode_weightedmean_tokens
self.pooling_mode_lasttoken = pooling_mode_lasttoken
self.include_prompt = include_prompt
pooling_mode_multiplier = sum(
[
pooling_mode_cls_token,
pooling_mode_max_tokens,
pooling_mode_mean_tokens,
pooling_mode_mean_sqrt_len_tokens,
pooling_mode_weightedmean_tokens,
pooling_mode_lasttoken,
]
)
self.pooling_output_dimension = pooling_mode_multiplier * word_embedding_dimension
def __repr__(self):
return "Pooling({})".format(self.get_config_dict())
def get_pooling_mode_str(self) -> str:
"""
Returns the pooling mode as string
"""
modes = []
if self.pooling_mode_cls_token:
modes.append("cls")
if self.pooling_mode_mean_tokens:
modes.append("mean")
if self.pooling_mode_max_tokens:
modes.append("max")
if self.pooling_mode_mean_sqrt_len_tokens:
modes.append("mean_sqrt_len_tokens")
if self.pooling_mode_weightedmean_tokens:
modes.append("weightedmean")
if self.pooling_mode_lasttoken:
modes.append("lasttoken")
return "+".join(modes)
def forward(self, features: Dict[str, Tensor]):
token_embeddings = features["token_embeddings"]
attention_mask = features["attention_mask"]
if not self.include_prompt and "prompt_length" in features:
attention_mask[:, : features["prompt_length"]] = 0
## Pooling strategy
output_vectors = []
if self.pooling_mode_cls_token:
cls_token = features.get("cls_token_embeddings", token_embeddings[:, 0]) # Take first token by default
output_vectors.append(cls_token)
if self.pooling_mode_max_tokens:
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
)
token_embeddings[input_mask_expanded == 0] = -1e9 # Set padding tokens to large negative value
max_over_time = torch.max(token_embeddings, 1)[0]
output_vectors.append(max_over_time)
if self.pooling_mode_mean_tokens or self.pooling_mode_mean_sqrt_len_tokens:
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
)
sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
# If tokens are weighted (by WordWeights layer), feature 'token_weights_sum' will be present
if "token_weights_sum" in features:
sum_mask = features["token_weights_sum"].unsqueeze(-1).expand(sum_embeddings.size())
else:
sum_mask = input_mask_expanded.sum(1)
sum_mask = torch.clamp(sum_mask, min=1e-9)
if self.pooling_mode_mean_tokens:
output_vectors.append(sum_embeddings / sum_mask)
if self.pooling_mode_mean_sqrt_len_tokens:
output_vectors.append(sum_embeddings / torch.sqrt(sum_mask))
if self.pooling_mode_weightedmean_tokens:
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
)
# token_embeddings shape: bs, seq, hidden_dim
weights = (
torch.arange(start=1, end=token_embeddings.shape[1] + 1)
.unsqueeze(0)
.unsqueeze(-1)
.expand(token_embeddings.size())
.to(token_embeddings.dtype)
.to(token_embeddings.device)
)
assert weights.shape == token_embeddings.shape == input_mask_expanded.shape
input_mask_expanded = input_mask_expanded * weights
sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
# If tokens are weighted (by WordWeights layer), feature 'token_weights_sum' will be present
if "token_weights_sum" in features:
sum_mask = features["token_weights_sum"].unsqueeze(-1).expand(sum_embeddings.size())
else:
sum_mask = input_mask_expanded.sum(1)
sum_mask = torch.clamp(sum_mask, min=1e-9)
output_vectors.append(sum_embeddings / sum_mask)
if self.pooling_mode_lasttoken:
bs, seq_len, hidden_dim = token_embeddings.shape
# attention_mask shape: (bs, seq_len)
# Get shape [bs] indices of the last token (i.e. the last token for each batch item)
# Use flip and max() to get the last index of 1 in the attention mask
if torch.jit.is_tracing():
# Avoid tracing the argmax with int64 input that can not be handled by ONNX Runtime: https://github.com/microsoft/onnxruntime/issues/10068
attention_mask = attention_mask.to(torch.int32)
values, indices = attention_mask.flip(1).max(1)
indices = torch.where(values == 0, seq_len - 1, indices)
gather_indices = seq_len - indices - 1
# Turn indices from shape [bs] --> [bs, 1, hidden_dim]
gather_indices = gather_indices.unsqueeze(-1).repeat(1, hidden_dim)
gather_indices = gather_indices.unsqueeze(1)
assert gather_indices.shape == (bs, 1, hidden_dim)
# Gather along the 1st dim (seq_len) (bs, seq_len, hidden_dim -> bs, hidden_dim)
# Actually no need for the attention mask as we gather the last token where attn_mask = 1
# but as we set some indices (which shouldn't be attended to) to 0 with clamp, we
# use the attention mask to ignore them again
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
)
embedding = torch.gather(token_embeddings * input_mask_expanded, 1, gather_indices).squeeze(dim=1)
output_vectors.append(embedding)
output_vector = torch.cat(output_vectors, 1)
features.update({"sentence_embedding": output_vector})
return features
def get_sentence_embedding_dimension(self):
return self.pooling_output_dimension
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
return Pooling(**config)
from torch import nn
from transformers import AutoModel, AutoTokenizer, AutoConfig, T5Config, MT5Config
import json
from typing import List, Dict, Optional, Union, Tuple
import os
class Transformer(nn.Module):
"""Huggingface AutoModel to generate token embeddings.
Loads the correct class, e.g. BERT / RoBERTa etc.
:param model_name_or_path: Huggingface models name (https://huggingface.co/models)
:param max_seq_length: Truncate any inputs longer than max_seq_length
:param model_args: Arguments (key, value pairs) passed to the Huggingface Transformers model
:param cache_dir: Cache dir for Huggingface Transformers to store/load models
:param tokenizer_args: Arguments (key, value pairs) passed to the Huggingface Tokenizer model
:param do_lower_case: If true, lowercases the input (regardless of whether the model is cased or not)
:param tokenizer_name_or_path: Name or path of the tokenizer. When None, then model_name_or_path is used
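A minimal usage sketch (the model name and sequence length are illustrative)::
from sentence_transformers import SentenceTransformer, models
word_embedding_model = models.Transformer('distilbert-base-uncased', max_seq_length=256)
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
model = SentenceTransformer(modules=[word_embedding_model, pooling_model])
embeddings = model.encode(['This is an example sentence'])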
"""
def __init__(
self,
model_name_or_path: str,
max_seq_length: Optional[int] = None,
model_args: Dict = {},
cache_dir: Optional[str] = None,
tokenizer_args: Dict = {},
do_lower_case: bool = False,
tokenizer_name_or_path: str = None,
):
super(Transformer, self).__init__()
self.config_keys = ["max_seq_length", "do_lower_case"]
self.do_lower_case = do_lower_case
config = AutoConfig.from_pretrained(model_name_or_path, **model_args, cache_dir=cache_dir)
self._load_model(model_name_or_path, config, cache_dir, **model_args)
self.tokenizer = AutoTokenizer.from_pretrained(
tokenizer_name_or_path if tokenizer_name_or_path is not None else model_name_or_path,
cache_dir=cache_dir,
**tokenizer_args,
)
# No max_seq_length set. Try to infer from model
if max_seq_length is None:
if (
hasattr(self.auto_model, "config")
and hasattr(self.auto_model.config, "max_position_embeddings")
and hasattr(self.tokenizer, "model_max_length")
):
max_seq_length = min(self.auto_model.config.max_position_embeddings, self.tokenizer.model_max_length)
self.max_seq_length = max_seq_length
if tokenizer_name_or_path is not None:
self.auto_model.config.tokenizer_class = self.tokenizer.__class__.__name__
def _load_model(self, model_name_or_path, config, cache_dir, **model_args):
"""Loads the transformer model"""
if isinstance(config, T5Config):
self._load_t5_model(model_name_or_path, config, cache_dir, **model_args)
elif isinstance(config, MT5Config):
self._load_mt5_model(model_name_or_path, config, cache_dir, **model_args)
else:
self.auto_model = AutoModel.from_pretrained(
model_name_or_path, config=config, cache_dir=cache_dir, **model_args
)
def _load_t5_model(self, model_name_or_path, config, cache_dir, **model_args):
"""Loads the encoder model from T5"""
from transformers import T5EncoderModel
T5EncoderModel._keys_to_ignore_on_load_unexpected = ["decoder.*"]
self.auto_model = T5EncoderModel.from_pretrained(
model_name_or_path, config=config, cache_dir=cache_dir, **model_args
)
def _load_mt5_model(self, model_name_or_path, config, cache_dir, **model_args):
"""Loads the encoder model from T5"""
from transformers import MT5EncoderModel
MT5EncoderModel._keys_to_ignore_on_load_unexpected = ["decoder.*"]
self.auto_model = MT5EncoderModel.from_pretrained(
model_name_or_path, config=config, cache_dir=cache_dir, **model_args
)
def __repr__(self):
return "Transformer({}) with Transformer model: {} ".format(
self.get_config_dict(), self.auto_model.__class__.__name__
)
def forward(self, features):
"""Returns token_embeddings, cls_token"""
trans_features = {"input_ids": features["input_ids"], "attention_mask": features["attention_mask"]}
if "token_type_ids" in features:
trans_features["token_type_ids"] = features["token_type_ids"]
output_states = self.auto_model(**trans_features, return_dict=False)
output_tokens = output_states[0]
features.update({"token_embeddings": output_tokens, "attention_mask": features["attention_mask"]})
if self.auto_model.config.output_hidden_states:
all_layer_idx = 2
if len(output_states) < 3: # Some models only output last_hidden_states and all_hidden_states
all_layer_idx = 1
hidden_states = output_states[all_layer_idx]
features.update({"all_layer_embeddings": hidden_states})
return features
def get_word_embedding_dimension(self) -> int:
return self.auto_model.config.hidden_size
def tokenize(self, texts: Union[List[str], List[Dict], List[Tuple[str, str]]], padding: Union[str, bool] = True):
"""
Tokenizes a text and maps tokens to token-ids
"""
output = {}
if isinstance(texts[0], str):
to_tokenize = [texts]
elif isinstance(texts[0], dict):
to_tokenize = []
output["text_keys"] = []
for lookup in texts:
text_key, text = next(iter(lookup.items()))
to_tokenize.append(text)
output["text_keys"].append(text_key)
to_tokenize = [to_tokenize]
else:
batch1, batch2 = [], []
for text_tuple in texts:
batch1.append(text_tuple[0])
batch2.append(text_tuple[1])
to_tokenize = [batch1, batch2]
# strip
to_tokenize = [[str(s).strip() for s in col] for col in to_tokenize]
# Lowercase
if self.do_lower_case:
to_tokenize = [[s.lower() for s in col] for col in to_tokenize]
output.update(
self.tokenizer(
*to_tokenize,
padding=padding,
truncation="longest_first",
return_tensors="pt",
max_length=self.max_seq_length,
)
)
return output
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
def save(self, output_path: str, safe_serialization: bool = True):
self.auto_model.save_pretrained(output_path, safe_serialization=safe_serialization)
self.tokenizer.save_pretrained(output_path)
with open(os.path.join(output_path, "sentence_bert_config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
@staticmethod
def load(input_path: str):
# Old classes used other config names than 'sentence_bert_config.json'
for config_name in [
"sentence_bert_config.json",
"sentence_roberta_config.json",
"sentence_distilbert_config.json",
"sentence_camembert_config.json",
"sentence_albert_config.json",
"sentence_xlm-roberta_config.json",
"sentence_xlnet_config.json",
]:
sbert_config_path = os.path.join(input_path, config_name)
if os.path.exists(sbert_config_path):
break
with open(sbert_config_path) as fIn:
config = json.load(fIn)
# Don't allow configs to set trust_remote_code
if "model_args" in config:
config["model_args"].pop("trust_remote_code")
return Transformer(model_name_or_path=input_path, **config)
import torch
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
class WeightedLayerPooling(nn.Module):
"""
Token embeddings are the weighted mean of their different hidden layer representations
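A minimal usage sketch (the base model is illustrative; the Transformer must be loaded with output_hidden_states=True)::
from sentence_transformers import SentenceTransformer, models
word_embedding_model = models.Transformer('distilbert-base-uncased', model_args={'output_hidden_states': True})
weighted_pooling = models.WeightedLayerPooling(word_embedding_model.get_word_embedding_dimension(), num_hidden_layers=6, layer_start=4)
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
model = SentenceTransformer(modules=[word_embedding_model, weighted_pooling, pooling_model])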
"""
def __init__(
self, word_embedding_dimension, num_hidden_layers: int = 12, layer_start: int = 4, layer_weights=None
):
super(WeightedLayerPooling, self).__init__()
self.config_keys = ["word_embedding_dimension", "layer_start", "num_hidden_layers"]
self.word_embedding_dimension = word_embedding_dimension
self.layer_start = layer_start
self.num_hidden_layers = num_hidden_layers
self.layer_weights = (
layer_weights
if layer_weights is not None
else nn.Parameter(torch.tensor([1] * (num_hidden_layers + 1 - layer_start), dtype=torch.float))
)
def forward(self, features: Dict[str, Tensor]):
ft_all_layers = features["all_layer_embeddings"]
all_layer_embedding = torch.stack(ft_all_layers)
all_layer_embedding = all_layer_embedding[self.layer_start :, :, :, :]  # Keep layers from layer_start onwards
weight_factor = self.layer_weights.unsqueeze(-1).unsqueeze(-1).unsqueeze(-1).expand(all_layer_embedding.size())
weighted_average = (weight_factor * all_layer_embedding).sum(dim=0) / self.layer_weights.sum()
features.update({"token_embeddings": weighted_average})
return features
def get_word_embedding_dimension(self):
return self.word_embedding_dimension
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
model = WeightedLayerPooling(**config)
model.load_state_dict(
torch.load(os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"))
)
return model
import torch
from torch import nn
from typing import List
import logging
import gzip
from tqdm import tqdm
import numpy as np
import os
import json
from ..util import import_from_string, fullname, http_get
from .tokenizer import WordTokenizer, WhitespaceTokenizer
logger = logging.getLogger(__name__)
class WordEmbeddings(nn.Module):
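"""Word-embedding lookup layer (e.g. for GloVe or word2vec vectors).
Maps input token ids to fixed (or optionally trainable) embedding vectors. Use :meth:`from_text_file` to load
embeddings from an (optionally gzipped) text file; the layer is typically followed by a Pooling module and can be
combined with WordWeights or LSTM layers.
"""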
def __init__(
self,
tokenizer: WordTokenizer,
embedding_weights,
update_embeddings: bool = False,
max_seq_length: int = 1000000,
):
nn.Module.__init__(self)
if isinstance(embedding_weights, list):
embedding_weights = np.asarray(embedding_weights)
if isinstance(embedding_weights, np.ndarray):
embedding_weights = torch.from_numpy(embedding_weights)
num_embeddings, embeddings_dimension = embedding_weights.size()
self.embeddings_dimension = embeddings_dimension
self.emb_layer = nn.Embedding(num_embeddings, embeddings_dimension)
self.emb_layer.load_state_dict({"weight": embedding_weights})
self.emb_layer.weight.requires_grad = update_embeddings
self.tokenizer = tokenizer
self.update_embeddings = update_embeddings
self.max_seq_length = max_seq_length
def forward(self, features):
token_embeddings = self.emb_layer(features["input_ids"])
cls_tokens = None
features.update(
{
"token_embeddings": token_embeddings,
"cls_token_embeddings": cls_tokens,
"attention_mask": features["attention_mask"],
}
)
return features
def tokenize(self, texts: List[str], **kwargs):
tokenized_texts = [self.tokenizer.tokenize(text, **kwargs) for text in texts]
sentence_lengths = [len(tokens) for tokens in tokenized_texts]
max_len = max(sentence_lengths)
input_ids = []
attention_masks = []
for tokens in tokenized_texts:
padding = [0] * (max_len - len(tokens))
input_ids.append(tokens + padding)
attention_masks.append([1] * len(tokens) + padding)
output = {
"input_ids": torch.tensor(input_ids, dtype=torch.long),
"attention_mask": torch.tensor(attention_masks, dtype=torch.long),
"sentence_lengths": torch.tensor(sentence_lengths, dtype=torch.long),
}
return output
def get_word_embedding_dimension(self) -> int:
return self.embeddings_dimension
def save(self, output_path: str):
with open(os.path.join(output_path, "wordembedding_config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
self.tokenizer.save(output_path)
def get_config_dict(self):
return {
"tokenizer_class": fullname(self.tokenizer),
"update_embeddings": self.update_embeddings,
"max_seq_length": self.max_seq_length,
}
@staticmethod
def load(input_path: str):
with open(os.path.join(input_path, "wordembedding_config.json"), "r") as fIn:
config = json.load(fIn)
tokenizer_class = import_from_string(config["tokenizer_class"])
tokenizer = tokenizer_class.load(input_path)
weights = torch.load(os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"))
embedding_weights = weights["emb_layer.weight"]
model = WordEmbeddings(
tokenizer=tokenizer, embedding_weights=embedding_weights, update_embeddings=config["update_embeddings"]
)
return model
@staticmethod
def from_text_file(
embeddings_file_path: str,
update_embeddings: bool = False,
item_separator: str = " ",
tokenizer=WhitespaceTokenizer(),
max_vocab_size: int = None,
):
logger.info("Read in embeddings file {}".format(embeddings_file_path))
if not os.path.exists(embeddings_file_path):
logger.info("{} does not exist, try to download from server".format(embeddings_file_path))
if "/" in embeddings_file_path or "\\" in embeddings_file_path:
raise ValueError("Embeddings file not found: {}".format(embeddings_file_path))
url = "https://public.ukp.informatik.tu-darmstadt.de/reimers/embeddings/" + embeddings_file_path
http_get(url, embeddings_file_path)
embeddings_dimension = None
vocab = []
embeddings = []
with gzip.open(embeddings_file_path, "rt", encoding="utf8") if embeddings_file_path.endswith(".gz") else open(
embeddings_file_path, encoding="utf8"
) as fIn:
iterator = tqdm(fIn, desc="Load Word Embeddings", unit="Embeddings")
for line in iterator:
split = line.rstrip().split(item_separator)
if not vocab and len(split) == 2: # Handle Word2vec format
continue
word = split[0]
if embeddings_dimension is None:
embeddings_dimension = len(split) - 1
vocab.append("PADDING_TOKEN")
embeddings.append(np.zeros(embeddings_dimension))
if (
len(split) - 1
) != embeddings_dimension: # Assure that all lines in the embeddings file are of the same length
logger.error(
"ERROR: A line in the embeddings file had more or less dimensions than expected. Skip token."
)
continue
vector = np.array([float(num) for num in split[1:]])
embeddings.append(vector)
vocab.append(word)
if max_vocab_size is not None and max_vocab_size > 0 and len(vocab) > max_vocab_size:
break
embeddings = np.asarray(embeddings)
tokenizer.set_vocab(vocab)
return WordEmbeddings(
tokenizer=tokenizer, embedding_weights=embeddings, update_embeddings=update_embeddings
)
import torch
from torch import Tensor
from torch import nn
from typing import List, Dict
import os
import json
import logging
logger = logging.getLogger(__name__)
class WordWeights(nn.Module):
"""This model can weight word embeddings, for example, with idf-values."""
def __init__(self, vocab: List[str], word_weights: Dict[str, float], unknown_word_weight: float = 1):
"""
:param vocab:
Vocabulary of the tokenizer
:param word_weights:
Mapping of tokens to a float weight value. Word embeddings are multiplied by this float value. The tokens in word_weights need not match the vocab (it can contain more or fewer entries).
:param unknown_word_weight:
Weight for words in the vocab that do not appear in the word_weights lookup. These can be, for example, rare words in the vocab for which no weight exists.
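A minimal usage sketch (the embeddings file and idf values are illustrative, and we assume the tokenizer exposes get_vocab())::
from sentence_transformers import SentenceTransformer, models
word_embedding_model = models.WordEmbeddings.from_text_file('glove.6B.300d.txt.gz')
vocab = word_embedding_model.tokenizer.get_vocab()
word_weights = models.WordWeights(vocab=vocab, word_weights={'the': 0.1, 'model': 2.5}, unknown_word_weight=1.0)
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
model = SentenceTransformer(modules=[word_embedding_model, word_weights, pooling_model])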
"""
super(WordWeights, self).__init__()
self.config_keys = ["vocab", "word_weights", "unknown_word_weight"]
self.vocab = vocab
self.word_weights = word_weights
self.unknown_word_weight = unknown_word_weight
weights = []
num_unknown_words = 0
for word in vocab:
weight = unknown_word_weight
if word in word_weights:
weight = word_weights[word]
elif word.lower() in word_weights:
weight = word_weights[word.lower()]
else:
num_unknown_words += 1
weights.append(weight)
logger.info(
"{} of {} words without a weighting value. Set weight to {}".format(
num_unknown_words, len(vocab), unknown_word_weight
)
)
self.emb_layer = nn.Embedding(len(vocab), 1)
self.emb_layer.load_state_dict({"weight": torch.FloatTensor(weights).unsqueeze(1)})
def forward(self, features: Dict[str, Tensor]):
attention_mask = features["attention_mask"]
token_embeddings = features["token_embeddings"]
# Compute a weight value for each token
token_weights_raw = self.emb_layer(features["input_ids"]).squeeze(-1)
token_weights = token_weights_raw * attention_mask.float()
token_weights_sum = torch.sum(token_weights, 1)
# Multiply embedding by token weight value
token_weights_expanded = token_weights.unsqueeze(-1).expand(token_embeddings.size())
token_embeddings = token_embeddings * token_weights_expanded
features.update({"token_embeddings": token_embeddings, "token_weights_sum": token_weights_sum})
return features
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
return WordWeights(**config)
from .Transformer import Transformer
from .Asym import Asym
from .BoW import BoW
from .CNN import CNN
from .Dense import Dense
from .Dropout import Dropout
from .LayerNorm import LayerNorm
from .LSTM import LSTM
from .Normalize import Normalize
from .Pooling import Pooling
from .WeightedLayerPooling import WeightedLayerPooling
from .WordEmbeddings import WordEmbeddings
from .WordWeights import WordWeights
from .CLIPModel import CLIPModel
__all__ = [
"Transformer",
"Asym",
"BoW",
"CNN",
"Dense",
"Dropout",
"LayerNorm",
"LSTM",
"Normalize",
"Pooling",
"WeightedLayerPooling",
"WordEmbeddings",
"WordWeights",
"CLIPModel",
]
from typing import List, Iterable
import collections
import string
import os
import json
import logging
from .WordTokenizer import WordTokenizer, ENGLISH_STOP_WORDS
from transformers.utils.import_utils import is_nltk_available, NLTK_IMPORT_ERROR
logger = logging.getLogger(__name__)
class PhraseTokenizer(WordTokenizer):
"""Tokenizes the text with respect to existent phrases in the vocab.
This tokenizers respects phrases that are in the vocab. Phrases are separated with 'ngram_separator', for example,
in Google News word2vec file, ngrams are separated with a _ like New_York. These phrases are detected in text and merged as one special token. (New York is the ... => [New_York, is, the])
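A minimal usage sketch (requires nltk; the vocab below is illustrative)::
tokenizer = PhraseTokenizer(vocab=['New_York', 'New', 'York', 'is', 'a', 'city'])
token_ids = tokenizer.tokenize('New York is a city')
# 'New York' is merged into the single vocab entry 'New_York'; default stop words ('is', 'a') are filtered out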
"""
def __init__(
self,
vocab: Iterable[str] = [],
stop_words: Iterable[str] = ENGLISH_STOP_WORDS,
do_lower_case: bool = False,
ngram_separator: str = "_",
max_ngram_length: int = 5,
):
if not is_nltk_available():
raise ImportError(NLTK_IMPORT_ERROR.format(self.__class__.__name__))
self.stop_words = set(stop_words)
self.do_lower_case = do_lower_case
self.ngram_separator = ngram_separator
self.max_ngram_length = max_ngram_length
self.set_vocab(vocab)
def get_vocab(self):
return self.vocab
def set_vocab(self, vocab: Iterable[str]):
self.vocab = vocab
self.word2idx = collections.OrderedDict([(word, idx) for idx, word in enumerate(vocab)])
# Check for ngram in vocab
self.ngram_lookup = set()
self.ngram_lengths = set()
for word in vocab:
if self.ngram_separator is not None and self.ngram_separator in word:
# Some words might be malformed in e.g. the Google News word2vec file, containing two or more _ in a row
ngram_count = word.count(self.ngram_separator) + 1
if self.ngram_separator + self.ngram_separator not in word and ngram_count <= self.max_ngram_length:
self.ngram_lookup.add(word)
self.ngram_lengths.add(ngram_count)
if len(vocab) > 0:
logger.info("PhraseTokenizer - Phrase ngram lengths: {}".format(self.ngram_lengths))
logger.info("PhraseTokenizer - Num phrases: {}".format(len(self.ngram_lookup)))
def tokenize(self, text: str, **kwargs) -> List[int]:
from nltk import word_tokenize
tokens = word_tokenize(text, preserve_line=True)
# phrase detection
for ngram_len in sorted(self.ngram_lengths, reverse=True):
idx = 0
while idx <= len(tokens) - ngram_len:
ngram = self.ngram_separator.join(tokens[idx : idx + ngram_len])
if ngram in self.ngram_lookup:
tokens[idx : idx + ngram_len] = [ngram]
elif ngram.lower() in self.ngram_lookup:
tokens[idx : idx + ngram_len] = [ngram.lower()]
idx += 1
# Map tokens to idx, filter stop words
tokens_filtered = []
for token in tokens:
if token in self.stop_words:
continue
elif token in self.word2idx:
tokens_filtered.append(self.word2idx[token])
continue
token = token.lower()
if token in self.stop_words:
continue
elif token in self.word2idx:
tokens_filtered.append(self.word2idx[token])
continue
token = token.strip(string.punctuation)
if token in self.stop_words:
continue
elif len(token) > 0 and token in self.word2idx:
tokens_filtered.append(self.word2idx[token])
continue
return tokens_filtered
def save(self, output_path: str):
with open(os.path.join(output_path, "phrasetokenizer_config.json"), "w") as fOut:
json.dump(
{
"vocab": list(self.word2idx.keys()),
"stop_words": list(self.stop_words),
"do_lower_case": self.do_lower_case,
"ngram_separator": self.ngram_separator,
"max_ngram_length": self.max_ngram_length,
},
fOut,
)
@staticmethod
def load(input_path: str):
with open(os.path.join(input_path, "phrasetokenizer_config.json"), "r") as fIn:
config = json.load(fIn)
return PhraseTokenizer(**config)