"""Fine-tune a SentenceTransformer model on the STS benchmark.

The script reads the gzipped STSbenchmark TSV file (downloading it first if
it is missing), trains with a cosine-similarity loss while periodically
evaluating on the dev split, then reloads the stored model and evaluates it
on the test split.
"""
import os
import math
import gzip
import csv
import logging
import argparse
from datetime import datetime

from torch.utils.data import DataLoader
from sentence_transformers import SentenceTransformer, SentencesDataset, LoggingHandler, losses, util, InputExample
from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator

#### Just some code to print debug information to stdout
logging.basicConfig(
    format="%(asctime)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
    handlers=[LoggingHandler()],
)

STS_DATASET_URL = 'https://public.ukp.informatik.tu-darmstadt.de/reimers/sentence-transformers/datasets/stsbenchmark.tsv.gz'


def parse_args(argv=None):
    """Parse the command-line options of the training script.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_path', type=str, default='datasets/stsbenchmark.tsv.gz', help='Input txt path')
    parser.add_argument('--train_batch_size', type=int, default=16)
    parser.add_argument('--num_epochs', type=int, default=10)
    parser.add_argument('--model_name_or_path', type=str, default="all-MiniLM-L6-v2")
    parser.add_argument('--save_root_path', type=str, default="output", help='Model output folder')
    # type=float is required: without it a value given on the command line
    # stays a string and is handed to the optimizer as such.
    parser.add_argument('--lr', type=float, default=2e-05, help='Learning rate')
    # None lets the library choose the device instead of failing on hosts
    # without CUDA.
    parser.add_argument('--device', type=str, default=None,
                        help='Device for training, e.g. "cuda" or "cpu" (default: auto-detect)')
    return parser.parse_args(argv)


def load_sts_samples(sts_dataset_path):
    """Read the gzipped STS benchmark TSV file and split it into sample lists.

    Rows whose ``split`` column is ``dev`` or ``test`` go to the respective
    list; every other row is treated as training data. Scores are divided by
    5.0 so that labels lie in the range 0 ... 1.

    Args:
        sts_dataset_path: Path to the ``.tsv.gz`` file.

    Returns:
        A tuple ``(train_samples, dev_samples, test_samples)`` of
        ``InputExample`` lists.
    """
    train_samples = []
    dev_samples = []
    test_samples = []
    with gzip.open(sts_dataset_path, 'rt', encoding='utf8') as fIn:
        reader = csv.DictReader(fIn, delimiter='\t', quoting=csv.QUOTE_NONE)
        for row in reader:
            score = float(row['score']) / 5.0  # Normalize score to range 0 ... 1
            inp_example = InputExample(texts=[row['sentence1'], row['sentence2']], label=score)

            if row['split'] == 'dev':
                dev_samples.append(inp_example)
            elif row['split'] == 'test':
                test_samples.append(inp_example)
            else:
                train_samples.append(inp_example)
    return train_samples, dev_samples, test_samples


def main(argv=None):
    """Run fine-tuning followed by evaluation on the STS benchmark test split.

    Args:
        argv: Optional list of argument strings forwarded to ``parse_args``.
    """
    args = parse_args(argv)
    sts_dataset_path = args.data_path

    # Check if dataset exists. If not, download it
    if not os.path.exists(sts_dataset_path):
        util.http_get(STS_DATASET_URL, sts_dataset_path)

    model_name = args.model_name_or_path
    train_batch_size = args.train_batch_size
    num_epochs = args.num_epochs
    # The timestamp keeps repeated runs from overwriting each other.
    model_save_path = os.path.join(
        args.save_root_path,
        "training_stsbenchmark_" + model_name.replace("/", "-") + "-" + datetime.now().strftime("%Y-%m-%d_%H-%M-%S"),
    )

    # Load a pre-trained sentence transformer model
    model = SentenceTransformer(model_name, device=args.device)

    # Convert the dataset to a DataLoader ready for training
    logging.info("Read STSbenchmark train dataset")
    train_samples, dev_samples, test_samples = load_sts_samples(sts_dataset_path)
    logging.info("Dealing data end.")

    train_dataset = SentencesDataset(train_samples, model)
    train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=train_batch_size)
    train_loss = losses.CosineSimilarityLoss(model=model)

    # Development set: Measure correlation between cosine score and gold labels
    logging.info("Read STSbenchmark dev dataset")
    evaluator = EmbeddingSimilarityEvaluator.from_input_examples(dev_samples, name="sts-dev")

    # Configure the training. The dev evaluator is passed to fit() with
    # evaluation_steps=1000.
    warmup_steps = math.ceil(len(train_dataloader) * num_epochs * 0.1)  # 10% of the total training steps for warm-up
    logging.info("Warmup-steps: %s", warmup_steps)

    print("Start training ...")
    # Train the model
    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        evaluator=evaluator,
        epochs=num_epochs,
        evaluation_steps=1000,
        warmup_steps=warmup_steps,
        optimizer_params={'lr': args.lr},
        output_path=model_save_path,
    )
    logging.info("Finetune end")

    ##############################################################################
    #
    # Load the stored model and evaluate its performance on STS benchmark dataset
    #
    ##############################################################################
    model = SentenceTransformer(model_save_path)
    test_evaluator = EmbeddingSimilarityEvaluator.from_input_examples(test_samples, name='sts-test')
    test_evaluator(model, output_path=model_save_path)


if __name__ == "__main__":
    main()