Unverified commit d1ab583d, authored by Adam J. Stewart and committed by GitHub
Browse files

Add support for files with periods in name (#4099)

parent 850491eb
...@@ -63,6 +63,9 @@ class Tester(unittest.TestCase): ...@@ -63,6 +63,9 @@ class Tester(unittest.TestCase):
("foo.gz", (".gz", None, ".gz")), ("foo.gz", (".gz", None, ".gz")),
("foo.zip", (".zip", ".zip", None)), ("foo.zip", (".zip", ".zip", None)),
("foo.xz", (".xz", None, ".xz")), ("foo.xz", (".xz", None, ".xz")),
("foo.bar.tar.gz", (".tar.gz", ".tar", ".gz")),
("foo.bar.gz", (".gz", None, ".gz")),
("foo.bar.zip", (".zip", ".zip", None)),
]: ]:
with self.subTest(file=file): with self.subTest(file=file):
self.assertSequenceEqual(utils._detect_file_type(file), expected) self.assertSequenceEqual(utils._detect_file_type(file), expected)
...@@ -71,14 +74,6 @@ class Tester(unittest.TestCase): ...@@ -71,14 +74,6 @@ class Tester(unittest.TestCase):
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
utils._detect_file_type("foo") utils._detect_file_type("foo")
def test_detect_file_type_to_many_exts(self):
with self.assertRaises(RuntimeError):
utils._detect_file_type("foo.bar.tar.gz")
def test_detect_file_type_unknown_archive_type(self):
with self.assertRaises(RuntimeError):
utils._detect_file_type("foo.bar.gz")
def test_detect_file_type_unknown_compression(self): def test_detect_file_type_unknown_compression(self):
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
utils._detect_file_type("foo.tar.baz") utils._detect_file_type("foo.tar.baz")
......
...@@ -291,53 +291,47 @@ _FILE_TYPE_ALIASES: Dict[str, Tuple[Optional[str], Optional[str]]] = { ...@@ -291,53 +291,47 @@ _FILE_TYPE_ALIASES: Dict[str, Tuple[Optional[str], Optional[str]]] = {
} }
def _verify_archive_type(archive_type: str) -> None: def _detect_file_type(file: str) -> Tuple[str, Optional[str], Optional[str]]:
if archive_type not in _ARCHIVE_EXTRACTORS.keys(): """Detect the archive type and/or compression of a file.
valid_types = "', '".join(_ARCHIVE_EXTRACTORS.keys())
raise RuntimeError(f"Unknown archive type '{archive_type}'. Known archive types are '{valid_types}'.")
def _verify_compression(compression: str) -> None: Args:
if compression not in _COMPRESSED_FILE_OPENERS.keys(): file (str): the filename
valid_types = "', '".join(_COMPRESSED_FILE_OPENERS.keys())
raise RuntimeError(f"Unknown compression '{compression}'. Known compressions are '{valid_types}'.")
Returns:
(tuple): tuple of suffix, archive type, and compression
def _detect_file_type(file: str) -> Tuple[str, Optional[str], Optional[str]]: Raises:
path = pathlib.Path(file) RuntimeError: if file has no suffix or suffix is not supported
suffix = path.suffix """
suffixes = pathlib.Path(file).suffixes suffixes = pathlib.Path(file).suffixes
if not suffixes: if not suffixes:
raise RuntimeError( raise RuntimeError(
f"File '{file}' has no suffixes that could be used to detect the archive type and compression." f"File '{file}' has no suffixes that could be used to detect the archive type and compression."
) )
elif len(suffixes) > 2: suffix = suffixes[-1]
raise RuntimeError(
"Archive type and compression detection only works for 1 or 2 suffixes. " f"Got {len(suffixes)} instead."
)
elif len(suffixes) == 2:
# if we have exactly two suffixes we assume the first one is the archive type and the second on is the
# compression
archive_type, compression = suffixes
_verify_archive_type(archive_type)
_verify_compression(compression)
return "".join(suffixes), archive_type, compression
# check if the suffix is a known alias # check if the suffix is a known alias
with contextlib.suppress(KeyError): if suffix in _FILE_TYPE_ALIASES:
return (suffix, *_FILE_TYPE_ALIASES[suffix]) return (suffix, *_FILE_TYPE_ALIASES[suffix])
# check if the suffix is an archive type # check if the suffix is an archive type
with contextlib.suppress(RuntimeError): if suffix in _ARCHIVE_EXTRACTORS:
_verify_archive_type(suffix)
return suffix, suffix, None return suffix, suffix, None
# check if the suffix is a compression # check if the suffix is a compression
with contextlib.suppress(RuntimeError): if suffix in _COMPRESSED_FILE_OPENERS:
_verify_compression(suffix) # check for suffix hierarchy
if len(suffixes) > 1:
suffix2 = suffixes[-2]
# check if the suffix2 is an archive type
if suffix2 in _ARCHIVE_EXTRACTORS:
return suffix2 + suffix, suffix2, suffix
return suffix, None, suffix return suffix, None, suffix
raise RuntimeError(f"Suffix '{suffix}' is neither recognized as archive type nor as compression.") valid_suffixes = sorted(set(_FILE_TYPE_ALIASES) | set(_ARCHIVE_EXTRACTORS) | set(_COMPRESSED_FILE_OPENERS))
raise RuntimeError(f"Unknown compression or archive type: '{suffix}'.\nKnown suffixes are: '{valid_suffixes}'.")
def _decompress(from_path: str, to_path: Optional[str] = None, remove_finished: bool = False) -> str: def _decompress(from_path: str, to_path: Optional[str] = None, remove_finished: bool = False) -> str:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment