import random
from typing import Any, Dict, Iterable, List, Optional, Union
import warnings
from torch import Tensor, nn
import torch.nn.functional as F
from sentence_transformers import SentenceTransformer
from sentence_transformers.losses.CachedMultipleNegativesRankingLoss import CachedMultipleNegativesRankingLoss
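# ForwardDecorator temporarily replaces the model's forward pass inside MatryoshkaLoss.forward() below:
# for the first requested dimension it runs the wrapped forward and caches the full outputs; for every
# further dimension it reuses the cached outputs, truncating the token and sentence embeddings to the
# current dimension and re-normalizing them (see shrink()).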
class ForwardDecorator:
def __init__(self, fn):
self.fn = fn
self.dim = None
self.cache = []
self.cache_dim = None
self.idx = 0
def set_dim(self, dim):
self.dim = dim
self.idx = 0
def shrink(self, tensor: Tensor) -> Tensor:
tensor = tensor[..., : self.dim]
tensor = F.normalize(tensor, p=2, dim=-1)
return tensor
def __call__(self, features):
# Growing cache:
if self.cache_dim is None or self.cache_dim == self.dim:
output = self.fn(features)
self.cache.append(output)
self.cache_dim = self.dim
# Using cache:
else:
output = self.cache[self.idx]
output["token_embeddings"] = self.shrink(output["token_embeddings"])
output["sentence_embedding"] = self.shrink(output["sentence_embedding"])
self.idx += 1
return output
class MatryoshkaLoss(nn.Module):
def __init__(
self,
model: SentenceTransformer,
loss: nn.Module,
matryoshka_dims: List[int],
matryoshka_weights: Optional[List[Union[float, int]]] = None,
n_dims_per_step: int = -1,
) -> None:
"""
The MatryoshkaLoss can be seen as a loss *modifier* that allows you to use other loss functions at various
embedding dimensions. This is useful when you want to train a model where users have the option
to lower the embedding dimension to speed up their embedding comparisons and reduce downstream costs.
:param model: SentenceTransformer model
:param loss: The loss function to be used, e.g. :class:`MultipleNegativesRankingLoss`, :class:`CoSENTLoss`, etc.
:param matryoshka_dims: A list of embedding dimensions to be used for the loss function, e.g. [768, 512, 256, 128, 64].
:param matryoshka_weights: A list of weights to be used for the loss function, e.g. [1, 1, 1, 1, 1]. If None, then the
weights will be set to 1 for all dimensions.
:param n_dims_per_step: The number of dimensions to use per step. If -1, then all dimensions are used. If > 0, then
a random sample of n_dims_per_step dimensions is used per step. The default value is -1.
References:
- The concept was introduced in this paper: https://arxiv.org/abs/2205.13147
- `Matryoshka Embeddings <../../examples/training/matryoshka/README.html>`_
Requirements:
1. The base loss cannot be :class:`CachedMultipleNegativesRankingLoss`.
Relations:
- :class:`Matryoshka2dLoss` uses this loss in combination with :class:`AdaptiveLayerLoss` which allows for
layer reduction for faster inference.
Input:
+---------------------------------------+--------+
| Texts | Labels |
+=======================================+========+
| any | any |
+---------------------------------------+--------+
Example:
::
from sentence_transformers import SentenceTransformer, losses, InputExample
from torch.utils.data import DataLoader
model = SentenceTransformer('microsoft/mpnet-base')
train_examples = [
InputExample(texts=['Anchor 1', 'Positive 1']),
InputExample(texts=['Anchor 2', 'Positive 2']),
]
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=32)
train_loss = losses.MultipleNegativesRankingLoss(model=model)
train_loss = losses.MatryoshkaLoss(model, train_loss, [768, 512, 256, 128, 64])
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super().__init__()
self.model = model
self.loss = loss
if isinstance(loss, CachedMultipleNegativesRankingLoss):
warnings.warn("MatryoshkaLoss is not compatible with CachedMultipleNegativesRankingLoss.", stacklevel=2)
self.matryoshka_dims = matryoshka_dims
if matryoshka_weights is None:
matryoshka_weights = [1] * len(matryoshka_dims)
self.matryoshka_weights = matryoshka_weights
self.n_dims_per_step = n_dims_per_step
def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor) -> Tensor:
original_forward = self.model.forward
decorated_forward = ForwardDecorator(original_forward)
self.model.forward = decorated_forward
dim_indices = range(len(self.matryoshka_dims))
if self.n_dims_per_step > 0 and self.n_dims_per_step < len(dim_indices):
dim_indices = random.sample(dim_indices, self.n_dims_per_step)
loss = 0.0
for idx in dim_indices:
dim = self.matryoshka_dims[idx]
weight = self.matryoshka_weights[idx]
decorated_forward.set_dim(dim)
loss += weight * self.loss(sentence_features, labels)
self.model.forward = original_forward
return loss
def get_config_dict(self) -> Dict[str, Any]:
return {
"loss": self.loss.__class__.__name__,
"matryoshka_dims": self.matryoshka_dims,
"matryoshka_weights": self.matryoshka_weights,
"n_dims_per_step": self.n_dims_per_step,
}
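# Illustrative inference-time sketch (not part of the library; the model path is hypothetical): after
# training with MatryoshkaLoss, embeddings can be truncated to a smaller trained dimension and
# re-normalized, mirroring ForwardDecorator.shrink() above.
_example_model = SentenceTransformer("path/to/matryoshka-model")
_example_emb = _example_model.encode(["An example sentence"], convert_to_tensor=True)
_example_emb = F.normalize(_example_emb[..., :256], p=2, dim=-1)  # keep only the first 256 dimensions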
from .. import util
import torch
from torch import nn, Tensor
from typing import Iterable, Dict
import torch.nn.functional as F
class MegaBatchMarginLoss(nn.Module):
def __init__(
self,
model,
positive_margin: float = 0.8,
negative_margin: float = 0.3,
use_mini_batched_version: bool = True,
mini_batch_size: int = 50,
):
"""
Given a large batch (like 500 or more examples) of (anchor_i, positive_i) pairs, find for each pair in the batch
the hardest negative, i.e. find j != i such that cos_sim(anchor_i, positive_j) is maximal. Then create from this a
triplet (anchor_i, positive_i, positive_j) where positive_j serves as the negative for this triplet.
Then train as with the triplet loss.
:param model: SentenceTransformerModel
:param positive_margin: Positive margin, cos(anchor, positive) should be > positive_margin
:param negative_margin: Negative margin, cos(anchor, negative) should be < negative_margin
:param use_mini_batched_version: As large batch sizes require a lot of memory, we can use a mini-batched version.
We break down the large batch into smaller batches with fewer examples.
:param mini_batch_size: Size of the mini-batches. Should be a divisor of the batch size in your data loader.
References:
- This loss function was inspired by the ParaNMT paper: https://www.aclweb.org/anthology/P18-1042/
Requirements:
1. (anchor, positive) pairs
2. Large batches (500 or more examples)
Input:
+---------------------------------------+--------+
| Texts | Labels |
+=======================================+========+
| (anchor, positive) pairs | none |
+---------------------------------------+--------+
Example:
::
from sentence_transformers import SentenceTransformer, InputExample, losses
from torch.utils.data import DataLoader
model = SentenceTransformer('all-MiniLM-L6-v2')
total_examples = 500
train_batch_size = 250
train_mini_batch_size = 32
train_examples = [
InputExample(texts=[f"This is sentence number {i}", f"This is sentence number {i+1}"]) for i in range(total_examples)
]
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=train_batch_size)
train_loss = losses.MegaBatchMarginLoss(model=model, mini_batch_size=train_mini_batch_size)
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super(MegaBatchMarginLoss, self).__init__()
self.model = model
self.positive_margin = positive_margin
self.negative_margin = negative_margin
self.mini_batch_size = mini_batch_size
self.forward = self.forward_mini_batched if use_mini_batched_version else self.forward_non_mini_batched
def forward_mini_batched(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
anchor, positive = sentence_features
feature_names = list(anchor.keys())
with torch.no_grad():
self.model.eval()
all_positive_emb = self.model(positive)["sentence_embedding"].detach()
self.model.train()
diagonal_matrix = torch.eye(len(all_positive_emb), len(all_positive_emb), device=all_positive_emb.device)
# Iterate over the triplets (anchor, positive, hardest_negative) in smaller mini_batch sizes
for start_idx in range(0, len(all_positive_emb), self.mini_batch_size):
end_idx = start_idx + self.mini_batch_size
anchor_emb = self.model({key: anchor[key][start_idx:end_idx] for key in feature_names})[
"sentence_embedding"
]
# Find hard negatives. For each anchor, find the hardest negative
# Store them in the triplets (anchor, positive, hardest_negative)
hard_negative_features = {key: [] for key in feature_names}
with torch.no_grad():
cos_scores = util.pytorch_cos_sim(anchor_emb, all_positive_emb)
negative_scores = (
cos_scores - 2 * diagonal_matrix[start_idx:end_idx]
) # Push the positive scores on the diagonal below -1 so that they are never selected by the max() operation
negatives_max, negatives_ids = torch.max(negative_scores, dim=1)
for hard_negative_id in negatives_ids:
for key in feature_names:
hard_negative_features[key].append(positive[key][hard_negative_id])
for key in feature_names:
hard_negative_features[key] = torch.stack(hard_negative_features[key])
# Compute differentiable negative and positive embeddings
positive_emb = self.model({key: positive[key][start_idx:end_idx] for key in feature_names})[
"sentence_embedding"
]
negative_emb = self.model(hard_negative_features)["sentence_embedding"]
assert anchor_emb.shape == positive_emb.shape
assert anchor_emb.shape == negative_emb.shape
# Compute loss
pos_cosine = F.cosine_similarity(anchor_emb, positive_emb)
neg_cosine = F.cosine_similarity(anchor_emb, negative_emb)
losses = F.relu(self.positive_margin - pos_cosine) + F.relu(neg_cosine - self.negative_margin)
losses = losses.mean()
# Backpropagate unless it is the last mini-batch. The last mini-batch will be backpropagated by the outer training loop
if end_idx < len(all_positive_emb):
losses.backward()
return losses
##### Non mini-batched version ###
def forward_non_mini_batched(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
reps = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
embeddings_a, embeddings_b = reps
cos_scores = util.pytorch_cos_sim(embeddings_a, embeddings_b)
positive_scores = torch.diagonal(cos_scores)
negative_scores = cos_scores - (
2 * torch.eye(*cos_scores.shape, device=cos_scores.device)
) # Remove positive scores along the diagonal
negatives_max, _ = torch.max(negative_scores, dim=1)
losses = F.relu(self.positive_margin - positive_scores) + F.relu(negatives_max - self.negative_margin)
return losses.mean()
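# Illustrative sketch of the hard-negative selection above (toy tensors, not part of the library):
# for each anchor, the most similar *other* positive in the batch is picked as the hardest negative.
_toy_scores = torch.tensor([[0.9, 0.4, 0.7], [0.3, 0.8, 0.6], [0.5, 0.2, 0.95]])
_toy_negative_scores = _toy_scores - 2 * torch.eye(3)  # push the true positives below -1
_toy_hardest_negative_ids = torch.max(_toy_negative_scores, dim=1)[1]  # tensor([2, 2, 0])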
import torch
from torch import nn, Tensor
from typing import Iterable, Dict
from ..SentenceTransformer import SentenceTransformer
from .. import util
class MultipleNegativesRankingLoss(nn.Module):
def __init__(self, model: SentenceTransformer, scale: float = 20.0, similarity_fct=util.cos_sim):
"""
This loss expects as input a batch consisting of sentence pairs ``(a_1, p_1), (a_2, p_2)..., (a_n, p_n)``
where we assume that ``(a_i, p_i)`` are a positive pair and ``(a_i, p_j)`` for ``i != j`` a negative pair.
For each ``a_i``, it uses all other ``p_j`` as negative samples, i.e., for ``a_i``, we have 1 positive example
(``p_i``) and ``n-1`` negative examples (``p_j``). It then minimizes the negative log-likelihood for softmax
normalized scores.
This loss function works great to train embeddings for retrieval setups where you have positive pairs
(e.g. (query, relevant_doc)) as it will sample in each batch ``n-1`` negative docs randomly.
The performance usually increases with increasing batch sizes.
You can also provide one or multiple hard negatives per anchor-positive pair by structuring the data like this:
``(a_1, p_1, n_1), (a_2, p_2, n_2)``. Then, ``n_1`` is a hard negative for ``(a_1, p_1)``. The loss will use for
the pair ``(a_i, p_i)`` all ``p_j`` for ``j != i`` and all ``n_j`` as negatives.
:param model: SentenceTransformer model
:param scale: Output of similarity function is multiplied by scale value
:param similarity_fct: similarity function between sentence embeddings. By default, cos_sim. Can also be set to dot product (and then set scale to 1)
References:
- Efficient Natural Language Response Suggestion for Smart Reply, Section 4.4: https://arxiv.org/pdf/1705.00652.pdf
- `Training Examples > Natural Language Inference <../../examples/training/nli/README.html>`_
- `Training Examples > Paraphrase Data <../../examples/training/paraphrases/README.html>`_
- `Training Examples > Quora Duplicate Questions <../../examples/training/quora_duplicate_questions/README.html>`_
- `Training Examples > MS MARCO <../../examples/training/ms_marco/README.html>`_
- `Unsupervised Learning > SimCSE <../../examples/unsupervised_learning/SimCSE/README.html>`_
- `Unsupervised Learning > GenQ <../../examples/unsupervised_learning/query_generation/README.html>`_
Requirements:
1. (anchor, positive) pairs or (anchor, positive, negative) triplets
Relations:
- :class:`CachedMultipleNegativesRankingLoss` is equivalent to this loss, but it uses caching that allows for
much higher batch sizes (and thus better performance) without extra memory usage. However, it requires more
training time.
- :class:`MultipleNegativesSymmetricRankingLoss` is equivalent to this loss, but with an additional loss term.
- :class:`GISTEmbedLoss` is equivalent to this loss, but uses a guide model to guide the in-batch negative
sample selection. `GISTEmbedLoss` yields a stronger training signal at the cost of some training overhead.
Inputs:
+---------------------------------------+--------+
| Texts | Labels |
+=======================================+========+
| (anchor, positive) pairs | none |
+---------------------------------------+--------+
| (anchor, positive, negative) triplets | none |
+---------------------------------------+--------+
Example:
::
from sentence_transformers import SentenceTransformer, losses, InputExample
from torch.utils.data import DataLoader
model = SentenceTransformer('distilbert-base-uncased')
train_examples = [
InputExample(texts=['Anchor 1', 'Positive 1']),
InputExample(texts=['Anchor 2', 'Positive 2']),
]
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=32)
train_loss = losses.MultipleNegativesRankingLoss(model=model)
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super(MultipleNegativesRankingLoss, self).__init__()
self.model = model
self.scale = scale
self.similarity_fct = similarity_fct
self.cross_entropy_loss = nn.CrossEntropyLoss()
def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
reps = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
embeddings_a = reps[0]
embeddings_b = torch.cat(reps[1:])
scores = self.similarity_fct(embeddings_a, embeddings_b) * self.scale
labels = torch.tensor(
range(len(scores)), dtype=torch.long, device=scores.device
) # Example a[i] should match with b[i]
return self.cross_entropy_loss(scores, labels)
def get_config_dict(self):
return {"scale": self.scale, "similarity_fct": self.similarity_fct.__name__}
import torch
from torch import nn, Tensor
from typing import Iterable, Dict
from ..SentenceTransformer import SentenceTransformer
from .. import util
class MultipleNegativesSymmetricRankingLoss(nn.Module):
def __init__(self, model: SentenceTransformer, scale: float = 20.0, similarity_fct=util.cos_sim):
"""
This loss is an adaptation of MultipleNegativesRankingLoss. MultipleNegativesRankingLoss computes the following loss:
For a given anchor and a list of candidates, find the positive candidate.
In MultipleNegativesSymmetricRankingLoss, we add another loss term: Given the positive and a list of all anchors,
find the correct (matching) anchor.
For the example of question-answering: You have (question, answer)-pairs. MultipleNegativesRankingLoss just computes
the loss to find the answer for a given question. MultipleNegativesSymmetricRankingLoss additionally computes the
loss to find the question for a given answer.
Note: If you pass triplets, the negative entry will be ignored. An anchor is only matched against the positives.
:param model: SentenceTransformer model
:param scale: Output of similarity function is multiplied by scale value
:param similarity_fct: similarity function between sentence embeddings. By default, cos_sim. Can also be set to dot product (and then set scale to 1)
Requirements:
1. (anchor, positive) pairs
Relations:
- Like :class:`MultipleNegativesRankingLoss`, but with an additional loss term.
Inputs:
+---------------------------------------+--------+
| Texts | Labels |
+=======================================+========+
| (anchor, positive) pairs | none |
+---------------------------------------+--------+
Example:
::
from sentence_transformers import SentenceTransformer, losses, InputExample
from torch.utils.data import DataLoader
model = SentenceTransformer('distilbert-base-uncased')
train_examples = [
InputExample(texts=['Anchor 1', 'Positive 1']),
InputExample(texts=['Anchor 2', 'Positive 2']),
]
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=32)
train_loss = losses.MultipleNegativesSymmetricRankingLoss(model=model)
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super(MultipleNegativesSymmetricRankingLoss, self).__init__()
self.model = model
self.scale = scale
self.similarity_fct = similarity_fct
self.cross_entropy_loss = nn.CrossEntropyLoss()
def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
reps = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
anchor = reps[0]
candidates = torch.cat(reps[1:])
scores = self.similarity_fct(anchor, candidates) * self.scale
labels = torch.tensor(
range(len(scores)), dtype=torch.long, device=scores.device
) # Example a[i] should match with b[i]
anchor_positive_scores = scores[:, 0 : len(reps[1])]
forward_loss = self.cross_entropy_loss(scores, labels)
backward_loss = self.cross_entropy_loss(anchor_positive_scores.transpose(0, 1), labels)
return (forward_loss + backward_loss) / 2
def get_config_dict(self):
return {"scale": self.scale, "similarity_fct": self.similarity_fct.__name__}
from typing import Iterable, Dict
import torch.nn.functional as F
from torch import nn, Tensor
from .ContrastiveLoss import SiameseDistanceMetric
from sentence_transformers.SentenceTransformer import SentenceTransformer
class OnlineContrastiveLoss(nn.Module):
def __init__(
self, model: SentenceTransformer, distance_metric=SiameseDistanceMetric.COSINE_DISTANCE, margin: float = 0.5
):
"""
This Online Contrastive loss is similar to :class:`ContrastiveLoss`, but it selects hard positive pairs (positives that
are far apart) and hard negative pairs (negatives that are close) and computes the loss only for these pairs.
This loss often yields better performance than ContrastiveLoss.
:param model: SentenceTransformer model
:param distance_metric: Function that returns a distance between two embeddings. The class SiameseDistanceMetric contains pre-defined metrics that can be used
:param margin: Negative samples (label == 0) should have a distance of at least the margin value.
References:
- `Training Examples > Quora Duplicate Questions <../../examples/training/quora_duplicate_questions/README.html>`_
Requirements:
1. (anchor, positive/negative) pairs
2. Data should include hard positives and hard negatives
Relations:
- :class:`ContrastiveLoss` is similar, but does not use hard positive and hard negative pairs.
:class:`OnlineContrastiveLoss` often yields better results.
Inputs:
+-----------------------------------------------+------------------------------+
| Texts | Labels |
+===============================================+==============================+
| (anchor, positive/negative) pairs | 1 if positive, 0 if negative |
+-----------------------------------------------+------------------------------+
Example:
::
from sentence_transformers import SentenceTransformer, losses, InputExample
from torch.utils.data import DataLoader
model = SentenceTransformer('all-MiniLM-L6-v2')
train_examples = [
InputExample(texts=['This is a positive pair', 'Where the distance will be minimized'], label=1),
InputExample(texts=['This is a negative pair', 'Their distance will be increased'], label=0),
]
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=2)
train_loss = losses.OnlineContrastiveLoss(model=model)
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super(OnlineContrastiveLoss, self).__init__()
self.model = model
self.margin = margin
self.distance_metric = distance_metric
def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor, size_average=False):
embeddings = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
distance_matrix = self.distance_metric(embeddings[0], embeddings[1])
negs = distance_matrix[labels == 0]
poss = distance_matrix[labels == 1]
# select hard positive and hard negative pairs
negative_pairs = negs[negs < (poss.max() if len(poss) > 1 else negs.mean())]
positive_pairs = poss[poss > (negs.min() if len(negs) > 1 else poss.mean())]
positive_loss = positive_pairs.pow(2).sum()
negative_loss = F.relu(self.margin - negative_pairs).pow(2).sum()
loss = positive_loss + negative_loss
return loss
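# Note on the selection above: a negative pair is kept only if its distance is below the largest
# positive-pair distance (a hard negative), and a positive pair is kept only if its distance is
# above the smallest negative-pair distance (a hard positive); the contrastive loss is then
# computed on these hard pairs only.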
import torch
from torch import nn, Tensor
from typing import Iterable, Dict, Callable
from ..SentenceTransformer import SentenceTransformer
import logging
logger = logging.getLogger(__name__)
class SoftmaxLoss(nn.Module):
def __init__(
self,
model: SentenceTransformer,
sentence_embedding_dimension: int,
num_labels: int,
concatenation_sent_rep: bool = True,
concatenation_sent_difference: bool = True,
concatenation_sent_multiplication: bool = False,
loss_fct: Callable = nn.CrossEntropyLoss(),
):
"""
This loss was used in our SBERT publication (https://arxiv.org/abs/1908.10084) to train the SentenceTransformer
model on NLI data. It adds a softmax classifier on top of the output of two transformer networks.
:class:`MultipleNegativesRankingLoss` is an alternative loss function that often yields better results,
as per https://arxiv.org/abs/2004.09813.
:param model: SentenceTransformer model
:param sentence_embedding_dimension: Dimension of your sentence embeddings
:param num_labels: Number of different labels
:param concatenation_sent_rep: Concatenate vectors u,v for the softmax classifier?
:param concatenation_sent_difference: Add abs(u-v) for the softmax classifier?
:param concatenation_sent_multiplication: Add u*v for the softmax classifier?
:param loss_fct: Optional: Custom pytorch loss function. If not set, uses nn.CrossEntropyLoss()
References:
- Sentence-BERT: Sentence Embeddings using Siamese BERT-Networks: https://arxiv.org/abs/1908.10084
- `Training Examples > Natural Language Inference <../../examples/training/nli/README.html>`_
Requirements:
1. sentence pairs with a class label
Inputs:
+---------------------------------------+--------+
| Texts | Labels |
+=======================================+========+
| (sentence_A, sentence_B) pairs | class |
+---------------------------------------+--------+
Example:
::
from sentence_transformers import SentenceTransformer, SentencesDataset, losses
from sentence_transformers.readers import InputExample
from torch.utils.data import DataLoader
model = SentenceTransformer('distilbert-base-nli-mean-tokens')
train_examples = [
InputExample(texts=['First pair, sent A', 'First pair, sent B'], label=0),
InputExample(texts=['Second pair, sent A', 'Second pair, sent B'], label=1),
InputExample(texts=['Third pair, sent A', 'Third pair, sent B'], label=0),
InputExample(texts=['Fourth pair, sent A', 'Fourth pair, sent B'], label=2),
]
train_batch_size = 2
train_dataset = SentencesDataset(train_examples, model)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=train_batch_size)
train_loss = losses.SoftmaxLoss(
model=model,
sentence_embedding_dimension=model.get_sentence_embedding_dimension(),
num_labels=len(set(x.label for x in train_examples))
)
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super(SoftmaxLoss, self).__init__()
self.model = model
self.num_labels = num_labels
self.concatenation_sent_rep = concatenation_sent_rep
self.concatenation_sent_difference = concatenation_sent_difference
self.concatenation_sent_multiplication = concatenation_sent_multiplication
num_vectors_concatenated = 0
if concatenation_sent_rep:
num_vectors_concatenated += 2
if concatenation_sent_difference:
num_vectors_concatenated += 1
if concatenation_sent_multiplication:
num_vectors_concatenated += 1
logger.info("Softmax loss: #Vectors concatenated: {}".format(num_vectors_concatenated))
self.classifier = nn.Linear(
num_vectors_concatenated * sentence_embedding_dimension, num_labels, device=model.device
)
self.loss_fct = loss_fct
def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
reps = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
rep_a, rep_b = reps
vectors_concat = []
if self.concatenation_sent_rep:
vectors_concat.append(rep_a)
vectors_concat.append(rep_b)
if self.concatenation_sent_difference:
vectors_concat.append(torch.abs(rep_a - rep_b))
if self.concatenation_sent_multiplication:
vectors_concat.append(rep_a * rep_b)
features = torch.cat(vectors_concat, 1)
output = self.classifier(features)
if labels is not None:
loss = self.loss_fct(output, labels.view(-1))
return loss
else:
return reps, output
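# Illustrative sketch of the classifier input above (toy tensors, not part of the library): with the
# default settings the features are [u, v, |u - v|], so the classifier input size is
# 3 * sentence_embedding_dimension; enabling concatenation_sent_multiplication appends u * v as well.
_toy_u, _toy_v = torch.randn(2, 768), torch.randn(2, 768)
_toy_features = torch.cat([_toy_u, _toy_v, torch.abs(_toy_u - _toy_v)], dim=1)  # shape (2, 2304)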
from torch import nn, Tensor
from typing import Iterable, Dict
import torch.nn.functional as F
from enum import Enum
from ..SentenceTransformer import SentenceTransformer
class TripletDistanceMetric(Enum):
"""
The metric for the triplet loss
"""
COSINE = lambda x, y: 1 - F.cosine_similarity(x, y)
EUCLIDEAN = lambda x, y: F.pairwise_distance(x, y, p=2)
MANHATTAN = lambda x, y: F.pairwise_distance(x, y, p=1)
class TripletLoss(nn.Module):
def __init__(
self, model: SentenceTransformer, distance_metric=TripletDistanceMetric.EUCLIDEAN, triplet_margin: float = 5
):
"""
This class implements triplet loss. Given a triplet of (anchor, positive, negative),
the loss minimizes the distance between anchor and positive while it maximizes the distance
between anchor and negative. It computes the following loss function:
``loss = max(||anchor - positive|| - ||anchor - negative|| + margin, 0)``.
The margin is an important hyperparameter and needs to be tuned accordingly.
:param model: SentenceTransformerModel
:param distance_metric: Function to compute distance between two embeddings. The class TripletDistanceMetric
contains common distance metrics that can be used.
:param triplet_margin: The negative should be at least this much further away from the anchor than the positive.
References:
- For further details, see: https://en.wikipedia.org/wiki/Triplet_loss
Requirements:
1. (anchor, positive, negative) triplets
Inputs:
+---------------------------------------+--------+
| Texts | Labels |
+=======================================+========+
| (anchor, positive, negative) triplets | none |
+---------------------------------------+--------+
Example:
::
from sentence_transformers import SentenceTransformer, SentencesDataset, losses
from sentence_transformers.readers import InputExample
from torch.utils.data import DataLoader
model = SentenceTransformer('distilbert-base-nli-mean-tokens')
train_examples = [
InputExample(texts=['Anchor 1', 'Positive 1', 'Negative 1']),
InputExample(texts=['Anchor 2', 'Positive 2', 'Negative 2']),
]
train_batch_size = 1
train_dataset = SentencesDataset(train_examples, model)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=train_batch_size)
train_loss = losses.TripletLoss(model=model)
model.fit(
[(train_dataloader, train_loss)],
epochs=10,
)
"""
super(TripletLoss, self).__init__()
self.model = model
self.distance_metric = distance_metric
self.triplet_margin = triplet_margin
def get_config_dict(self):
distance_metric_name = self.distance_metric.__name__
for name, value in vars(TripletDistanceMetric).items():
if value == self.distance_metric:
distance_metric_name = "TripletDistanceMetric.{}".format(name)
break
return {"distance_metric": distance_metric_name, "triplet_margin": self.triplet_margin}
def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
reps = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
rep_anchor, rep_pos, rep_neg = reps
distance_pos = self.distance_metric(rep_anchor, rep_pos)
distance_neg = self.distance_metric(rep_anchor, rep_neg)
losses = F.relu(distance_pos - distance_neg + self.triplet_margin)
return losses.mean()
from .AdaptiveLayerLoss import AdaptiveLayerLoss
from .CosineSimilarityLoss import CosineSimilarityLoss
from .SoftmaxLoss import SoftmaxLoss
from .MultipleNegativesRankingLoss import MultipleNegativesRankingLoss
from .MultipleNegativesSymmetricRankingLoss import MultipleNegativesSymmetricRankingLoss
from .TripletLoss import TripletDistanceMetric, TripletLoss
from .MarginMSELoss import MarginMSELoss
from .MatryoshkaLoss import MatryoshkaLoss
from .Matryoshka2dLoss import Matryoshka2dLoss
from .MSELoss import MSELoss
from .CachedMultipleNegativesRankingLoss import CachedMultipleNegativesRankingLoss
from .ContrastiveLoss import SiameseDistanceMetric, ContrastiveLoss
from .ContrastiveTensionLoss import (
ContrastiveTensionLoss,
ContrastiveTensionLossInBatchNegatives,
ContrastiveTensionDataLoader,
)
from .CoSENTLoss import CoSENTLoss
from .AnglELoss import AnglELoss
from .OnlineContrastiveLoss import OnlineContrastiveLoss
from .MegaBatchMarginLoss import MegaBatchMarginLoss
from .DenoisingAutoEncoderLoss import DenoisingAutoEncoderLoss
from .GISTEmbedLoss import GISTEmbedLoss
# Triplet losses
from .BatchHardTripletLoss import BatchHardTripletLoss, BatchHardTripletLossDistanceFunction
from .BatchHardSoftMarginTripletLoss import BatchHardSoftMarginTripletLoss
from .BatchSemiHardTripletLoss import BatchSemiHardTripletLoss
from .BatchAllTripletLoss import BatchAllTripletLoss
__all__ = [
"AdaptiveLayerLoss",
"CosineSimilarityLoss",
"SoftmaxLoss",
"MultipleNegativesRankingLoss",
"MultipleNegativesSymmetricRankingLoss",
"TripletLoss",
"TripletDistanceMetric",
"MarginMSELoss",
"MatryoshkaLoss",
"Matryoshka2dLoss",
"MSELoss",
"ContrastiveLoss",
"SiameseDistanceMetric",
"CachedMultipleNegativesRankingLoss",
"ContrastiveTensionLoss",
"ContrastiveTensionLossInBatchNegatives",
"ContrastiveTensionDataLoader",
"CoSENTLoss",
"AnglELoss",
"OnlineContrastiveLoss",
"MegaBatchMarginLoss",
"DenoisingAutoEncoderLoss",
"GISTEmbedLoss",
"BatchHardTripletLoss",
"BatchHardTripletLossDistanceFunction",
"BatchHardSoftMarginTripletLoss",
"BatchSemiHardTripletLoss",
"BatchAllTripletLoss",
]
import logging
from .util import fullname
class ModelCardTemplate:
__TAGS__ = ["sentence-transformers", "feature-extraction", "sentence-similarity"]
__DEFAULT_VARS__ = {
"{PIPELINE_TAG}": "sentence-similarity",
"{MODEL_DESCRIPTION}": "<!--- Describe your model here -->",
"{TRAINING_SECTION}": "",
"{USAGE_TRANSFORMERS_SECTION}": "",
"{EVALUATION}": "<!--- Describe how your model was evaluated -->",
"{CITING}": "<!--- Describe where people can find more information -->",
}
__MODEL_CARD__ = """
---
library_name: sentence-transformers
pipeline_tag: {PIPELINE_TAG}
tags:
{TAGS}
{DATASETS}
---
# {MODEL_NAME}
This is a [sentence-transformers](https://www.SBERT.net) model: It maps sentences & paragraphs to a {NUM_DIMENSIONS} dimensional dense vector space and can be used for tasks like clustering or semantic search.
{MODEL_DESCRIPTION}
## Usage (Sentence-Transformers)
Using this model becomes easy when you have [sentence-transformers](https://www.SBERT.net) installed:
```
pip install -U sentence-transformers
```
Then you can use the model like this:
```python
from sentence_transformers import SentenceTransformer
sentences = ["This is an example sentence", "Each sentence is converted"]
model = SentenceTransformer('{MODEL_NAME}')
embeddings = model.encode(sentences)
print(embeddings)
```
{USAGE_TRANSFORMERS_SECTION}
## Evaluation Results
{EVALUATION}
For an automated evaluation of this model, see the *Sentence Embeddings Benchmark*: [https://seb.sbert.net](https://seb.sbert.net?model_name={MODEL_NAME})
{TRAINING_SECTION}
## Full Model Architecture
```
{FULL_MODEL_STR}
```
## Citing & Authors
{CITING}
"""
__TRAINING_SECTION__ = """
## Training
The model was trained with the parameters:
{LOSS_FUNCTIONS}
Parameters of the fit()-Method:
```
{FIT_PARAMETERS}
```
"""
__USAGE_TRANSFORMERS__ = """\n
## Usage (HuggingFace Transformers)
Without [sentence-transformers](https://www.SBERT.net), you can use the model like this: First, you pass your input through the transformer model, then you have to apply the right pooling operation on top of the contextualized word embeddings.
```python
from transformers import AutoTokenizer, AutoModel
import torch
{POOLING_FUNCTION}
# Sentences we want sentence embeddings for
sentences = ['This is an example sentence', 'Each sentence is converted']
# Load model from HuggingFace Hub
tokenizer = AutoTokenizer.from_pretrained('{MODEL_NAME}')
model = AutoModel.from_pretrained('{MODEL_NAME}')
# Tokenize sentences
encoded_input = tokenizer(sentences, padding=True, truncation=True, return_tensors='pt')
# Compute token embeddings
with torch.no_grad():
model_output = model(**encoded_input)
# Perform pooling. In this case, {POOLING_MODE} pooling.
sentence_embeddings = {POOLING_FUNCTION_NAME}(model_output, encoded_input['attention_mask'])
print("Sentence embeddings:")
print(sentence_embeddings)
```
"""
@staticmethod
def model_card_get_pooling_function(pooling_mode):
if pooling_mode == "max":
return (
"max_pooling",
"""
# Max Pooling - Take the max value over time for every dimension.
def max_pooling(model_output, attention_mask):
token_embeddings = model_output[0] #First element of model_output contains all token embeddings
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
token_embeddings[input_mask_expanded == 0] = -1e9 # Set padding tokens to large negative value
return torch.max(token_embeddings, 1)[0]
""",
)
elif pooling_mode == "mean":
return (
"mean_pooling",
"""
#Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask):
token_embeddings = model_output[0] #First element of model_output contains all token embeddings
input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
""",
)
elif pooling_mode == "cls":
return (
"cls_pooling",
"""
def cls_pooling(model_output, attention_mask):
return model_output[0][:,0]
""",
)
@staticmethod
def get_train_objective_info(dataloader, loss):
try:
if hasattr(dataloader, "get_config_dict"):
loader_params = dataloader.get_config_dict()
else:
loader_params = {}
loader_params["batch_size"] = dataloader.batch_size if hasattr(dataloader, "batch_size") else "unknown"
if hasattr(dataloader, "sampler"):
loader_params["sampler"] = fullname(dataloader.sampler)
if hasattr(dataloader, "batch_sampler"):
loader_params["batch_sampler"] = fullname(dataloader.batch_sampler)
dataloader_str = """**DataLoader**:\n\n`{}` of length {} with parameters:
```
{}
```""".format(fullname(dataloader), len(dataloader), loader_params)
loss_str = "**Loss**:\n\n`{}` {}".format(
fullname(loss),
"""with parameters:
```
{}
```""".format(loss.get_config_dict())
if hasattr(loss, "get_config_dict")
else "",
)
return [dataloader_str, loss_str]
except Exception as e:
logging.warning("Exception when creating get_train_objective_info: {}".format(str(e)))
return ""
from torch import Tensor
from torch import nn
import os
import json
from ..util import import_from_string
from collections import OrderedDict
from typing import List, Dict, Union, Tuple
class Asym(nn.Sequential):
def __init__(self, sub_modules: Dict[str, List[nn.Module]], allow_empty_key: bool = True):
"""
This model allows you to create asymmetric SentenceTransformer models that apply different modules depending on the specified input key.
In the example below, we create two different Dense models for 'query' and 'doc'. Text that is passed as {'query': 'My query'} will
be passed through the first Dense model, and text that is passed as {'doc': 'My document'} will use the other Dense model.
Note that when you call encode(), only inputs of the same type can be encoded; mixed types cannot be encoded.
Example::
word_embedding_model = models.Transformer(model_name)
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
asym_model = models.Asym({'query': [models.Dense(word_embedding_model.get_word_embedding_dimension(), 128)], 'doc': [models.Dense(word_embedding_model.get_word_embedding_dimension(), 128)]})
model = SentenceTransformer(modules=[word_embedding_model, pooling_model, asym_model])
model.encode([{'query': 'Q1'}, {'query': 'Q2'}])
model.encode([{'doc': 'Doc1'}, {'doc': 'Doc2'}])
#You can train it with InputExample like this. Note, that the order must always be the same:
train_example = InputExample(texts=[{'query': 'Train query'}, {'doc': 'Document'}], label=1)
:param sub_modules: Dict in the format str -> List[models]. The models in the specified list will be applied for input marked with the respective key.
:param allow_empty_key: If true, inputs without a key can be processed. If false, an exception will be thrown if no key is specified.
"""
self.sub_modules = sub_modules
self.allow_empty_key = allow_empty_key
ordered_dict = OrderedDict()
for name, models in sub_modules.items():
if not isinstance(models, List):
models = [models]
for idx, model in enumerate(models):
ordered_dict[name + "-" + str(idx)] = model
super(Asym, self).__init__(ordered_dict)
def forward(self, features: Dict[str, Tensor]):
if "text_keys" in features and len(features["text_keys"]) > 0:
text_key = features["text_keys"][0]
for model in self.sub_modules[text_key]:
features = model(features)
elif not self.allow_empty_key:
raise ValueError("Input did not specify any keys and allow_empty_key is False")
return features
def get_sentence_embedding_dimension(self) -> int:
for name in self.sub_modules:
if hasattr(self.sub_modules[name][0], "get_sentence_embedding_dimension"):
return self.sub_modules[name][0].get_sentence_embedding_dimension()
return None
def save(self, output_path):
model_lookup = {}
model_types = {}
model_structure = {}
for name, models in self.sub_modules.items():
model_structure[name] = []
for model in models:
model_id = str(id(model)) + "_" + type(model).__name__
model_lookup[model_id] = model
model_types[model_id] = type(model).__module__
model_structure[name].append(model_id)
for model_id, model in model_lookup.items():
model_path = os.path.join(output_path, str(model_id))
os.makedirs(model_path, exist_ok=True)
model.save(model_path)
with open(os.path.join(output_path, "config.json"), "w", encoding="utf8") as fOut:
json.dump(
{
"types": model_types,
"structure": model_structure,
"parameters": {"allow_empty_key": self.allow_empty_key},
},
fOut,
indent=2,
)
def tokenize(self, texts: Union[List[str], List[Tuple[str, str]]], **kwargs):
"""
Tokenizes a text and maps tokens to token-ids
"""
if not isinstance(texts[0], dict):
raise AttributeError("Asym. model requires that texts are passed as dicts: {'key': 'text'}")
module_key = None
for lookup in texts:
text_key, text = next(iter(lookup.items()))
if module_key is None:
module_key = text_key
assert text_key == module_key # Mixed batches are not allowed
return self.sub_modules[module_key][0].tokenize(texts, **kwargs)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
modules = {}
for model_id, model_type in config["types"].items():
module_class = import_from_string(model_type)
module = module_class.load(os.path.join(input_path, model_id))
modules[model_id] = module
model_structure = {}
for key_name, models_list in config["structure"].items():
model_structure[key_name] = []
for model_id in models_list:
model_structure[key_name].append(modules[model_id])
model = Asym(model_structure, **config["parameters"])
return model
import torch
from torch import Tensor
from torch import nn
from typing import List, Dict
import os
import json
import logging
import numpy as np
from .tokenizer import WhitespaceTokenizer
logger = logging.getLogger(__name__)
class BoW(nn.Module):
"""Implements a Bag-of-Words (BoW) model to derive sentence embeddings.
A weighting can be added to allow the generation of tf-idf vectors. The output vector has the size of the vocab.
"""
def __init__(
self,
vocab: List[str],
word_weights: Dict[str, float] = {},
unknown_word_weight: float = 1,
cumulative_term_frequency: bool = True,
):
super(BoW, self).__init__()
vocab = list(set(vocab)) # Ensure vocab is unique
self.config_keys = ["vocab", "word_weights", "unknown_word_weight", "cumulative_term_frequency"]
self.vocab = vocab
self.word_weights = word_weights
self.unknown_word_weight = unknown_word_weight
self.cumulative_term_frequency = cumulative_term_frequency
# Maps wordIdx -> word weight
self.weights = []
num_unknown_words = 0
for word in vocab:
weight = unknown_word_weight
if word in word_weights:
weight = word_weights[word]
elif word.lower() in word_weights:
weight = word_weights[word.lower()]
else:
num_unknown_words += 1
self.weights.append(weight)
logger.info(
"{} out of {} words without a weighting value. Set weight to {}".format(
num_unknown_words, len(vocab), unknown_word_weight
)
)
self.tokenizer = WhitespaceTokenizer(vocab, stop_words=set(), do_lower_case=False)
self.sentence_embedding_dimension = len(vocab)
def forward(self, features: Dict[str, Tensor]):
# Nothing to do, everything is done in get_sentence_features
return features
def tokenize(self, texts: List[str], **kwargs) -> List[int]:
tokenized = [self.tokenizer.tokenize(text, **kwargs) for text in texts]
return self.get_sentence_features(tokenized)
def get_sentence_embedding_dimension(self):
return self.sentence_embedding_dimension
def get_sentence_features(self, tokenized_texts: List[List[int]], pad_seq_length: int = 0):
vectors = []
for tokens in tokenized_texts:
vector = np.zeros(self.get_sentence_embedding_dimension(), dtype=np.float32)
for token in tokens:
if self.cumulative_term_frequency:
vector[token] += self.weights[token]
else:
vector[token] = self.weights[token]
vectors.append(vector)
return {"sentence_embedding": torch.tensor(vectors, dtype=torch.float)}
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
return BoW(**config)
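# Illustrative usage sketch (not part of the library; the vocabulary is a toy example): the resulting
# "sentence_embedding" is a term-count vector of size len(vocab), optionally weighted via word_weights
# (e.g. with idf values).
_toy_bow = BoW(vocab=["hello", "world", "example"])
_toy_bow_features = _toy_bow.tokenize(["hello hello world"])  # dict with a (1, 3) "sentence_embedding" tensor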
from typing import Union
from torch import nn
import transformers
import torch
from PIL import Image
class CLIPModel(nn.Module):
def __init__(self, model_name: str = "openai/clip-vit-base-patch32", processor_name=None):
super(CLIPModel, self).__init__()
if processor_name is None:
processor_name = model_name
self.model = transformers.CLIPModel.from_pretrained(model_name)
self.processor = transformers.CLIPProcessor.from_pretrained(processor_name)
def __repr__(self):
return "CLIPModel()"
def forward(self, features):
image_embeds = []
text_embeds = []
if "pixel_values" in features:
vision_outputs = self.model.vision_model(pixel_values=features["pixel_values"])
image_embeds = self.model.visual_projection(vision_outputs[1])
if "input_ids" in features:
text_outputs = self.model.text_model(
input_ids=features.get("input_ids"),
attention_mask=features.get("attention_mask", None),
position_ids=features.get("position_ids", None),
output_attentions=features.get("output_attentions", None),
output_hidden_states=features.get("output_hidden_states", None),
)
text_embeds = self.model.text_projection(text_outputs[1])
sentence_embedding = []
image_features = iter(image_embeds)
text_features = iter(text_embeds)
for idx, input_type in enumerate(features["image_text_info"]):
if input_type == 0:
sentence_embedding.append(next(image_features))
else:
sentence_embedding.append(next(text_features))
features["sentence_embedding"] = torch.stack(sentence_embedding).float()
return features
def tokenize(self, texts, padding: Union[str, bool] = True):
images = []
texts_values = []
image_text_info = []
for idx, data in enumerate(texts):
if isinstance(data, Image.Image): # An Image
images.append(data)
image_text_info.append(0)
else: # A text
texts_values.append(data)
image_text_info.append(1)
if len(texts_values) == 0:
texts_values = None
if len(images) == 0:
images = None
inputs = self.processor(text=texts_values, images=images, return_tensors="pt", padding=padding)
inputs["image_text_info"] = image_text_info
return inputs
def save(self, output_path: str):
self.model.save_pretrained(output_path)
self.processor.save_pretrained(output_path)
@staticmethod
def load(input_path: str):
return CLIPModel(model_name=input_path)
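# Illustrative usage sketch (not part of the library; the image path is hypothetical): images and
# texts are embedded into the same space, so their embeddings can be compared directly.
_toy_clip = CLIPModel("openai/clip-vit-base-patch32")
_toy_clip_inputs = _toy_clip.tokenize([Image.open("two_dogs_in_snow.jpg"), "Two dogs playing in the snow"])
_toy_clip_outputs = _toy_clip(_toy_clip_inputs)  # _toy_clip_outputs["sentence_embedding"] has shape (2, 512)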
import torch
from torch import nn
from typing import List
import os
import json
class CNN(nn.Module):
"""CNN-layer with multiple kernel-sizes over the word embeddings"""
def __init__(
self,
in_word_embedding_dimension: int,
out_channels: int = 256,
kernel_sizes: List[int] = [1, 3, 5],
stride_sizes: List[int] = None,
):
nn.Module.__init__(self)
self.config_keys = ["in_word_embedding_dimension", "out_channels", "kernel_sizes"]
self.in_word_embedding_dimension = in_word_embedding_dimension
self.out_channels = out_channels
self.kernel_sizes = kernel_sizes
self.embeddings_dimension = out_channels * len(kernel_sizes)
self.convs = nn.ModuleList()
in_channels = in_word_embedding_dimension
if stride_sizes is None:
stride_sizes = [1] * len(kernel_sizes)
for kernel_size, stride in zip(kernel_sizes, stride_sizes):
padding_size = int((kernel_size - 1) / 2)
conv = nn.Conv1d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding_size,
)
self.convs.append(conv)
def forward(self, features):
token_embeddings = features["token_embeddings"]
token_embeddings = token_embeddings.transpose(1, -1)
vectors = [conv(token_embeddings) for conv in self.convs]
out = torch.cat(vectors, 1).transpose(1, -1)
features.update({"token_embeddings": out})
return features
def get_word_embedding_dimension(self) -> int:
return self.embeddings_dimension
def tokenize(self, text: str, **kwargs) -> List[int]:
raise NotImplementedError()
def save(self, output_path: str):
with open(os.path.join(output_path, "cnn_config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
@staticmethod
def load(input_path: str):
with open(os.path.join(input_path, "cnn_config.json"), "r") as fIn:
config = json.load(fIn)
weights = torch.load(os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"))
model = CNN(**config)
model.load_state_dict(weights)
return model
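# Illustrative usage sketch (not part of the library; dimensions are examples): the CNN keeps the
# sequence length and outputs out_channels * len(kernel_sizes) features per token.
_toy_cnn = CNN(in_word_embedding_dimension=300, out_channels=64, kernel_sizes=[1, 3, 5])
_toy_cnn_out = _toy_cnn({"token_embeddings": torch.randn(2, 10, 300)})
# _toy_cnn_out["token_embeddings"] has shape (2, 10, 192)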
import torch
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
from ..util import fullname, import_from_string
class Dense(nn.Module):
"""Feed-forward function with activiation function.
This layer takes a fixed-sized sentence embedding and passes it through a feed-forward layer. Can be used to generate deep averaging networks (DAN).
:param in_features: Size of the input dimension
:param out_features: Output size
:param bias: Add a bias vector
:param activation_function: Pytorch activation function applied on output
:param init_weight: Initial value for the matrix of the linear layer
:param init_bias: Initial value for the bias of the linear layer
"""
def __init__(
self,
in_features: int,
out_features: int,
bias: bool = True,
activation_function=nn.Tanh(),
init_weight: Tensor = None,
init_bias: Tensor = None,
):
super(Dense, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.bias = bias
self.activation_function = activation_function
self.linear = nn.Linear(in_features, out_features, bias=bias)
if init_weight is not None:
self.linear.weight = nn.Parameter(init_weight)
if init_bias is not None:
self.linear.bias = nn.Parameter(init_bias)
def forward(self, features: Dict[str, Tensor]):
features.update({"sentence_embedding": self.activation_function(self.linear(features["sentence_embedding"]))})
return features
def get_sentence_embedding_dimension(self) -> int:
return self.out_features
def get_config_dict(self):
return {
"in_features": self.in_features,
"out_features": self.out_features,
"bias": self.bias,
"activation_function": fullname(self.activation_function),
}
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
def __repr__(self):
return "Dense({})".format(self.get_config_dict())
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
config["activation_function"] = import_from_string(config["activation_function"])()
model = Dense(**config)
model.load_state_dict(
torch.load(os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"))
)
return model
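# Illustrative usage sketch (not part of the library; dimensions are examples): a Dense layer is
# typically appended after pooling to down-project the sentence embedding, e.g. 768 -> 256, with a
# Tanh activation by default.
_toy_dense = Dense(in_features=768, out_features=256)
_toy_dense_out = _toy_dense({"sentence_embedding": torch.randn(4, 768)})  # shape (4, 256)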
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
class Dropout(nn.Module):
"""Dropout layer.
:param dropout: Dropout probability applied to the sentence embedding.
"""
def __init__(self, dropout: float = 0.2):
super(Dropout, self).__init__()
self.dropout = dropout
self.dropout_layer = nn.Dropout(self.dropout)
def forward(self, features: Dict[str, Tensor]):
features.update({"sentence_embedding": self.dropout_layer(features["sentence_embedding"])})
return features
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump({"dropout": self.dropout}, fOut)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
model = Dropout(**config)
return model
import torch
from torch import nn
from typing import List
import os
import json
class LSTM(nn.Module):
"""
Bidirectional LSTM running over word embeddings.
"""
def __init__(
self,
word_embedding_dimension: int,
hidden_dim: int,
num_layers: int = 1,
dropout: float = 0,
bidirectional: bool = True,
):
nn.Module.__init__(self)
self.config_keys = ["word_embedding_dimension", "hidden_dim", "num_layers", "dropout", "bidirectional"]
self.word_embedding_dimension = word_embedding_dimension
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.dropout = dropout
self.bidirectional = bidirectional
self.embeddings_dimension = hidden_dim
if self.bidirectional:
self.embeddings_dimension *= 2
self.encoder = nn.LSTM(
word_embedding_dimension,
hidden_dim,
num_layers=num_layers,
dropout=dropout,
bidirectional=bidirectional,
batch_first=True,
)
def forward(self, features):
token_embeddings = features["token_embeddings"]
sentence_lengths = torch.clamp(features["sentence_lengths"], min=1)
packed = nn.utils.rnn.pack_padded_sequence(
token_embeddings, sentence_lengths.cpu(), batch_first=True, enforce_sorted=False
)
packed = self.encoder(packed)
unpack = nn.utils.rnn.pad_packed_sequence(packed[0], batch_first=True)[0]
features.update({"token_embeddings": unpack})
return features
def get_word_embedding_dimension(self) -> int:
return self.embeddings_dimension
def tokenize(self, text: str, **kwargs) -> List[int]:
raise NotImplementedError()
def save(self, output_path: str):
with open(os.path.join(output_path, "lstm_config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
@staticmethod
def load(input_path: str):
with open(os.path.join(input_path, "lstm_config.json"), "r") as fIn:
config = json.load(fIn)
weights = torch.load(os.path.join(input_path, "pytorch_model.bin"))
model = LSTM(**config)
model.load_state_dict(weights)
return model
import torch
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
class LayerNorm(nn.Module):
def __init__(self, dimension: int):
super(LayerNorm, self).__init__()
self.dimension = dimension
self.norm = nn.LayerNorm(dimension)
def forward(self, features: Dict[str, Tensor]):
features["sentence_embedding"] = self.norm(features["sentence_embedding"])
return features
def get_sentence_embedding_dimension(self):
return self.dimension
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump({"dimension": self.dimension}, fOut, indent=2)
torch.save(self.state_dict(), os.path.join(output_path, "pytorch_model.bin"))
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
model = LayerNorm(**config)
model.load_state_dict(
torch.load(os.path.join(input_path, "pytorch_model.bin"), map_location=torch.device("cpu"))
)
return model
from torch import Tensor
from torch import nn
from typing import Dict
import torch.nn.functional as F
class Normalize(nn.Module):
"""
This layer normalizes embeddings to unit length
"""
def __init__(self):
super(Normalize, self).__init__()
def forward(self, features: Dict[str, Tensor]):
features.update({"sentence_embedding": F.normalize(features["sentence_embedding"], p=2, dim=1)})
return features
def save(self, output_path):
pass
@staticmethod
def load(input_path):
return Normalize()
import torch
from torch import Tensor
from torch import nn
from typing import Dict
import os
import json
class Pooling(nn.Module):
"""Performs pooling (max or mean) on the token embeddings.
Using pooling, it generates a fixed-sized sentence embedding from a variable-sized sentence. This layer also allows
you to use the CLS token if it is returned by the underlying word embedding model. You can concatenate multiple poolings
together.
:param word_embedding_dimension: Dimensions for the word embeddings
:param pooling_mode: Either "cls", "lasttoken", "max", "mean", "mean_sqrt_len_tokens", or "weightedmean". If set, overwrites the other pooling_mode_* settings
:param pooling_mode_cls_token: Use the first token (CLS token) as text representations
:param pooling_mode_max_tokens: Use max in each dimension over all tokens.
:param pooling_mode_mean_tokens: Perform mean-pooling
:param pooling_mode_mean_sqrt_len_tokens: Perform mean-pooling, but divide by sqrt(input_length).
:param pooling_mode_weightedmean_tokens: Perform (position) weighted mean pooling. See `SGPT: GPT Sentence Embeddings for Semantic Search <https://arxiv.org/abs/2202.08904>`_.
:param pooling_mode_lasttoken: Perform last token pooling. See `SGPT: GPT Sentence Embeddings for Semantic Search <https://arxiv.org/abs/2202.08904>`_ and `Text and Code Embeddings by Contrastive Pre-Training <https://arxiv.org/abs/2201.10005>`_.
"""
POOLING_MODES = (
"cls",
"lasttoken",
"max",
"mean",
"mean_sqrt_len_tokens",
"weightedmean",
)
def __init__(
self,
word_embedding_dimension: int,
pooling_mode: str = None,
pooling_mode_cls_token: bool = False,
pooling_mode_max_tokens: bool = False,
pooling_mode_mean_tokens: bool = True,
pooling_mode_mean_sqrt_len_tokens: bool = False,
pooling_mode_weightedmean_tokens: bool = False,
pooling_mode_lasttoken: bool = False,
include_prompt=True,
) -> None:
super(Pooling, self).__init__()
self.config_keys = [
"word_embedding_dimension",
"pooling_mode_cls_token",
"pooling_mode_mean_tokens",
"pooling_mode_max_tokens",
"pooling_mode_mean_sqrt_len_tokens",
"pooling_mode_weightedmean_tokens",
"pooling_mode_lasttoken",
"include_prompt",
]
if pooling_mode is not None: # Set pooling mode by string
pooling_mode = pooling_mode.lower()
if pooling_mode not in self.POOLING_MODES:
raise ValueError(
f"Set invalid pooling mode: {pooling_mode}. Valid pooling modes are: {self.POOLING_MODES}."
)
pooling_mode_cls_token = pooling_mode == "cls"
pooling_mode_max_tokens = pooling_mode == "max"
pooling_mode_mean_tokens = pooling_mode == "mean"
pooling_mode_mean_sqrt_len_tokens = pooling_mode == "mean_sqrt_len_tokens"
pooling_mode_weightedmean_tokens = pooling_mode == "weightedmean"
pooling_mode_lasttoken = pooling_mode == "lasttoken"
self.word_embedding_dimension = word_embedding_dimension
self.pooling_mode_cls_token = pooling_mode_cls_token
self.pooling_mode_mean_tokens = pooling_mode_mean_tokens
self.pooling_mode_max_tokens = pooling_mode_max_tokens
self.pooling_mode_mean_sqrt_len_tokens = pooling_mode_mean_sqrt_len_tokens
self.pooling_mode_weightedmean_tokens = pooling_mode_weightedmean_tokens
self.pooling_mode_lasttoken = pooling_mode_lasttoken
self.include_prompt = include_prompt
pooling_mode_multiplier = sum(
[
pooling_mode_cls_token,
pooling_mode_max_tokens,
pooling_mode_mean_tokens,
pooling_mode_mean_sqrt_len_tokens,
pooling_mode_weightedmean_tokens,
pooling_mode_lasttoken,
]
)
self.pooling_output_dimension = pooling_mode_multiplier * word_embedding_dimension
def __repr__(self):
return "Pooling({})".format(self.get_config_dict())
def get_pooling_mode_str(self) -> str:
"""
        Returns the enabled pooling modes as a '+'-joined string
"""
modes = []
if self.pooling_mode_cls_token:
modes.append("cls")
if self.pooling_mode_mean_tokens:
modes.append("mean")
if self.pooling_mode_max_tokens:
modes.append("max")
if self.pooling_mode_mean_sqrt_len_tokens:
modes.append("mean_sqrt_len_tokens")
if self.pooling_mode_weightedmean_tokens:
modes.append("weightedmean")
if self.pooling_mode_lasttoken:
modes.append("lasttoken")
return "+".join(modes)
def forward(self, features: Dict[str, Tensor]):
token_embeddings = features["token_embeddings"]
attention_mask = features["attention_mask"]
if not self.include_prompt and "prompt_length" in features:
attention_mask[:, : features["prompt_length"]] = 0
## Pooling strategy
output_vectors = []
if self.pooling_mode_cls_token:
cls_token = features.get("cls_token_embeddings", token_embeddings[:, 0]) # Take first token by default
output_vectors.append(cls_token)
if self.pooling_mode_max_tokens:
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
)
token_embeddings[input_mask_expanded == 0] = -1e9 # Set padding tokens to large negative value
max_over_time = torch.max(token_embeddings, 1)[0]
output_vectors.append(max_over_time)
if self.pooling_mode_mean_tokens or self.pooling_mode_mean_sqrt_len_tokens:
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
)
sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
# If tokens are weighted (by WordWeights layer), feature 'token_weights_sum' will be present
if "token_weights_sum" in features:
sum_mask = features["token_weights_sum"].unsqueeze(-1).expand(sum_embeddings.size())
else:
sum_mask = input_mask_expanded.sum(1)
sum_mask = torch.clamp(sum_mask, min=1e-9)
if self.pooling_mode_mean_tokens:
output_vectors.append(sum_embeddings / sum_mask)
if self.pooling_mode_mean_sqrt_len_tokens:
output_vectors.append(sum_embeddings / torch.sqrt(sum_mask))
if self.pooling_mode_weightedmean_tokens:
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
)
# token_embeddings shape: bs, seq, hidden_dim
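            # Position weights 1..seq_len give later tokens a larger weight (SGPT-style weighted mean).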
weights = (
torch.arange(start=1, end=token_embeddings.shape[1] + 1)
.unsqueeze(0)
.unsqueeze(-1)
.expand(token_embeddings.size())
.to(token_embeddings.dtype)
.to(token_embeddings.device)
)
assert weights.shape == token_embeddings.shape == input_mask_expanded.shape
input_mask_expanded = input_mask_expanded * weights
sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
# If tokens are weighted (by WordWeights layer), feature 'token_weights_sum' will be present
if "token_weights_sum" in features:
sum_mask = features["token_weights_sum"].unsqueeze(-1).expand(sum_embeddings.size())
else:
sum_mask = input_mask_expanded.sum(1)
sum_mask = torch.clamp(sum_mask, min=1e-9)
output_vectors.append(sum_embeddings / sum_mask)
if self.pooling_mode_lasttoken:
bs, seq_len, hidden_dim = token_embeddings.shape
# attention_mask shape: (bs, seq_len)
# Get shape [bs] indices of the last token (i.e. the last token for each batch item)
# Use flip and max() to get the last index of 1 in the attention mask
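            # e.g. attention_mask = [1, 1, 1, 0, 0] -> flipped = [0, 0, 1, 1, 1] -> max() index = 2
            #      -> last-token index = seq_len - 2 - 1 = 2 (the final attended token)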
if torch.jit.is_tracing():
# Avoid tracing the argmax with int64 input that can not be handled by ONNX Runtime: https://github.com/microsoft/onnxruntime/issues/10068
attention_mask = attention_mask.to(torch.int32)
values, indices = attention_mask.flip(1).max(1)
indices = torch.where(values == 0, seq_len - 1, indices)
gather_indices = seq_len - indices - 1
# Turn indices from shape [bs] --> [bs, 1, hidden_dim]
gather_indices = gather_indices.unsqueeze(-1).repeat(1, hidden_dim)
gather_indices = gather_indices.unsqueeze(1)
assert gather_indices.shape == (bs, 1, hidden_dim)
# Gather along the 1st dim (seq_len) (bs, seq_len, hidden_dim -> bs, hidden_dim)
            # Strictly, the attention mask is not needed, since we gather the last token where attn_mask == 1.
            # However, rows whose attention mask is all zeros had their gather index forced to a valid position
            # above (via torch.where), so we multiply by the attention mask to zero those embeddings out again.
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).to(token_embeddings.dtype)
)
embedding = torch.gather(token_embeddings * input_mask_expanded, 1, gather_indices).squeeze(dim=1)
output_vectors.append(embedding)
output_vector = torch.cat(output_vectors, 1)
features.update({"sentence_embedding": output_vector})
return features
def get_sentence_embedding_dimension(self):
return self.pooling_output_dimension
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
def save(self, output_path):
with open(os.path.join(output_path, "config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
@staticmethod
def load(input_path):
with open(os.path.join(input_path, "config.json")) as fIn:
config = json.load(fIn)
return Pooling(**config)
from torch import nn
from transformers import AutoModel, AutoTokenizer, AutoConfig, T5Config, MT5Config
import json
from typing import List, Dict, Optional, Union, Tuple
import os
class Transformer(nn.Module):
"""Huggingface AutoModel to generate token embeddings.
Loads the correct class, e.g. BERT / RoBERTa etc.
:param model_name_or_path: Huggingface models name (https://huggingface.co/models)
:param max_seq_length: Truncate any inputs longer than max_seq_length
:param model_args: Arguments (key, value pairs) passed to the Huggingface Transformers model
:param cache_dir: Cache dir for Huggingface Transformers to store/load models
:param tokenizer_args: Arguments (key, value pairs) passed to the Huggingface Tokenizer model
    :param do_lower_case: If True, lowercases the input (regardless of whether the model is cased or not)
:param tokenizer_name_or_path: Name or path of the tokenizer. When None, then model_name_or_path is used
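
    Example (a minimal sketch; "bert-base-uncased" is only a placeholder model name, and the standard
    sentence_transformers import path is assumed, which may differ for this vendored copy)::

        from sentence_transformers.models import Transformer

        word_embedding_model = Transformer("bert-base-uncased", max_seq_length=128)
        features = word_embedding_model.tokenize(["An example sentence", "Another example"])
        features = word_embedding_model(features)
        print(features["token_embeddings"].shape)  # (batch_size, seq_len, hidden_size)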
"""
def __init__(
self,
model_name_or_path: str,
max_seq_length: Optional[int] = None,
model_args: Dict = {},
cache_dir: Optional[str] = None,
tokenizer_args: Dict = {},
do_lower_case: bool = False,
        tokenizer_name_or_path: Optional[str] = None,
):
super(Transformer, self).__init__()
self.config_keys = ["max_seq_length", "do_lower_case"]
self.do_lower_case = do_lower_case
config = AutoConfig.from_pretrained(model_name_or_path, **model_args, cache_dir=cache_dir)
self._load_model(model_name_or_path, config, cache_dir, **model_args)
self.tokenizer = AutoTokenizer.from_pretrained(
tokenizer_name_or_path if tokenizer_name_or_path is not None else model_name_or_path,
cache_dir=cache_dir,
**tokenizer_args,
)
# No max_seq_length set. Try to infer from model
if max_seq_length is None:
if (
hasattr(self.auto_model, "config")
and hasattr(self.auto_model.config, "max_position_embeddings")
and hasattr(self.tokenizer, "model_max_length")
):
max_seq_length = min(self.auto_model.config.max_position_embeddings, self.tokenizer.model_max_length)
self.max_seq_length = max_seq_length
if tokenizer_name_or_path is not None:
self.auto_model.config.tokenizer_class = self.tokenizer.__class__.__name__
def _load_model(self, model_name_or_path, config, cache_dir, **model_args):
"""Loads the transformer model"""
if isinstance(config, T5Config):
self._load_t5_model(model_name_or_path, config, cache_dir, **model_args)
elif isinstance(config, MT5Config):
self._load_mt5_model(model_name_or_path, config, cache_dir, **model_args)
else:
self.auto_model = AutoModel.from_pretrained(
model_name_or_path, config=config, cache_dir=cache_dir, **model_args
)
def _load_t5_model(self, model_name_or_path, config, cache_dir, **model_args):
"""Loads the encoder model from T5"""
from transformers import T5EncoderModel
T5EncoderModel._keys_to_ignore_on_load_unexpected = ["decoder.*"]
self.auto_model = T5EncoderModel.from_pretrained(
model_name_or_path, config=config, cache_dir=cache_dir, **model_args
)
def _load_mt5_model(self, model_name_or_path, config, cache_dir, **model_args):
"""Loads the encoder model from T5"""
from transformers import MT5EncoderModel
MT5EncoderModel._keys_to_ignore_on_load_unexpected = ["decoder.*"]
self.auto_model = MT5EncoderModel.from_pretrained(
model_name_or_path, config=config, cache_dir=cache_dir, **model_args
)
def __repr__(self):
return "Transformer({}) with Transformer model: {} ".format(
self.get_config_dict(), self.auto_model.__class__.__name__
)
def forward(self, features):
"""Returns token_embeddings, cls_token"""
trans_features = {"input_ids": features["input_ids"], "attention_mask": features["attention_mask"]}
if "token_type_ids" in features:
trans_features["token_type_ids"] = features["token_type_ids"]
output_states = self.auto_model(**trans_features, return_dict=False)
output_tokens = output_states[0]
features.update({"token_embeddings": output_tokens, "attention_mask": features["attention_mask"]})
if self.auto_model.config.output_hidden_states:
all_layer_idx = 2
if len(output_states) < 3: # Some models only output last_hidden_states and all_hidden_states
all_layer_idx = 1
hidden_states = output_states[all_layer_idx]
features.update({"all_layer_embeddings": hidden_states})
return features
def get_word_embedding_dimension(self) -> int:
return self.auto_model.config.hidden_size
def tokenize(self, texts: Union[List[str], List[Dict], List[Tuple[str, str]]], padding: Union[str, bool] = True):
"""
        Tokenizes texts and maps tokens to token ids.
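        Three input formats are accepted: a list of strings (a single batch), a list of single-entry
        dicts (each value is tokenized and its key is returned under "text_keys"), or a list of
        (text_a, text_b) tuples, which are tokenized as sentence pairs.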
"""
output = {}
if isinstance(texts[0], str):
to_tokenize = [texts]
elif isinstance(texts[0], dict):
to_tokenize = []
output["text_keys"] = []
for lookup in texts:
text_key, text = next(iter(lookup.items()))
to_tokenize.append(text)
output["text_keys"].append(text_key)
to_tokenize = [to_tokenize]
else:
batch1, batch2 = [], []
for text_tuple in texts:
batch1.append(text_tuple[0])
batch2.append(text_tuple[1])
to_tokenize = [batch1, batch2]
# strip
to_tokenize = [[str(s).strip() for s in col] for col in to_tokenize]
# Lowercase
if self.do_lower_case:
to_tokenize = [[s.lower() for s in col] for col in to_tokenize]
output.update(
self.tokenizer(
*to_tokenize,
padding=padding,
truncation="longest_first",
return_tensors="pt",
max_length=self.max_seq_length,
)
)
return output
def get_config_dict(self):
return {key: self.__dict__[key] for key in self.config_keys}
def save(self, output_path: str, safe_serialization: bool = True):
self.auto_model.save_pretrained(output_path, safe_serialization=safe_serialization)
self.tokenizer.save_pretrained(output_path)
with open(os.path.join(output_path, "sentence_bert_config.json"), "w") as fOut:
json.dump(self.get_config_dict(), fOut, indent=2)
@staticmethod
def load(input_path: str):
# Old classes used other config names than 'sentence_bert_config.json'
for config_name in [
"sentence_bert_config.json",
"sentence_roberta_config.json",
"sentence_distilbert_config.json",
"sentence_camembert_config.json",
"sentence_albert_config.json",
"sentence_xlm-roberta_config.json",
"sentence_xlnet_config.json",
]:
sbert_config_path = os.path.join(input_path, config_name)
if os.path.exists(sbert_config_path):
break
with open(sbert_config_path) as fIn:
config = json.load(fIn)
        # Don't allow configs to set trust_remote_code (use a default so a missing key does not raise)
        if "model_args" in config:
            config["model_args"].pop("trust_remote_code", None)
return Transformer(model_name_or_path=input_path, **config)