Commit be5bf7b8 authored by Morgan Funtowicz

Added NER pipeline.

parent 80eacb8f
# coding=utf-8
# Copyright 2018 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals

import os
from abc import ABC, abstractmethod
from itertools import groupby
from typing import Union, Optional, Tuple, List, Dict

import numpy as np

from transformers import AutoTokenizer, PreTrainedTokenizer, PretrainedConfig, \
    SquadExample, squad_convert_examples_to_features, is_tf_available, is_torch_available, logger

if is_tf_available():
    from transformers import TFAutoModelForSequenceClassification, TFAutoModelForQuestionAnswering, TFAutoModelForTokenClassification

if is_torch_available():
    import torch
    from transformers import AutoModelForSequenceClassification, AutoModelForQuestionAnswering, AutoModelForTokenClassification
class Pipeline(ABC):
    def __init__(self, model, tokenizer: PreTrainedTokenizer = None, **kwargs):
        self.model = model
        self.tokenizer = tokenizer

    @classmethod
    @abstractmethod
    def from_config(cls, model, tokenizer: PreTrainedTokenizer, **kwargs):
        raise NotImplementedError()

    def save_pretrained(self, save_directory):
        if not os.path.isdir(save_directory):
            logger.error("Provided path ({}) should be a directory".format(save_directory))
            return

        self.model.save_pretrained(save_directory)
        self.tokenizer.save_pretrained(save_directory)

    def transform(self, *texts, **kwargs):
        # Generic compatibility with sklearn and Keras
        return self(*texts, **kwargs)

    def predict(self, *texts, **kwargs):
        # Generic compatibility with sklearn and Keras
        return self(*texts, **kwargs)

    @abstractmethod
    def __call__(self, *texts, **kwargs):
        raise NotImplementedError()
class TextClassificationPipeline(Pipeline):
    def __init__(self, model, tokenizer: PreTrainedTokenizer, nb_classes: int = 2):
        super().__init__(model, tokenizer)

        if nb_classes < 2:
            raise Exception('Invalid parameter nb_classes. int >= 2 is required (got: {})'.format(nb_classes))
        self._nb_classes = nb_classes

    @classmethod
    def from_config(cls, model, tokenizer: PreTrainedTokenizer, **kwargs):
        return cls(model, tokenizer, **kwargs)

    def __call__(self, *texts, **kwargs):
        # Generic compatibility with sklearn and Keras
        if 'X' in kwargs and not texts:
            texts = kwargs.pop('X')

        inputs = self.tokenizer.batch_encode_plus(
            texts, add_special_tokens=True, return_tensors='tf' if is_tf_available() else 'pt'
        )

        special_tokens_mask = inputs.pop('special_tokens_mask')

        if is_tf_available():
            # TODO trace model
            predictions = self.model(**inputs)[0]
        else:
            import torch
            with torch.no_grad():
                predictions = self.model(**inputs)[0]

        return predictions.numpy().tolist()
class NerPipeline(Pipeline):

    def __init__(self, model, tokenizer: PreTrainedTokenizer):
        super().__init__(model, tokenizer)

    @classmethod
    def from_config(cls, model, tokenizer: PreTrainedTokenizer, **kwargs):
        pass

    def __call__(self, *texts, **kwargs):
        (texts, ), answers = texts, []

        for sentence in texts:

            # Ugly token to word idx mapping (for now)
            token_to_word, words = [], sentence.split(' ')
            for i, w in enumerate(words):
                tokens = self.tokenizer.tokenize(w)
                token_to_word += [i] * len(tokens)
            tokens = self.tokenizer.encode_plus(sentence, return_attention_mask=False, return_tensors='tf' if is_tf_available() else 'pt')

            # Forward
            if is_torch_available():
                with torch.no_grad():
                    entities = self.model(**tokens)[0][0].cpu().numpy()
            else:
                entities = self.model(tokens)[0][0].numpy()

            # Normalize scores
            answer, token_start = [], 1
            for idx, word in groupby(token_to_word[1:-1]):

                # Sum log prob over token, then normalize across labels
                score = np.exp(entities[token_start]) / np.exp(entities[token_start]).sum(-1, keepdims=True)
                label_idx = score.argmax()
                answer += [{
                    'word': words[idx - 1], 'score': score[label_idx], 'entity': self.model.config.id2label[label_idx]
                }]

                # Update token start
                token_start += len(list(word))

            # Append
            answers += [answer]
        return answers
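For reference, a standalone sketch of how itertools.groupby collapses the sub-token to word mapping built above back into per-word runs. The token_to_word values below are made up for illustration and are not the output of any real tokenizer:

    from itertools import groupby

    # Pretend a 4-word sentence tokenized into sub-tokens owned by words 0, 0, 1, 2, 3, 3, 3
    token_to_word = [0, 0, 1, 2, 3, 3, 3]

    token_start = 0
    for word_idx, run in groupby(token_to_word):
        n_subtokens = len(list(run))
        # NerPipeline reads the logits at token_start for each word, then skips over its remaining sub-tokens
        print(word_idx, 'starts at sub-token', token_start, 'and spans', n_subtokens, 'sub-tokens')
        token_start += n_subtokens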
class QuestionAnsweringPipeline(Pipeline):
    """
    Question Answering pipeline involving Tokenization and Inference.
    """

    @classmethod
    def from_config(cls, model, tokenizer: PreTrainedTokenizer, **kwargs):
        pass

    @staticmethod
    def create_sample(question: Union[str, List[str]], context: Union[str, List[str]]) -> Union[SquadExample, List[SquadExample]]:
        is_list = isinstance(question, list)

        if is_list:
            return [SquadExample(None, q, c, None, None, None) for q, c in zip(question, context)]
        else:
            return SquadExample(None, question, context, None, None, None)

    @staticmethod
    def handle_args(*inputs, **kwargs) -> List[SquadExample]:
        # Position args, handling is sensibly the same as X and data, so forwarding to avoid duplicating
        if inputs is not None and len(inputs) > 1:
            kwargs['X'] = inputs

        # Generic compatibility with sklearn and Keras
        # Batched data
        if 'X' in kwargs or 'data' in kwargs:
            data = kwargs['X'] if 'X' in kwargs else kwargs['data']

            if not isinstance(data, list):
                data = [data]

            for i, item in enumerate(data):
                if isinstance(item, dict):
                    if any(k not in item for k in ['question', 'context']):
                        raise KeyError('You need to provide a dictionary with keys {question:..., context:...}')
                    data[i] = QuestionAnsweringPipeline.create_sample(**item)

                elif isinstance(item, SquadExample):
                    continue
                else:
                    raise ValueError(
                        '{} argument needs to be of type (list[SquadExample | dict], SquadExample, dict)'
                        .format('X' if 'X' in kwargs else 'data')
                    )
            inputs = data

        # Tabular input
        elif 'question' in kwargs and 'context' in kwargs:
            if isinstance(kwargs['question'], str):
                kwargs['question'] = [kwargs['question']]

            if isinstance(kwargs['context'], str):
                kwargs['context'] = [kwargs['context']]

            inputs = [QuestionAnsweringPipeline.create_sample(q, c) for q, c in zip(kwargs['question'], kwargs['context'])]
        else:
            raise ValueError('Unknown arguments {}'.format(kwargs))

        if not isinstance(inputs, list):
            inputs = [inputs]

        return inputs

    def __init__(self, model, tokenizer: Optional[PreTrainedTokenizer]):
        super().__init__(model, tokenizer)

    def inputs_for_model(self, features: Union[SquadExample, List[SquadExample]]) -> Dict:
        args = ['input_ids', 'attention_mask']
        model_type = type(self.model).__name__.lower()

        if 'distilbert' not in model_type and 'xlm' not in model_type:
            args += ['token_type_ids']

        if 'xlnet' in model_type or 'xlm' in model_type:
            args += ['cls_index', 'p_mask']

        if isinstance(features, SquadExample):
            return {k: features.__dict__[k] for k in args}
        else:
            return {k: [feature.__dict__[k] for feature in features] for k in args}

    def __call__(self, *texts, **kwargs):
        # Set defaults values
        kwargs.setdefault('topk', 1)
        kwargs.setdefault('doc_stride', 128)
        kwargs.setdefault('max_answer_len', 15)
        kwargs.setdefault('max_seq_len', 384)
        kwargs.setdefault('max_question_len', 64)

        if kwargs['topk'] < 1:
            raise ValueError('topk parameter should be >= 1 (got {})'.format(kwargs['topk']))

        if kwargs['max_answer_len'] < 1:
            raise ValueError('max_answer_len parameter should be >= 1 (got {})'.format(kwargs['max_answer_len']))

        examples = QuestionAnsweringPipeline.handle_args(texts, **kwargs)

        # Convert inputs to features
        features = squad_convert_examples_to_features(examples, self.tokenizer, kwargs['max_seq_len'], kwargs['doc_stride'], kwargs['max_question_len'], False)
        fw_args = self.inputs_for_model(features)

        if is_tf_available():
            import tensorflow as tf
            fw_args = {k: tf.constant(v) for (k, v) in fw_args.items()}
            start, end = self.model(fw_args)
            start, end = start.numpy(), end.numpy()
        else:
            import torch
            with torch.no_grad():
                # Retrieve the score for the context tokens only (removing question tokens)
                fw_args = {k: torch.tensor(v) for (k, v) in fw_args.items()}
                start, end = self.model(**fw_args)
                start, end = start.cpu().numpy(), end.cpu().numpy()

        answers = []
        for (example, feature, start_, end_) in zip(examples, features, start, end):
            # Normalize logits and spans to retrieve the answer
            start_ = np.exp(start_) / np.sum(np.exp(start_))
            end_ = np.exp(end_) / np.sum(np.exp(end_))

            # Mask padding and question
            start_, end_ = start_ * np.abs(np.array(feature.p_mask) - 1), end_ * np.abs(np.array(feature.p_mask) - 1)

            # TODO : What happens if not possible
            # Mask CLS
            start_[0] = end_[0] = 0

            starts, ends, scores = self.decode(start_, end_, kwargs['topk'], kwargs['max_answer_len'])
            char_to_word = np.array(example.char_to_word_offset)

            # Convert the answer (tokens) back to the original text
            answers += [[
                {
                    'score': score,
                    'start': np.where(char_to_word == feature.token_to_orig_map[s])[0][0],
                    'end': np.where(char_to_word == feature.token_to_orig_map[e])[0][-1],
                    'answer': ' '.join(example.doc_tokens[feature.token_to_orig_map[s]: feature.token_to_orig_map[e] + 1])
                }
                for s, e, score in zip(starts, ends, scores)
            ]]

        return answers

    def decode(self, start: np.ndarray, end: np.ndarray, topk: int, max_answer_len: int) -> Tuple:
        # Ensure we have batch axis
        if start.ndim == 1:
            start = start[None]

        if end.ndim == 1:
            end = end[None]

        # Compute the score of each tuple(start, end) to be the real answer
        outer = np.matmul(np.expand_dims(start, -1), np.expand_dims(end, 1))

        # Remove candidate with end < start and end - start > max_answer_len
        candidates = np.tril(np.triu(outer), max_answer_len - 1)

        # Inspired by Chen & al. (https://github.com/facebookresearch/DrQA)
        scores_flat = candidates.flatten()
        if topk == 1:
            idx_sort = [np.argmax(scores_flat)]
        elif len(scores_flat) < topk:
            idx_sort = np.argsort(-scores_flat)
        else:
            idx = np.argpartition(-scores_flat, topk)[0:topk]
            idx_sort = idx[np.argsort(-scores_flat[idx])]

        start, end = np.unravel_index(idx_sort, candidates.shape)[1:]
        return start, end, candidates[0, start, end]

    def span_to_answer(self, text: str, start: int, end: int):
        words = []
        token_idx = char_start_idx = char_end_idx = chars_idx = 0

        for i, word in enumerate(text.split(" ")):
            token = self.tokenizer.tokenize(word)

            # Append words if they are in the span
            if start <= token_idx <= end:
                if token_idx == start:
                    char_start_idx = chars_idx

                if token_idx == end:
                    char_end_idx = chars_idx + len(word)

                words += [word]

            # Stop if we went over the end of the answer
            if token_idx > end:
                break

            # Append the subtokenization length to the running index
            token_idx += len(token)
            chars_idx += len(word) + 1

        # Join text with spaces
        return {'answer': ' '.join(words), 'start': max(0, char_start_idx), 'end': min(len(text), char_end_idx)}
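As a side note, the span filtering in decode() can be seen on a tiny example. The probabilities below are arbitrary and only meant to show how np.triu/np.tril discard candidates with end < start or spans longer than max_answer_len:

    import numpy as np

    # Arbitrary start/end probabilities over 4 tokens (batch axis of 1)
    start = np.array([[0.1, 0.6, 0.2, 0.1]])
    end = np.array([[0.1, 0.2, 0.5, 0.2]])

    # Outer product: score of every (start, end) pair
    outer = np.matmul(np.expand_dims(start, -1), np.expand_dims(end, 1))

    # Keep only end >= start (triu) and spans of at most max_answer_len tokens (tril)
    max_answer_len = 2
    candidates = np.tril(np.triu(outer), max_answer_len - 1)
    print(candidates[0])  # non-zero entries remain only for valid spans of length <= 2 tokens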
# Register all the supported task here
SUPPORTED_TASKS = {
    'text-classification': {
        'impl': TextClassificationPipeline,
        'tf': TFAutoModelForSequenceClassification if is_tf_available() else None,
        'pt': AutoModelForSequenceClassification if is_torch_available() else None
    },
    'ner': {
        'impl': NerPipeline,
        'tf': TFAutoModelForTokenClassification if is_tf_available() else None,
        'pt': AutoModelForTokenClassification if is_torch_available() else None,
    },
    'question-answering': {
        'impl': QuestionAnsweringPipeline,
        'tf': TFAutoModelForQuestionAnswering if is_tf_available() else None,
        'pt': AutoModelForQuestionAnswering if is_torch_available() else None
    }
}
def pipeline(task: str, model, config: Optional[PretrainedConfig] = None, tokenizer: Optional[Union[str, PreTrainedTokenizer]] = None, **kwargs) -> Pipeline:
    """
    Utility factory method to build pipeline.
    """
    # Try to infer tokenizer from model name (if provided as str)
    if tokenizer is None and isinstance(model, str):
        tokenizer = model
    else:
        # Impossible to guess which is the right tokenizer here
        raise Exception('Tokenizer cannot be None if provided model is a PreTrainedModel instance')

    tokenizer = tokenizer if isinstance(tokenizer, PreTrainedTokenizer) else AutoTokenizer.from_pretrained(tokenizer)

    if task not in SUPPORTED_TASKS:
        raise KeyError("Unknown task {}, available tasks are {}".format(task, list(SUPPORTED_TASKS.keys())))

    targeted_task = SUPPORTED_TASKS[task]
    task, allocator = targeted_task['impl'], targeted_task['tf'] if is_tf_available() else targeted_task['pt']

    model = allocator.from_pretrained(model)
    return task(model, tokenizer, **kwargs)
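To close, a minimal usage sketch of the factory with the newly registered 'ner' task. The checkpoint name is a placeholder for any token-classification model fine-tuned for NER, and the import path assumes the factory is exposed from the transformers package:

    from transformers import pipeline  # assumption: pipeline() is re-exported at package level

    # Placeholder checkpoint: substitute any NER fine-tuned token-classification model
    nlp = pipeline('ner', model='a-ner-finetuned-checkpoint')

    # NerPipeline takes one iterable of sentences and returns, per sentence,
    # a list of {'word', 'score', 'entity'} dicts
    print(nlp(['Hugging Face is based in New York City']))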