import logging

import pandas as pd
import pyarrow
import pyarrow.csv

from .registry import register_array_parser


@register_array_parser("csv")
class CSVArrayParser:
    """Read and write 2-D array data as header-less CSV files via pyarrow.

    The parser is registered under the name ``"csv"`` through
    ``register_array_parser``.
    """

    def __init__(self, delimiter=","):
        """Create a parser.

        Args:
            delimiter: Field separator used for both reading and writing.
        """
        self.delimiter = delimiter

    def read(self, path):
        """Load the CSV file at ``path`` and return its contents as an array.

        Args:
            path: Location of the CSV file, passed straight to
                ``pyarrow.csv.read_csv``.

        Returns:
            The result of converting the parsed table to pandas and then
            calling ``DataFrame.to_numpy()``.
        """
        # Lazy logging arguments: the message is only formatted if emitted.
        logging.info(
            "Reading from %s using CSV format with configuration %s",
            path,
            self.__dict__,
        )
        # Column names are auto-generated so that the first line of the file
        # is treated as data rather than as a header.
        read_options = pyarrow.csv.ReadOptions(autogenerate_column_names=True)
        parse_options = pyarrow.csv.ParseOptions(delimiter=self.delimiter)
        table = pyarrow.csv.read_csv(
            path, read_options=read_options, parse_options=parse_options
        )
        logging.info("Done reading from %s", path)
        return table.to_pandas().to_numpy()

    def write(self, path, arr):
        """Write ``arr`` to ``path`` as a CSV file without a header row.

        Args:
            path: Destination passed straight to ``pyarrow.csv.write_csv``.
            arr: Data accepted by the ``pandas.DataFrame`` constructor
                (presumably a NumPy array; confirm against callers).
        """
        logging.info(
            "Writing to %s using CSV format with configuration %s",
            path,
            self.__dict__,
        )
        write_options = pyarrow.csv.WriteOptions(
            include_header=False, delimiter=self.delimiter
        )
        # Go through pandas to build the Arrow table that the CSV writer needs.
        table = pyarrow.Table.from_pandas(pd.DataFrame(arr))
        pyarrow.csv.write_csv(table, path, write_options=write_options)
        logging.info("Done writing to %s", path)