Commit 3b2dedc6 authored by mashun1

lettucedetect

Pipeline #2497 failed with stages in 0 seconds
from abc import ABC, abstractmethod
import torch
from transformers import AutoModelForTokenClassification, AutoTokenizer
from lettucedetect.datasets.ragtruth import RagTruthDataset
PROMPT_QA = """
Briefly answer the following question:
{question}
Bear in mind that your response should be strictly based on the following {num_passages} passages:
{context}
In case the passages do not contain the necessary information to answer the question, please reply with: "Unable to answer based on given passages."
output:
"""
PROMPT_SUMMARY = """
Summarize the following text:
{text}
output:
"""
class BaseDetector(ABC):
@abstractmethod
def predict(
self,
context: list[str],
answer: str,
question: str | None = None,
output_format: str = "tokens",
) -> list:
"""Given a context, an answer, and an optional question, return predictions.
:param context: A list of context strings.
:param answer: The answer string.
:param question: The question string, or None for summarization tasks.
:param output_format: "tokens" to return token-level predictions, or "spans" to return grouped spans.
"""
pass
class TransformerDetector(BaseDetector):
def __init__(self, model_path: str, max_length: int = 4096, device=None, **kwargs):
"""Initialize the TransformerDetector.
:param model_path: The path to the model.
:param max_length: The maximum length of the input sequence.
:param device: The device to run the model on.
"""
self.tokenizer = AutoTokenizer.from_pretrained(model_path, **kwargs)
self.model = AutoModelForTokenClassification.from_pretrained(model_path, **kwargs)
self.max_length = max_length
self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model.to(self.device)
self.model.eval()
def _form_prompt(self, context: list[str], question: str | None) -> str:
"""Form a prompt from the provided context and question. We use different prompts for summary and QA tasks.
:param context: A list of context strings.
:param question: The question string, or None for summarization tasks.
:return: The formatted prompt.
"""
context_str = "\n".join(
[f"passage {i + 1}: {passage}" for i, passage in enumerate(context)]
)
if question is None:
return PROMPT_SUMMARY.format(text=context_str)
else:
return PROMPT_QA.format(
question=question, num_passages=len(context), context=context_str
)
def _predict(self, context: str, answer: str, output_format: str = "tokens") -> list:
"""Predict hallucination tokens or spans from the provided context and answer.
:param context: The context string.
:param answer: The answer string.
:param output_format: "tokens" to return token-level predictions, or "spans" to return grouped spans.
"""
# Use the shared tokenization logic from RagTruthDataset (its labels are recomputed below).
encoding, _, offsets, answer_start_token = RagTruthDataset.prepare_tokenized_input(
self.tokenizer, context, answer, self.max_length
)
# Create a label tensor: mark tokens before the answer as -100 (ignored) and answer tokens as 0.
labels = torch.full_like(encoding.input_ids[0], -100, device=self.device)
labels[answer_start_token:] = 0
# Move the encoding to the device; labels is already on the correct device.
encoding = {key: value.to(self.device) for key, value in encoding.items()}
# Run model inference
with torch.no_grad():
outputs = self.model(**encoding)
logits = outputs.logits
token_preds = torch.argmax(logits, dim=-1)[0]
probabilities = torch.softmax(logits, dim=-1)[0]
# Mask out predictions for context tokens.
token_preds = torch.where(labels == -100, labels, token_preds)
if output_format == "tokens":
# Return the prediction and hallucination probability for each answer token (labels != -100).
token_probs = []
input_ids = encoding["input_ids"][0] # Get the input_ids tensor from the encoding dict
for i, (token, pred, prob) in enumerate(zip(input_ids, token_preds, probabilities)):
if labels[i].item() != -100:
token_probs.append(
{
"token": self.tokenizer.decode([token]),
"pred": pred.item(),
"prob": prob[1].item(), # Get probability for class 1 (hallucination)
}
)
return token_probs
elif output_format == "spans":
# Compute the answer's character offset (the first token of the answer).
if answer_start_token < offsets.size(0):
answer_char_offset = offsets[answer_start_token][0].item()
else:
answer_char_offset = 0
spans: list[dict] = []
current_span: dict | None = None
# Iterate over tokens in the answer region.
for i in range(answer_start_token, token_preds.size(0)):
# Skip tokens marked as ignored.
if labels[i].item() == -100:
continue
token_start, token_end = offsets[i].tolist()
# Skip special tokens with zero length.
if token_start == token_end:
continue
# Adjust offsets relative to the answer text.
rel_start = token_start - answer_char_offset
rel_end = token_end - answer_char_offset
is_hallucination = token_preds[i].item() == 1  # class 1 indicates hallucination
confidence = probabilities[i, 1].item() if is_hallucination else 0.0
if is_hallucination:
if current_span is None:
current_span = {
"start": rel_start,
"end": rel_end,
"confidence": confidence,
}
else:
# Extend the current span.
current_span["end"] = rel_end
current_span["confidence"] = max(current_span["confidence"], confidence)
else:
# If we were building a hallucination span, finalize it.
if current_span is not None:
# Extract the hallucinated text from the answer.
span_text = answer[current_span["start"] : current_span["end"]]
current_span["text"] = span_text
spans.append(current_span)
current_span = None
# Append any span still in progress.
if current_span is not None:
span_text = answer[current_span["start"] : current_span["end"]]
current_span["text"] = span_text
spans.append(current_span)
return spans
else:
raise ValueError("Invalid output_format. Use 'tokens' or 'spans'.")
def predict_prompt(self, prompt: str, answer: str, output_format: str = "tokens") -> list:
"""Predict hallucination tokens or spans from the provided prompt and answer.
:param prompt: The prompt string.
:param answer: The answer string.
:param output_format: "tokens" to return token-level predictions, or "spans" to return grouped spans.
"""
return self._predict(prompt, answer, output_format)
def predict(
self,
context: list[str],
answer: str,
question: str | None = None,
output_format: str = "tokens",
) -> list:
"""Predict hallucination tokens or spans from the provided context, answer, and question.
Use this method when you have a list of contexts, an answer, and optionally a question rather than a fully formed prompt; this makes it easy to plug into RAG pipelines.
:param context: A list of context strings.
:param answer: The answer string.
:param question: The question string.
:param output_format: "tokens" to return token-level predictions, or "spans" to return grouped spans.
"""
prompt = self._form_prompt(context, question)
return self._predict(prompt, answer, output_format)
class HallucinationDetector:
def __init__(self, method: str = "transformer", **kwargs):
"""Facade for the hallucination detector.
:param method: "transformer" for the model-based approach.
:param kwargs: Additional keyword arguments passed to the underlying detector.
"""
if method == "transformer":
self.detector = TransformerDetector(**kwargs)
else:
raise ValueError("Unsupported method. Choose 'transformer'.")
def predict(
self,
context: list[str],
answer: str,
question: str | None = None,
output_format: str = "tokens",
) -> list:
"""Predict hallucination tokens or spans from the provided context, answer, and question.
Use this method when you have a list of contexts, an answer, and optionally a question rather than a fully formed prompt; this makes it easy to plug into RAG pipelines.
:param context: A list of context strings.
:param answer: The answer string.
:param question: The question string.
"""
return self.detector.predict(context, answer, question, output_format)
def predict_prompt(self, prompt: str, answer: str, output_format: str = "tokens") -> list:
"""Predict hallucination tokens or spans from the provided prompt and answer.
:param prompt: The prompt string.
:param answer: The answer string.
"""
return self.detector.predict_prompt(prompt, answer, output_format)
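# --- Illustrative usage sketch (not part of the library code) ---------------------------
# The model path below is a placeholder; any token-classification checkpoint trained with
# this repository (or a published LettuceDetect checkpoint) could be substituted.
#
# detector = HallucinationDetector(
#     method="transformer", model_path="output/hallucination_detector"
# )
# spans = detector.predict(
#     context=["France is a country in Europe. Its capital city is Paris."],
#     question="What is the capital of France?",
#     answer="The capital of France is Paris. It has 80 million inhabitants.",
#     output_format="spans",
# )
# # Each span is a dict with "start", "end", "confidence" and "text", relative to the answer.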
import time
from datetime import timedelta
import torch
from torch.nn import Module
from torch.optim import Optimizer
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
from transformers import PreTrainedTokenizer
from lettucedetect.models.evaluator import evaluate_model, print_metrics
class Trainer:
def __init__(
self,
model: Module,
tokenizer: PreTrainedTokenizer,
train_loader: DataLoader,
test_loader: DataLoader,
epochs: int = 6,
learning_rate: float = 1e-5,
save_path: str = "best_model",
device: torch.device | None = None,
):
"""Initialize the trainer.
:param model: The model to train
:param tokenizer: Tokenizer for the model
:param train_loader: DataLoader for training data
:param test_loader: DataLoader for test data
:param epochs: Number of training epochs
:param learning_rate: Learning rate for optimization
:param save_path: Path to save the best model
:param device: Device to train on (defaults to cuda if available)
"""
self.model = model
self.tokenizer = tokenizer
self.train_loader = train_loader
self.test_loader = test_loader
self.epochs = epochs
self.learning_rate = learning_rate
self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.save_path = save_path
self.optimizer: Optimizer = torch.optim.AdamW(
self.model.parameters(), lr=self.learning_rate
)
self.model.to(self.device)
def train(self) -> float:
"""Train the model.
Returns:
Best F1 score achieved during training
"""
best_f1: float = 0
start_time = time.time()
print(f"\nStarting training on {self.device}")
print(
f"Training samples: {len(self.train_loader.dataset)}, "
f"Test samples: {len(self.test_loader.dataset)}\n"
)
for epoch in range(self.epochs):
epoch_start = time.time()
print(f"\nEpoch {epoch + 1}/{self.epochs}")
self.model.train()
total_loss = 0
num_batches = 0
progress_bar = tqdm(self.train_loader, desc="Training", leave=True)
for batch in progress_bar:
self.optimizer.zero_grad()
outputs = self.model(
batch["input_ids"].to(self.device),
attention_mask=batch["attention_mask"].to(self.device),
labels=batch["labels"].to(self.device),
)
loss = outputs.loss
loss.backward()
self.optimizer.step()
total_loss += loss.item()
num_batches += 1
progress_bar.set_postfix(
{
"loss": f"{loss.item():.4f}",
"avg_loss": f"{total_loss / num_batches:.4f}",
}
)
avg_loss = total_loss / num_batches
epoch_time = time.time() - epoch_start
print(
f"Epoch {epoch + 1} completed in {timedelta(seconds=int(epoch_time))}. Average loss: {avg_loss:.4f}"
)
print("\nEvaluating...")
metrics = evaluate_model(self.model, self.test_loader, self.device)
print_metrics(metrics)
if metrics["hallucinated"]["f1"] > best_f1:
best_f1 = metrics["hallucinated"]["f1"]
self.model.save_pretrained(self.save_path)
self.tokenizer.save_pretrained(self.save_path)
print(f"\n🎯 New best F1: {best_f1:.4f}, model saved at '{self.save_path}'!")
print("-" * 50)
total_time = time.time() - start_time
print(f"\nTraining completed in {timedelta(seconds=int(total_time))}")
print(f"Best F1 score: {best_f1:.4f}")
return best_f1
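# Minimal wiring sketch (illustrative; the full version is the training script later in this
# commit). The model name and the two DataLoaders are assumptions, built as in that script.
#
# tokenizer = AutoTokenizer.from_pretrained("answerdotai/ModernBERT-base")
# model = AutoModelForTokenClassification.from_pretrained("answerdotai/ModernBERT-base", num_labels=2)
# trainer = Trainer(model, tokenizer, train_loader, test_loader, epochs=6, save_path="best_model")
# best_f1 = trainer.train()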
import argparse
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
from datasets import load_dataset
@dataclass
class RagBenchSample:
prompt: str
answer: str
labels: list[dict]
split: Literal["train", "dev", "test"]
task_type: str
def to_json(self) -> dict:
return {
"prompt": self.prompt,
"answer": self.answer,
"labels": self.labels,
"split": self.split,
"task_type": self.task_type,
}
@classmethod
def from_json(cls, json_dict: dict) -> "RagBenchSample":
return cls(
prompt=json_dict["prompt"],
answer=json_dict["answer"],
labels=json_dict["labels"],
split=json_dict["split"],
task_type=json_dict["task_type"],
)
@dataclass
class RagBenchData:
samples: list[RagBenchSample]
def to_json(self) -> list[dict]:
return [sample.to_json() for sample in self.samples]
@classmethod
def from_json(cls, json_dict: list[dict]) -> "RagBenchData":
return cls(
samples=[RagBenchSample.from_json(sample) for sample in json_dict],
)
def load_data(hugging_dir: str) -> dict:
"""Load the RAG Bench data.
:param input_dir: Path to the input directory.
"""
ragbench = {}
for dataset in [
"covidqa",
"cuad",
"delucionqa",
"emanual",
"expertqa",
"finqa",
"hagrid",
"hotpotqa",
"msmarco",
"pubmedqa",
"tatqa",
"techqa",
]:
ragbench[dataset] = load_dataset(hugging_dir, dataset)
return ragbench
def create_labels(response, hallucinations):
"""Create character-level labels for the unsupported (hallucinated) sentences of a response."""
labels = []
resp = " ".join([sentence for label, sentence in response["response_sentences"]])
for hal in hallucinations:
match = re.search(re.escape(hal), resp)
if match is None:
continue
labels.append({"start": match.start(), "end": match.end(), "label": "Not supported"})
return labels
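# Worked example (illustrative): if the joined response text is
# "Paris is nice. It has 80M people." and the unsupported sentence is "It has 80M people.",
# create_labels returns [{"start": 15, "end": 33, "label": "Not supported"}].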
def create_sample(response: dict) -> RagBenchSample:
"""Create a sample from the RAGBench data.
:param response: A single response record from the RAGBench data.
"""
prompt = (
"Instruction:"
+ "\n"
+ " Answer the question: "
+ response["question"]
+ "\n"
+ "Use only the following information:"
+ "\n".join(response["documents"])
)
answer = " ".join([sentence for label, sentence in response["response_sentences"]])
split = response["dataset_name"].split("_")[1]
task_type = response["dataset_name"].split("_")[0]
labels = []
hallucinations = []
if len(response["unsupported_response_sentence_keys"]) > 0:
hallucinations = [
sentence
for label, sentence in response["response_sentences"]
if label in response["unsupported_response_sentence_keys"]
]
labels = create_labels(response, hallucinations)
return RagBenchSample(prompt, answer, labels, split, task_type)
def get_data_split(data, name, split):
dataset = data.get(name)
data_split = dataset.get(split)
return data_split
def main(input_dir: str, output_dir: Path):
"""Preprocess the RAGBench data.
:param input_dir: Path to the Hugging Face dataset directory.
:param output_dir: Path to the output directory.
"""
output_dir = Path(output_dir)
data = load_data(input_dir)
rag_bench_data = RagBenchData(samples=[])
for dataset_name in data:
for split in ["train", "test", "validation"]:
data_split = get_data_split(data, dataset_name, split)
for response in data_split:
if not response["dataset_name"]:
continue
sample = create_sample(response)
rag_bench_data.samples.append(sample)
(output_dir / "ragbench_data.json").write_text(json.dumps(rag_bench_data.to_json(), indent=4))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input_dir", type=str, required=True)
parser.add_argument("--output_dir", type=str, required=True)
args = parser.parse_args()
main(args.input_dir, args.output_dir)
import argparse
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
@dataclass
class RagTruthSample:
prompt: str
answer: str
labels: list[dict]
split: Literal["train", "dev", "test"]
task_type: str
def to_json(self) -> dict:
return {
"prompt": self.prompt,
"answer": self.answer,
"labels": self.labels,
"split": self.split,
"task_type": self.task_type,
}
@classmethod
def from_json(cls, json_dict: dict) -> "RagTruthSample":
return cls(
prompt=json_dict["prompt"],
answer=json_dict["answer"],
labels=json_dict["labels"],
split=json_dict["split"],
task_type=json_dict["task_type"],
)
@dataclass
class RagTruthData:
samples: list[RagTruthSample]
def to_json(self) -> list[dict]:
return [sample.to_json() for sample in self.samples]
@classmethod
def from_json(cls, json_dict: list[dict]) -> "RagTruthData":
return cls(
samples=[RagTruthSample.from_json(sample) for sample in json_dict],
)
def load_data(input_dir: Path) -> tuple[list[dict], list[dict]]:
"""Load the RAG truth data.
:param input_dir: Path to the input directory.
"""
responses = [
json.loads(line) for line in (input_dir / "response.jsonl").read_text().splitlines()
]
sources = [
json.loads(line) for line in (input_dir / "source_info.jsonl").read_text().splitlines()
]
return responses, sources
def create_sample(response: dict, source: dict) -> RagTruthSample:
"""Create a sample from the RAG truth data.
:param response: The response from the RAG truth data.
:param source: The source from the RAG truth data.
"""
prompt = source["prompt"]
answer = response["response"]
split = response["split"]
task_type = source["task_type"]
labels = []
for label in response["labels"]:
start_char = label["start"]
end_char = label["end"]
labels.append(
{
"start": start_char,
"end": end_char,
"label": label["label_type"],
}
)
return RagTruthSample(prompt, answer, labels, split, task_type)
def main(input_dir: Path, output_dir: Path):
"""Preprocess the RAG truth data.
:param input_dir: Path to the input directory.
:param output_dir: Path to the output directory.
"""
input_dir = Path(input_dir)
output_dir = Path(output_dir)
responses, sources = load_data(input_dir)
sources_by_id = {source["source_id"]: source for source in sources}
rag_truth_data = RagTruthData(samples=[])
for response in responses:
sample = create_sample(response, sources_by_id[response["source_id"]])
rag_truth_data.samples.append(sample)
(output_dir / "ragtruth_data.json").write_text(json.dumps(rag_truth_data.to_json(), indent=4))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input_dir", type=str, required=True)
parser.add_argument("--output_dir", type=str, required=True)
args = parser.parse_args()
main(args.input_dir, args.output_dir)
# Model code
modelCode=1454
# Model name
modelName=LettuceDtect_pytorch
# Model description
modelDescription=Lightweight RAG hallucination detection
# Application scenarios
appScenario=training,inference,text detection,education,retail,finance,communications
# Framework type
frameType=pytorch
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "lettucedetect"
version = "0.1.6"
description = "Lettucedetect is a framework for detecting hallucinations in RAG applications."
readme = {file = "README.md", content-type = "text/markdown"}
requires-python = ">=3.10"
license = {text = "MIT"}
authors = [
{name = "Adam Kovacs", email = "kovacs@krlabs.eu"},
]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
#"torch>=2.6.0",
"transformers>=4.48.3",
"tqdm>=4.65.0",
"scikit-learn>=1.6.1",
#"numpy>=2.2.2",
]
[project.urls]
Homepage = "https://github.com/krlabsorg/lettucedetect"
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-cov>=4.0.0",
"ruff>=0.0.270",
]
[tool.setuptools]
packages = ["lettucedetect"]
[tool.pytest]
testpaths = ["tests"]
python_files = "test_*_pytest.py"
[tool.ruff]
line-length = 100
[tool.ruff.lint]
# https://docs.astral.sh/ruff/rules/
select = [
"E", # flake8
"F", # pyflakes
"I", # isort
"C90", # mccabe
"D", # pydocstyle
"ANN", # type annotations
"S", # bandit
"EXE", # flake8 executable
"PTH", # use pathlib
"RUF", # ruff rules
]
ignore = [
"E501", # line length
"D100", # module docstring
"D104", # missing docstring in public package
"D203", # blank line required before class
"D211", # no blank line before class
"D213", # multi line summary second line
"ANN003", # **kwargs annotation
"ANN204", # missing return type for __init__
"PTH123", # path.open
]
import argparse
import json
from pathlib import Path
import numpy as np
from transformers import AutoTokenizer
from lettucedetect.preprocess.preprocess_ragtruth import RagTruthData
def analyze_token_distribution(samples, tokenizer):
token_counts = []
for sample in samples:
# Combine prompt and answer
full_text = f"{sample.prompt}\n{sample.answer}"
# Tokenize
tokens = tokenizer.encode(full_text)
token_counts.append(len(tokens))
# Calculate statistics
stats = {
"mean": np.mean(token_counts),
"median": np.median(token_counts),
"std": np.std(token_counts),
"min": np.min(token_counts),
"max": np.max(token_counts),
"percentile_90": np.percentile(token_counts, 90),
"percentile_95": np.percentile(token_counts, 95),
"total_samples": len(token_counts),
}
return token_counts, stats
def main():
parser = argparse.ArgumentParser(description="Analyze token distribution in the dataset")
parser.add_argument(
"--data_path",
type=str,
required=True,
help="Path to the data (JSON format)",
)
parser.add_argument(
"--model_name",
type=str,
default="bert-base-uncased",
help="Name or path of the tokenizer to use",
)
args = parser.parse_args()
# Load data
data_path = Path(args.data_path)
rag_truth_data = RagTruthData.from_json(json.loads(data_path.read_text()))
# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained(args.model_name)
# Analyze all samples
print("\nAnalyzing token distribution for all samples...")
token_counts, stats = analyze_token_distribution(rag_truth_data.samples, tokenizer)
# Print results
print("\nToken Distribution Statistics:")
print(f"Total samples: {stats['total_samples']}")
print(f"Mean tokens: {stats['mean']:.1f}")
print(f"Median tokens: {stats['median']:.1f}")
print(f"Standard deviation: {stats['std']:.1f}")
print(f"Min tokens: {stats['min']}")
print(f"Max tokens: {stats['max']}")
print(f"90th percentile: {stats['percentile_90']:.1f}")
print(f"95th percentile: {stats['percentile_95']:.1f}")
# Print distribution by split
for split in ["train", "validation", "test"]:
split_samples = [s for s in rag_truth_data.samples if s.split == split]
if split_samples:
print(f"\n{split.capitalize()} split:")
_, split_stats = analyze_token_distribution(split_samples, tokenizer)
print(f"Samples: {split_stats['total_samples']}")
print(f"Mean tokens: {split_stats['mean']:.1f}")
print(f"Median tokens: {split_stats['median']:.1f}")
print(f"90th percentile: {split_stats['percentile_90']:.1f}")
if __name__ == "__main__":
main()
import argparse
import json
from pathlib import Path
import torch
from torch.utils.data import DataLoader
from transformers import (
AutoModelForTokenClassification,
AutoTokenizer,
DataCollatorForTokenClassification,
)
from lettucedetect.datasets.ragtruth import RagTruthDataset
from lettucedetect.models.evaluator import (
evaluate_detector_char_level,
evaluate_model,
evaluate_model_example_level,
print_metrics,
)
from lettucedetect.models.inference import HallucinationDetector
from lettucedetect.preprocess.preprocess_ragtruth import RagTruthData
def evaluate_task_samples(
samples,
evaluation_type,
model=None,
tokenizer=None,
detector=None,
device=None,
batch_size=8,
):
print(f"\nEvaluating model on {len(samples)} samples")
if evaluation_type in {"token_level", "example_level"}:
# Prepare the dataset and dataloader
test_dataset = RagTruthDataset(samples, tokenizer)
data_collator = DataCollatorForTokenClassification(
tokenizer=tokenizer, label_pad_token_id=-100
)
test_loader = DataLoader(
test_dataset,
batch_size=batch_size,
shuffle=False,
collate_fn=data_collator,
)
eval_map = {
"token_level": (evaluate_model, "Token-Level Evaluation"),
"example_level": (evaluate_model_example_level, "Example-Level Evaluation"),
}
eval_fn, eval_title = eval_map[evaluation_type]
print(f"\n---- {eval_title} ----")
metrics = eval_fn(model, test_loader, device)
print_metrics(metrics)
return metrics
else: # char_level
print("\n---- Character-Level Span Evaluation ----")
metrics = evaluate_detector_char_level(detector, samples)
print(f" Precision: {metrics['precision']:.4f}")
print(f" Recall: {metrics['recall']:.4f}")
print(f" F1: {metrics['f1']:.4f}")
return metrics
def main():
parser = argparse.ArgumentParser(description="Evaluate a hallucination detection model")
parser.add_argument("--model_path", type=str, required=True, help="Path to the saved model")
parser.add_argument(
"--data_path",
type=str,
required=True,
help="Path to the evaluation data (JSON format)",
)
parser.add_argument(
"--evaluation_type",
type=str,
default="example_level",
help="Evaluation type (token_level, example_level or char_level)",
)
parser.add_argument(
"--batch_size",
type=int,
default=8,
help="Batch size for evaluation",
)
args = parser.parse_args()
data_path = Path(args.data_path)
rag_truth_data = RagTruthData.from_json(json.loads(data_path.read_text()))
# Filter test samples from the data
test_samples = [sample for sample in rag_truth_data.samples if sample.split == "test"]
# group samples by task type
task_type_map = {}
for sample in test_samples:
if sample.task_type not in task_type_map:
task_type_map[sample.task_type] = []
task_type_map[sample.task_type].append(sample)
print(f"\nEvaluating model on test samples: {len(test_samples)}")
# Setup model/detector based on evaluation type
if args.evaluation_type in {"token_level", "example_level"}:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForTokenClassification.from_pretrained(args.model_path).to(device)
tokenizer = AutoTokenizer.from_pretrained(args.model_path)
detector = None
else: # char_level
model, tokenizer, device = None, None, None
detector = HallucinationDetector(method="transformer", model_path=args.model_path)
# Evaluate each task type separately
for task_type, samples in task_type_map.items():
print(f"\nTask type: {task_type}")
evaluate_task_samples(
samples,
args.evaluation_type,
model=model,
tokenizer=tokenizer,
detector=detector,
device=device,
batch_size=args.batch_size,
)
# Evaluate the whole dataset
print("\nTask type: whole dataset")
evaluate_task_samples(
test_samples,
args.evaluation_type,
model=model,
tokenizer=tokenizer,
detector=detector,
device=device,
batch_size=args.batch_size,
)
if __name__ == "__main__":
main()
import argparse
import json
from pathlib import Path
from torch.utils.data import DataLoader
from transformers import (
AutoModelForTokenClassification,
AutoTokenizer,
DataCollatorForTokenClassification,
)
from lettucedetect.datasets.ragtruth import RagTruthDataset
from lettucedetect.models.trainer import Trainer
from lettucedetect.preprocess.preprocess_ragtruth import RagTruthData
def parse_args():
parser = argparse.ArgumentParser(description="Train hallucination detector model")
parser.add_argument(
"--data-path",
type=str,
default="data/ragtruth/ragtruth_data.json",
help="Path to the training data JSON file",
)
parser.add_argument(
"--model-name",
type=str,
default="answerdotai/ModernBERT-base",
help="Name or path of the pretrained model",
)
parser.add_argument(
"--output-dir",
type=str,
default="output/hallucination_detector",
help="Directory to save the trained model",
)
parser.add_argument(
"--batch-size", type=int, default=4, help="Batch size for training and testing"
)
parser.add_argument("--epochs", type=int, default=6, help="Number of training epochs")
parser.add_argument(
"--learning-rate", type=float, default=1e-5, help="Learning rate for training"
)
return parser.parse_args()
def main():
args = parse_args()
data_path = Path(args.data_path)
rag_truth_data = RagTruthData.from_json(json.loads(data_path.read_text()))
train_samples = [sample for sample in rag_truth_data.samples if sample.split == "train"]
test_samples = [sample for sample in rag_truth_data.samples if sample.split == "test"]
tokenizer = AutoTokenizer.from_pretrained(args.model_name)
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer, label_pad_token_id=-100)
train_dataset = RagTruthDataset(train_samples, tokenizer)
test_dataset = RagTruthDataset(test_samples, tokenizer)
train_loader = DataLoader(
train_dataset,
batch_size=args.batch_size,
shuffle=True,
collate_fn=data_collator,
)
test_loader = DataLoader(
test_dataset,
batch_size=args.batch_size,
shuffle=False,
collate_fn=data_collator,
)
model = AutoModelForTokenClassification.from_pretrained(args.model_name, num_labels=2)
trainer = Trainer(
model=model,
tokenizer=tokenizer,
train_loader=train_loader,
test_loader=test_loader,
epochs=args.epochs,
learning_rate=args.learning_rate,
save_path=args.output_dir,
)
trainer.train()
if __name__ == "__main__":
main()
import argparse
from transformers import AutoModelForTokenClassification, AutoTokenizer
def main():
parser = argparse.ArgumentParser(
description="Upload a trained model and tokenizer to the Hugging Face Hub."
)
parser.add_argument(
"--model_path",
type=str,
required=True,
help="Local path to the saved model directory (contains model and tokenizer files).",
)
parser.add_argument(
"--repo_id",
type=str,
required=True,
help="Target repository id on Hugging Face (e.g., KRLabsOrg/lettucedect-base-modernbert-en-v1).",
)
parser.add_argument(
"--use_auth_token",
action="store_true",
help="Include this flag to use your Hugging Face authentication token (if not already set up).",
)
args = parser.parse_args()
print(f"Loading model and tokenizer from {args.model_path} ...")
model = AutoModelForTokenClassification.from_pretrained(args.model_path)
tokenizer = AutoTokenizer.from_pretrained(args.model_path)
print(f"Uploading model to Hugging Face Hub at repo: {args.repo_id} ...")
model.push_to_hub(args.repo_id, use_auth_token=args.use_auth_token)
tokenizer.push_to_hub(args.repo_id, use_auth_token=args.use_auth_token)
print("Upload complete!")
if __name__ == "__main__":
main()
"""Tests for the lettucedetect package."""
"""Shared fixtures for pytest tests."""
from unittest.mock import MagicMock
import pytest
import torch
@pytest.fixture
def mock_tokenizer():
"""Create a mock tokenizer for testing."""
tokenizer = MagicMock()
tokenizer.encode.return_value = [101, 102, 103, 104, 105]
# Mock tokenizer call to return encoding
tokenizer.return_value = {
"input_ids": torch.tensor([[101, 102, 103, 104, 105, 106, 107, 108]]),
"attention_mask": torch.tensor([[1, 1, 1, 1, 1, 1, 1, 1]]),
"offset_mapping": torch.tensor(
[
[0, 0], # [CLS]
[0, 4], # "This"
[5, 7], # "is"
[8, 9], # "a"
[10, 16], # "prompt"
[0, 0], # [SEP]
[0, 4], # "This"
[5, 12], # "answer"
]
),
}
return tokenizer
@pytest.fixture
def mock_model():
"""Create a mock model for testing."""
model = MagicMock()
mock_output = MagicMock()
mock_output.logits = torch.tensor([[[0.1, 0.9], [0.8, 0.2], [0.3, 0.7]]])
model.return_value = mock_output
return model
[pytest]
testpaths = tests
python_files = test_*_pytest.py
python_classes = Test*
python_functions = test_*
addopts = -v --tb=short
#!/usr/bin/env python
"""Script to run pytest tests for the lettucedetect package."""
import sys
import pytest
def run_tests():
"""Run pytest tests for the lettucedetect package."""
# Run pytest with specified arguments
args = [
"-v", # verbose output
"--tb=short", # shorter traceback format
"tests/test_inference_pytest.py", # only run inference tests
]
# Add any command line arguments
args.extend(sys.argv[1:])
# Run pytest and return the exit code
return pytest.main(args)
if __name__ == "__main__":
sys.exit(run_tests())
"""Pytest tests for the inference module."""
from unittest.mock import MagicMock, patch
import pytest
import torch
from lettucedetect.models.inference import HallucinationDetector, TransformerDetector
@pytest.fixture
def mock_tokenizer():
"""Create a mock tokenizer for testing."""
tokenizer = MagicMock()
tokenizer.encode.return_value = [101, 102, 103, 104, 105]
return tokenizer
@pytest.fixture
def mock_model():
"""Create a mock model for testing."""
model = MagicMock()
mock_output = MagicMock()
mock_output.logits = torch.tensor([[[0.1, 0.9], [0.8, 0.2], [0.3, 0.7]]])
model.return_value = mock_output
return model
class TestHallucinationDetector:
"""Tests for the HallucinationDetector class."""
def test_init_with_transformer_method(self):
"""Test initialization with transformer method."""
with patch("lettucedetect.models.inference.TransformerDetector") as mock_transformer:
detector = HallucinationDetector(method="transformer", model_path="dummy_path")
mock_transformer.assert_called_once_with(model_path="dummy_path")
assert isinstance(detector.detector, MagicMock)
def test_init_with_invalid_method(self):
"""Test initialization with invalid method."""
with pytest.raises(ValueError):
HallucinationDetector(method="invalid_method")
def test_predict(self):
"""Test predict method."""
# Create a mock detector with the predict method
mock_detector = MagicMock()
mock_detector.predict.return_value = []
with patch(
"lettucedetect.models.inference.TransformerDetector", return_value=mock_detector
):
detector = HallucinationDetector(method="transformer")
context = ["This is a test context."]
answer = "This is a test answer."
question = "What is the test?"
result = detector.predict(context, answer, question)
# Check that the mock detector's predict method was called with the correct arguments
mock_detector.predict.assert_called_once()
call_args = mock_detector.predict.call_args[0]
assert call_args[0] == context
assert call_args[1] == answer
assert call_args[2] == question
assert call_args[3] == "tokens"
def test_predict_prompt(self):
"""Test predict_prompt method."""
# Create a mock detector with the predict_prompt method
mock_detector = MagicMock()
mock_detector.predict_prompt.return_value = []
with patch(
"lettucedetect.models.inference.TransformerDetector", return_value=mock_detector
):
detector = HallucinationDetector(method="transformer")
prompt = "This is a test prompt."
answer = "This is a test answer."
result = detector.predict_prompt(prompt, answer)
# Check that the mock detector's predict_prompt method was called with the correct arguments
mock_detector.predict_prompt.assert_called_once()
call_args = mock_detector.predict_prompt.call_args[0]
assert call_args[0] == prompt
assert call_args[1] == answer
assert call_args[2] == "tokens"
class TestTransformerDetector:
"""Tests for the TransformerDetector class."""
@pytest.fixture(autouse=True)
def setup(self, mock_tokenizer, mock_model):
"""Set up test fixtures."""
self.mock_tokenizer = mock_tokenizer
self.mock_model = mock_model
# Patch the AutoTokenizer and AutoModelForTokenClassification
self.tokenizer_patcher = patch(
"lettucedetect.models.inference.AutoTokenizer.from_pretrained",
return_value=self.mock_tokenizer,
)
self.model_patcher = patch(
"lettucedetect.models.inference.AutoModelForTokenClassification.from_pretrained",
return_value=self.mock_model,
)
self.mock_tokenizer_cls = self.tokenizer_patcher.start()
self.mock_model_cls = self.model_patcher.start()
yield
self.tokenizer_patcher.stop()
self.model_patcher.stop()
def test_init(self):
"""Test initialization."""
detector = TransformerDetector(model_path="dummy_path")
self.mock_tokenizer_cls.assert_called_once_with("dummy_path")
self.mock_model_cls.assert_called_once_with("dummy_path")
assert detector.tokenizer == self.mock_tokenizer
assert detector.model == self.mock_model
assert detector.max_length == 4096
def test_predict(self):
"""Test predict method."""
# Create a proper mock encoding with input_ids as a tensor attribute
class MockEncoding:
def __init__(self):
self.input_ids = torch.tensor([[101, 102, 103]])
mock_encoding = MockEncoding()
mock_labels = torch.tensor([0, 0, 0])
mock_offsets = torch.tensor([[0, 0], [0, 1], [1, 2]])
mock_answer_start = 1
# Patch the _predict method to avoid the actual implementation
with patch.object(TransformerDetector, "_predict", return_value=[]):
detector = TransformerDetector(model_path="dummy_path")
context = ["This is a test context."]
answer = "This is a test answer."
question = "What is the test?"
result = detector.predict(context, answer, question)
# Verify the result
assert isinstance(result, list)
def test_form_prompt_with_question(self):
"""Test _form_prompt method with a question."""
detector = TransformerDetector(model_path="dummy_path")
context = ["This is passage 1.", "This is passage 2."]
question = "What is the test?"
prompt = detector._form_prompt(context, question)
# Check that the prompt contains the question and passages
assert question in prompt
assert "passage 1: This is passage 1." in prompt
assert "passage 2: This is passage 2." in prompt
def test_form_prompt_without_question(self):
"""Test _form_prompt method without a question (summary task)."""
detector = TransformerDetector(model_path="dummy_path")
context = ["This is a text to summarize."]
prompt = detector._form_prompt(context, None)
# Check that the prompt contains the text to summarize
assert "This is a text to summarize." in prompt
assert "Summarize" in prompt
import argparse
import json
import re
from pathlib import Path
from vllm import LLM
from vllm.sampling_params import SamplingParams
from lettucedetect.preprocess.preprocess_ragtruth import RagTruthData, RagTruthSample
def translate_text(text, model, sampling_params, source_lang="EN", target_lang="DE", hal=False):
if hal:
translation_prompt = f"""Translate the following text from {source_lang} to {target_lang}.
- If the original text contains <HAL> tags, translate the content inside <HAL> tags and ensure the number of the <HAL> tags remain exactly the same in the output.
- Do NOT add any <HAL> tags if they were not in the original text.
- Do NOT remove any <HAL> tags that were in the original text.
- Do not include any additional sentences summarizing or explaining the translation.
{source_lang}: {text}
{target_lang}:
"""
else:
translation_prompt = f"""Translate the following text from {source_lang} to {target_lang}.
- Translate only the given text.
- Do not include any additional sentences summarizing or explaining the translation.
{source_lang}: {text}
{target_lang}:
"""
system_prompt = f"You are an expert linguist, specializing in translation from {source_lang} to {target_lang}. Translate only the given text."
messages = [
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": translation_prompt,
},
]
res = model.chat(messages=messages, sampling_params=sampling_params)
return res[0].outputs[0].text
def merge_overlapping_spans(labels):
"""Merge overlapping hallucination spans into a single span."""
if not labels:
return []
labels.sort(key=lambda x: x["start"])
new_labels = []
current_span = labels[0]
for span in labels[1:]:
if span["start"] <= current_span["end"]:
current_span["end"] = max(current_span["end"], span["end"])
else:
new_labels.append(current_span)
current_span = span
new_labels.append(current_span)
return new_labels
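# Worked example (illustrative): the overlapping spans
# [{"start": 10, "end": 25}, {"start": 20, "end": 40}, {"start": 50, "end": 60}]
# are merged into [{"start": 10, "end": 40}, {"start": 50, "end": 60}].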
def put_hallucination_tags(sample, answer):
labels = merge_overlapping_spans(sample.labels)
labels = sorted(labels, key=lambda x: (x["end"], x["start"]), reverse=True)
for label in labels:
start, end = label["start"], label["end"]
answer = answer[:end] + "<HAL>" + answer[end:]
answer = answer[:start] + "<HAL>" + answer[start:]
return answer, labels
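# Worked example (illustrative): for answer "Paris is huge." and a single label
# {"start": 9, "end": 13}, the tags are inserted back-to-front so earlier offsets stay valid,
# giving "Paris is <HAL>huge<HAL>.".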
def find_hallucination_tags(text, labels, i, log_file):
pattern = r"<HAL>(.*?)<HAL>"
hal_spans = []
j = 0
with open(log_file, "a") as log:
for span in re.finditer(pattern, text):
start = span.start(1)
end = span.end(1)
if j < len(labels):
label = labels[j]["label"]
else:
label = "Unknown"
log.write(f"IndexError: No label for hallucinated text at sample ({i})\n")
hal_spans.append((start, end, label))
j += 1
return hal_spans
def create_sample_de(sample_dict):
"""Create a RagTruthSample from a translated sample dictionary.
:param sample_dict: Dictionary with the translated prompt, answer, labels, split and task type.
"""
prompt = sample_dict["prompt"]
answer = sample_dict["answer"]
split = sample_dict["split"]
labels = []
for label in sample_dict["labels"]:
start_char = label["start"]
end_char = label["end"]
labels.append(
{
"start": start_char,
"end": end_char,
"label": label["label"],
}
)
task_type = sample_dict["task_type"]
return RagTruthSample(prompt, answer, labels, split, task_type)
def translate_sample(sample, model, sampling_params, i, log_file):
"""Translate each sample of the RAG truth data."""
hal = len(sample.labels) > 0
dict_de = {}
dict_de["prompt"] = translate_text(sample.prompt, model, sampling_params)
answer, labels = put_hallucination_tags(sample, sample.answer)
dict_de["answer"] = translate_text(answer, model, sampling_params, hal=hal)
dict_de["split"] = sample.split
dict_de["task_type"] = translate_text(sample.task_type, model, sampling_params)
dict_de["labels"] = []
if hal:
hal_spans = find_hallucination_tags(dict_de["answer"], labels, i, log_file)
for span in hal_spans:
start, end, label = span
dict_de["labels"].append(
{
"start": start,
"end": end,
"label": translate_text(label, model, sampling_params),
}
)
sample_de = create_sample_de(dict_de)
return sample_de
def load_check_existing_data(output_file):
if output_file.exists():
return RagTruthData.from_json(json.loads(output_file.read_text()))
else:
return RagTruthData(samples=[])
def main(input_dir: Path, output_dir: Path):
"""Translates the already preprocessed RAG Truth Data
:param input_dir: Path to the input directory.
:param output_dir: Path to the output directory.
"""
input_dir = Path(input_dir)
output_dir = Path(output_dir)
input_file = input_dir / "ragtruth_data.json"
output_file = output_dir / "ragtruth_data_de.json"
log_file = output_dir / "error_log.txt"
rag_truth_data = RagTruthData.from_json(json.loads(input_file.read_text()))
rag_truth_data_de = load_check_existing_data(output_file=output_file)
num_processed = len(rag_truth_data_de.samples)
total_samples = len(rag_truth_data.samples)
model_name = "mistralai/Mistral-7B-Instruct-v0.3"
sampling_params = SamplingParams(
max_tokens=3000,
seed=1111,
)
model = LLM(model=model_name)
for i, sample in enumerate(rag_truth_data.samples[num_processed:], start=num_processed):
sample_de = translate_sample(sample, model, sampling_params, i, log_file)
rag_truth_data_de.samples.append(sample_de)
if i % 50 == 0 or i == total_samples - 1:
(output_dir / "ragtruth_data_de.json").write_text(
json.dumps(rag_truth_data_de.to_json(), indent=4)
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input_dir", type=str, required=True)
parser.add_argument("--output_dir", type=str, required=True)
args = parser.parse_args()
main(args.input_dir, args.output_dir)