"...git@developer.sourcefind.cn:chenpangpang/transformers.git" did not exist on "1d463303fee9976a586fb9161df0fd383910db5e"
Commit d0724d07 authored by Morgan Funtowicz's avatar Morgan Funtowicz
Browse files

Add PipedPipelineDataFormat

parent 7711403b
......@@ -191,8 +191,8 @@ from .modeling_tf_pytorch_utils import (convert_tf_weight_name_to_pt_weight_name
load_tf2_model_in_pytorch_model)
# Pipelines
# from .pipeline_ import TextClassificationPipeline
from .pipelines import Pipeline, pipeline, TextClassificationPipeline
from .pipelines import pipeline, PipelineDataFormat, CsvPipelineDataFormat, JsonPipelineDataFormat, PipedPipelineDataFormat, \
Pipeline, FeatureExtractionPipeline, QuestionAnsweringPipeline, NerPipeline, TextClassificationPipeline
if not is_tf_available() and not is_torch_available():
logger.warning("Neither PyTorch nor TensorFlow >= 2.0 have been found."
......
......@@ -36,11 +36,10 @@ class RunCommand(BaseTransformersCLICommand):
run_parser.add_argument('--model', type=str, required=True, help='Name or path to the model to instantiate.')
run_parser.add_argument('--config', type=str, help='Name or path to the model\'s config to instantiate.')
run_parser.add_argument('--tokenizer', type=str, help='Name of the tokenizer to use. (default: same as the model name)')
run_parser.add_argument('--column', type=str, required=True, help='Name of the column to use as input. (For multi columns input as QA use column1,columns2)')
run_parser.add_argument('--column', type=str, help='Name of the column to use as input. (For multi columns input as QA use column1,columns2)')
run_parser.add_argument('--format', type=str, default='infer', choices=PipelineDataFormat.SUPPORTED_FORMATS, help='Input format to read from')
run_parser.add_argument('--input', type=str, required=True, help='Path to the file to use for inference')
run_parser.add_argument('--output', type=str, required=True, help='Path to the file that will be used post to write results.')
run_parser.add_argument('kwargs', nargs='*', help='Arguments to forward to the file format reader')
run_parser.add_argument('--input', type=str, help='Path to the file to use for inference')
run_parser.add_argument('--output', type=str, help='Path to the file that will be used post to write results.')
run_parser.set_defaults(func=run_command_factory)
def run(self):
......
......@@ -66,20 +66,6 @@ class DefaultArgumentHandler(ArgumentHandler):
raise ValueError('Unable to infer the format of the provided data (X=, data=, ...)')
class _ScikitCompat(ABC):
"""
Interface layer for the Scikit and Keras compatibility.
"""
@abstractmethod
def transform(self, X):
raise NotImplementedError()
@abstractmethod
def predict(self, X):
raise NotImplementedError()
class PipelineDataFormat:
"""
Base class for all the pipeline supported data format both for reading and writing.
......@@ -90,12 +76,12 @@ class PipelineDataFormat:
PipelineDataFormat also includes some utilities to work with multi-columns like mapping from datasets columns
to pipelines keyword arguments through the `dataset_kwarg_1=dataset_column_1` format.
"""
SUPPORTED_FORMATS = ['json', 'csv']
SUPPORTED_FORMATS = ['json', 'csv', 'pipe']
def __init__(self, output: str, path: str, column: str):
def __init__(self, output: Optional[str], path: Optional[str], column: Optional[str]):
self.output = output
self.path = path
self.column = column.split(',')
self.column = column.split(',') if column else ['']
self.is_multi_columns = len(self.column) > 1
if self.is_multi_columns:
......@@ -117,17 +103,19 @@ class PipelineDataFormat:
raise NotImplementedError()
@staticmethod
def from_str(name: str, output: str, path: str, column: str):
def from_str(name: str, output: Optional[str], path: Optional[str], column: Optional[str]):
if name == 'json':
return JsonPipelineDataFormat(output, path, column)
elif name == 'csv':
return CsvPipelineDataFormat(output, path, column)
elif name == 'pipe':
return PipedPipelineDataFormat(output, path, column)
else:
raise KeyError('Unknown reader {} (Available reader are json/csv)'.format(name))
raise KeyError('Unknown reader {} (Available reader are json/csv/pipe)'.format(name))
class CsvPipelineDataFormat(PipelineDataFormat):
def __init__(self, output: str, path: str, column: str):
def __init__(self, output: Optional[str], path: Optional[str], column: Optional[str]):
super().__init__(output, path, column)
def __iter__(self):
......@@ -148,7 +136,7 @@ class CsvPipelineDataFormat(PipelineDataFormat):
class JsonPipelineDataFormat(PipelineDataFormat):
def __init__(self, output: str, path: str, column: str):
def __init__(self, output: Optional[str], path: Optional[str], column: Optional[str]):
super().__init__(output, path, column)
with open(path, 'r') as f:
......@@ -166,6 +154,50 @@ class JsonPipelineDataFormat(PipelineDataFormat):
json.dump(data, f)
class PipedPipelineDataFormat(PipelineDataFormat):
"""
Read data from piped input to the python process.
For multi columns data, columns should separated by \t
If columns are provided, then the output will be a dictionary with {column_x: value_x}
"""
def __iter__(self):
import sys
for line in sys.stdin:
# Split for multi-columns
if '\t' in line:
line = line.split('\t')
if self.column:
# Dictionary to map arguments
yield {kwargs: l for (kwargs, _), l in zip(self.column, line)}
else:
yield tuple(line)
# No dictionary to map arguments
else:
print(line)
yield line
def save(self, data: dict):
print(data)
class _ScikitCompat(ABC):
"""
Interface layer for the Scikit and Keras compatibility.
"""
@abstractmethod
def transform(self, X):
raise NotImplementedError()
@abstractmethod
def predict(self, X):
raise NotImplementedError()
class Pipeline(_ScikitCompat):
"""
Base class implementing pipelined operations.
......@@ -208,18 +240,6 @@ class Pipeline(_ScikitCompat):
"""
return self(X=X)
def __call__(self, *texts, **kwargs):
# Parse arguments
inputs = self._args_parser(*texts, **kwargs)
# Encode for forward
with self.device_placement():
inputs = self.tokenizer.batch_encode_plus(
inputs, add_special_tokens=True, return_tensors='tf' if is_tf_available() else 'pt'
)
return self._forward(inputs)
@contextmanager
def device_placement(self):
"""
......@@ -244,6 +264,18 @@ class Pipeline(_ScikitCompat):
yield
def __call__(self, *texts, **kwargs):
# Parse arguments
inputs = self._args_parser(*texts, **kwargs)
# Encode for forward
with self.device_placement():
inputs = self.tokenizer.batch_encode_plus(
inputs, add_special_tokens=True, return_tensors='tf' if is_tf_available() else 'pt'
)
return self._forward(inputs)
def _forward(self, inputs):
"""
Internal framework specific forward dispatching.
......@@ -275,12 +307,6 @@ class TextClassificationPipeline(Pipeline):
"""
Text classification pipeline using ModelForTextClassification head.
"""
def __init__(self, model, tokenizer: PreTrainedTokenizer, nb_classes: int = 2):
super().__init__(model, tokenizer)
if nb_classes < 2:
raise Exception('Invalid parameter nb_classes. int >= 2 is required (got: {})'.format(nb_classes))
self._nb_classes = nb_classes
def __call__(self, *args, **kwargs):
outputs = super().__call__(*args, **kwargs)
......@@ -398,56 +424,6 @@ class QuestionAnsweringPipeline(Pipeline):
Question Answering pipeline using ModelForQuestionAnswering head.
"""
class QuestionAnsweringArgumentHandler(ArgumentHandler):
def __call__(self, *args, **kwargs):
# Position args, handling is sensibly the same as X and data, so forwarding to avoid duplicating
if args is not None and len(args) > 0:
if len(args) == 1:
kwargs['X'] = args[0]
else:
kwargs['X'] = list(args)
# Generic compatibility with sklearn and Keras
# Batched data
if 'X' in kwargs or 'data' in kwargs:
data = kwargs['X'] if 'X' in kwargs else kwargs['data']
if not isinstance(data, list):
data = [data]
for i, item in enumerate(data):
if isinstance(item, dict):
if any(k not in item for k in ['question', 'context']):
raise KeyError('You need to provide a dictionary with keys {question:..., context:...}')
data[i] = QuestionAnsweringPipeline.create_sample(**item)
elif isinstance(item, SquadExample):
continue
else:
raise ValueError(
'{} argument needs to be of type (list[SquadExample | dict], SquadExample, dict)'
.format('X' if 'X' in kwargs else 'data')
)
inputs = data
# Tabular input
elif 'question' in kwargs and 'context' in kwargs:
if isinstance(kwargs['question'], str):
kwargs['question'] = [kwargs['question']]
if isinstance(kwargs['context'], str):
kwargs['context'] = [kwargs['context']]
inputs = [QuestionAnsweringPipeline.create_sample(q, c) for q, c in zip(kwargs['question'], kwargs['context'])]
else:
raise ValueError('Unknown arguments {}'.format(kwargs))
if not isinstance(inputs, list):
inputs = [inputs]
return inputs
@staticmethod
def create_sample(question: Union[str, List[str]], context: Union[str, List[str]]) -> Union[SquadExample, List[SquadExample]]:
"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment