"src/vscode:/vscode.git/clone" did not exist on "577b8a2783c2ba9cf4ebbb3c36ea024a6175ef95"
Commit 0fe1c647, authored by Zaida Zhou, committed by GitHub

Remove fileio from mmcv and use mmengine.fileio instead (#2179)

parent 0b4285d9
-fileio
--------
-.. automodule:: mmcv.fileio
-    :members:
 image
 ------
 .. automodule:: mmcv.image
......
@@ -18,7 +18,6 @@ You can switch between Chinese and English documents in the lower-left corner of
 understand_mmcv/config.md
 understand_mmcv/registry.md
 understand_mmcv/runner.md
-understand_mmcv/io.md
 understand_mmcv/data_process.md
 understand_mmcv/visualization.md
 understand_mmcv/cnn.md
......
## File IO
This module provides two universal APIs to load and dump files of different formats.
```{note}
Since v1.3.16, the IO module supports loading data from, and dumping data to, different backends. More details are in PR [#1330](https://github.com/open-mmlab/mmcv/pull/1330).
```
### Load and dump data
`mmcv` provides a universal API for loading and dumping data. The currently
supported formats are json, yaml and pickle.
#### Load from disk or dump to disk
```python
import mmcv

# load data from a file
data = mmcv.load('test.json')
data = mmcv.load('test.yaml')
data = mmcv.load('test.pkl')

# load data from a file-like object
with open('test.json', 'r') as f:
    data = mmcv.load(f, file_format='json')

# dump data to a string
json_str = mmcv.dump(data, file_format='json')

# dump data to a file with a filename (infer format from file extension)
mmcv.dump(data, 'out.pkl')

# dump data to a file with a file-like object
# (do not rebind `data` to the return value of `dump`)
with open('test.yaml', 'w') as f:
    mmcv.dump(data, f, file_format='yaml')
```
#### Load from other backends or dump to other backends
```python
import mmcv
# load data from a file
data = mmcv.load('s3://bucket-name/test.json')
data = mmcv.load('s3://bucket-name/test.yaml')
data = mmcv.load('s3://bucket-name/test.pkl')
# dump data to a file with a filename (infer format from file extension)
mmcv.dump(data, 's3://bucket-name/out.pkl')
```
It is also easy to extend the API to support more file formats.
All you need to do is write a file handler that inherits from `BaseFileHandler`
and register it with one or several file formats.
The handler must implement at least three methods: `load_from_fileobj`, `dump_to_fileobj` and `dump_to_str`.
```python
import mmcv


# To register multiple file formats, a list can be used as the argument.
# @mmcv.register_handler(['txt', 'log'])
@mmcv.register_handler('txt')
class TxtHandler1(mmcv.BaseFileHandler):

    def load_from_fileobj(self, file):
        return file.read()

    def dump_to_fileobj(self, obj, file):
        file.write(str(obj))

    def dump_to_str(self, obj, **kwargs):
        return str(obj)
```
Here is an example of `PickleHandler`.
```python
import pickle

import mmcv


class PickleHandler(mmcv.BaseFileHandler):

    def load_from_fileobj(self, file, **kwargs):
        return pickle.load(file, **kwargs)

    def load_from_path(self, filepath, **kwargs):
        # pickle files must be opened in binary mode
        return super(PickleHandler, self).load_from_path(
            filepath, mode='rb', **kwargs)

    def dump_to_str(self, obj, **kwargs):
        kwargs.setdefault('protocol', 2)
        return pickle.dumps(obj, **kwargs)

    def dump_to_fileobj(self, obj, file, **kwargs):
        kwargs.setdefault('protocol', 2)
        pickle.dump(obj, file, **kwargs)

    def dump_to_path(self, obj, filepath, **kwargs):
        super(PickleHandler, self).dump_to_path(
            obj, filepath, mode='wb', **kwargs)
```
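The registration mechanism described above can be pictured as a mapping from file extensions to handler instances. The following is a minimal, standard-library-only sketch of that idea. The names `HANDLERS`, `register`, `BaseHandler`, `dump_to_str` and `load_from_str` are hypothetical and chosen for illustration; this is not mmcv's actual implementation.

```python
import io
import json

# Hypothetical registry: maps a file extension to a handler instance.
HANDLERS = {}


def register(extensions):
    """Class decorator that registers a handler for one or more extensions."""
    if isinstance(extensions, str):
        extensions = [extensions]

    def wrap(cls):
        handler = cls()
        for ext in extensions:
            HANDLERS[ext] = handler
        return cls

    return wrap


class BaseHandler:
    """Minimal base class: subclasses implement the three core methods."""

    def load_from_fileobj(self, file):
        raise NotImplementedError

    def dump_to_fileobj(self, obj, file):
        raise NotImplementedError

    def dump_to_str(self, obj):
        raise NotImplementedError


@register('json')
class JsonHandler(BaseHandler):

    def load_from_fileobj(self, file):
        return json.load(file)

    def dump_to_fileobj(self, obj, file):
        json.dump(obj, file)

    def dump_to_str(self, obj):
        return json.dumps(obj)


@register(['txt', 'log'])
class TxtHandler(BaseHandler):

    def load_from_fileobj(self, file):
        return file.read()

    def dump_to_fileobj(self, obj, file):
        file.write(str(obj))

    def dump_to_str(self, obj):
        return str(obj)


def dump_to_str(obj, file_format):
    """Look up the handler by format name and serialize to a string."""
    return HANDLERS[file_format].dump_to_str(obj)


def load_from_str(text, file_format):
    """Deserialize by wrapping the string in a file-like object."""
    return HANDLERS[file_format].load_from_fileobj(io.StringIO(text))
```

Registering a list of extensions stores the same handler instance under each key, which is why one class can serve both `txt` and `log` in this sketch.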
### Load a text file as a list or dict
For example, `a.txt` is a text file with 5 lines.
```
a
b
c
d
e
```
#### Load from disk
Use `list_from_file` to load the list from `a.txt`.
```python
>>> mmcv.list_from_file('a.txt')
['a', 'b', 'c', 'd', 'e']
>>> mmcv.list_from_file('a.txt', offset=2)
['c', 'd', 'e']
>>> mmcv.list_from_file('a.txt', max_num=2)
['a', 'b']
>>> mmcv.list_from_file('a.txt', prefix='/mnt/')
['/mnt/a', '/mnt/b', '/mnt/c', '/mnt/d', '/mnt/e']
```
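The behaviour shown in the session above can be approximated with a few lines of standard-library Python. This is only a sketch inferred from the documented outputs: `list_from_file_sketch` is a hypothetical name, and treating `max_num <= 0` as "no limit" is an assumption rather than a statement about mmcv's code.

```python
import os
import tempfile


def list_from_file_sketch(filename, prefix='', offset=0, max_num=0,
                          encoding='utf-8'):
    """Read a text file into a list of lines without trailing newlines.

    ``offset`` skips that many leading lines; ``max_num <= 0`` is treated
    as "no limit" (an assumption made for this sketch).
    """
    items = []
    with open(filename, encoding=encoding) as f:
        for index, line in enumerate(f):
            if index < offset:
                continue
            if 0 < max_num <= len(items):
                break
            items.append(prefix + line.rstrip('\n\r'))
    return items


# Recreate the five-line a.txt from the text in a temporary directory.
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, 'a.txt')
    with open(path, 'w', encoding='utf-8') as f:
        f.write('a\nb\nc\nd\ne\n')
    all_lines = list_from_file_sketch(path)
    skipped = list_from_file_sketch(path, offset=2)
    limited = list_from_file_sketch(path, max_num=2)
    prefixed = list_from_file_sketch(path, prefix='/mnt/')
```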
Similarly, `b.txt` is a text file with 3 lines.
```
1 cat
2 dog cow
3 panda
```
Then use `dict_from_file` to load the dict from `b.txt`.
```python
>>> mmcv.dict_from_file('b.txt')
{'1': 'cat', '2': ['dog', 'cow'], '3': 'panda'}
>>> mmcv.dict_from_file('b.txt', key_type=int)
{1: 'cat', 2: ['dog', 'cow'], 3: 'panda'}
```
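The parsing rule implied by the outputs above is: the first whitespace-separated column is the key, a single remaining column becomes a scalar value, and several remaining columns become a list. A standard-library sketch of that rule follows. `dict_from_file_sketch` is a hypothetical name, and silently skipping lines with fewer than two columns is an assumption of this sketch.

```python
import os
import tempfile


def dict_from_file_sketch(filename, key_type=str, encoding='utf-8'):
    """Parse lines of whitespace-separated columns into a dict."""
    mapping = {}
    with open(filename, encoding=encoding) as f:
        for line in f:
            items = line.split()
            if len(items) < 2:
                continue  # assumption: skip blank or key-only lines
            key = key_type(items[0])
            values = items[1:]
            # one value stays a scalar, several values become a list
            mapping[key] = values[0] if len(values) == 1 else values
    return mapping


# Recreate the three-line b.txt from the text in a temporary directory.
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, 'b.txt')
    with open(path, 'w', encoding='utf-8') as f:
        f.write('1 cat\n2 dog cow\n3 panda\n')
    by_str = dict_from_file_sketch(path)
    by_int = dict_from_file_sketch(path, key_type=int)
```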
#### Load from other backends
Use `list_from_file` to load the list from `s3://bucket-name/a.txt`.
```python
>>> mmcv.list_from_file('s3://bucket-name/a.txt')
['a', 'b', 'c', 'd', 'e']
>>> mmcv.list_from_file('s3://bucket-name/a.txt', offset=2)
['c', 'd', 'e']
>>> mmcv.list_from_file('s3://bucket-name/a.txt', max_num=2)
['a', 'b']
>>> mmcv.list_from_file('s3://bucket-name/a.txt', prefix='/mnt/')
['/mnt/a', '/mnt/b', '/mnt/c', '/mnt/d', '/mnt/e']
```
Use `dict_from_file` to load the dict from `s3://bucket-name/b.txt`.
```python
>>> mmcv.dict_from_file('s3://bucket-name/b.txt')
{'1': 'cat', '2': ['dog', 'cow'], '3': 'panda'}
>>> mmcv.dict_from_file('s3://bucket-name/b.txt', key_type=int)
{1: 'cat', 2: ['dog', 'cow'], 3: 'panda'}
```
### Load and dump checkpoints
#### Load checkpoints from disk or save to disk
Checkpoints can be read from disk or saved to disk as follows.
```python
import torch
filepath1 = '/path/of/your/checkpoint1.pth'
filepath2 = '/path/of/your/checkpoint2.pth'
# read from filepath1
checkpoint = torch.load(filepath1)
# save to filepath2
torch.save(checkpoint, filepath2)
```
MMCV provides many backends. `HardDiskBackend` is one of them and we can use it to read or save checkpoints.
```python
import io

from mmcv.fileio.file_client import HardDiskBackend

disk_backend = HardDiskBackend()
with io.BytesIO(disk_backend.get(filepath1)) as buffer:
    checkpoint = torch.load(buffer)
with io.BytesIO() as buffer:
    torch.save(checkpoint, buffer)
    disk_backend.put(buffer.getvalue(), filepath2)
```
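The buffer pattern in the block above (serialize into an `io.BytesIO`, hand the raw bytes to `put`, and wrap the bytes returned by `get` in another `io.BytesIO`) does not depend on PyTorch. The sketch below reproduces it with `pickle` standing in for `torch.save`/`torch.load` and with a hypothetical in-memory `DictBackend`, so it runs with the standard library only. None of these names come from mmcv.

```python
import io
import pickle


class DictBackend:
    """Hypothetical in-memory backend exposing the same get/put shape."""

    def __init__(self):
        self._store = {}

    def get(self, filepath):
        return self._store[str(filepath)]

    def put(self, obj, filepath):
        self._store[str(filepath)] = bytes(obj)


def save_object(obj, backend, filepath):
    # serialize into a buffer, then pass the raw bytes to the backend
    with io.BytesIO() as buffer:
        pickle.dump(obj, buffer)
        backend.put(buffer.getvalue(), filepath)


def load_object(backend, filepath):
    # wrap the bytes returned by the backend in a file-like object
    with io.BytesIO(backend.get(filepath)) as buffer:
        return pickle.load(buffer)


backend = DictBackend()
state = {'epoch': 3, 'weights': [0.1, 0.2]}
save_object(state, backend, 'checkpoints/demo.pkl')
restored = load_object(backend, 'checkpoints/demo.pkl')
```

Because the serializer only ever sees a file-like object, the same two helper functions would work with any backend that accepts and returns bytes.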
To implement an interface that automatically selects the corresponding
backend based on the file path, we can use `FileClient`.
For example, suppose we want two functions, one for reading checkpoints and one for saving them,
that both accept different types of file paths: disk paths, network paths or other paths.
```python
import io

import torch
from mmcv.fileio.file_client import FileClient


def load_checkpoint(path):
    # infer the backend from the path
    file_client = FileClient.infer_client(uri=path)
    with io.BytesIO(file_client.get(path)) as buffer:
        checkpoint = torch.load(buffer)
    return checkpoint


def save_checkpoint(checkpoint, path):
    # infer the backend here too, rather than relying on a global client
    file_client = FileClient.infer_client(uri=path)
    with io.BytesIO() as buffer:
        torch.save(checkpoint, buffer)
        file_client.put(buffer.getvalue(), path)


checkpoint = load_checkpoint(filepath1)
save_checkpoint(checkpoint, filepath2)
```
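Choosing a backend from a path comes down to inspecting the prefix before `://`. The sketch below illustrates that dispatch with the standard library only. `PREFIX_TO_BACKEND`, `parse_uri_prefix` and `infer_backend_name` are hypothetical names, and the prefix table is an assumption made for illustration, not a copy of the library's table.

```python
from pathlib import Path

# Hypothetical prefix table; the real mapping lives inside the library.
PREFIX_TO_BACKEND = {
    's3': 'petrel',
    'http': 'http',
    'https': 'http',
}


def parse_uri_prefix(uri):
    """Return the scheme before '://', or None for plain paths."""
    if isinstance(uri, Path):
        return None
    if '://' not in uri:
        return None
    prefix, _ = uri.split('://', 1)
    # 'cluster-name:s3://...' style paths: keep only the part after ':'
    if ':' in prefix:
        prefix = prefix.rsplit(':', 1)[1]
    return prefix


def infer_backend_name(uri):
    """Map a path or URI to a backend name, defaulting to local disk."""
    return PREFIX_TO_BACKEND.get(parse_uri_prefix(uri), 'disk')
```

Falling back to `'disk'` for anything without a recognized prefix keeps ordinary file paths working without any special handling.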
#### Load checkpoints from the Internet
```{note}
Currently, checkpoints can only be read from the Internet; saving checkpoints to the Internet is not supported.
```
```python
import io

import torch
from mmcv.fileio.file_client import HTTPBackend, FileClient

filepath = 'http://path/of/your/checkpoint.pth'

# option 1: let PyTorch download the checkpoint
checkpoint = torch.utils.model_zoo.load_url(filepath)

# option 2: use the HTTP backend directly
http_backend = HTTPBackend()
with io.BytesIO(http_backend.get(filepath)) as buffer:
    checkpoint = torch.load(buffer)

# option 3: let FileClient infer the backend from the URL
file_client = FileClient.infer_client(uri=filepath)
with io.BytesIO(file_client.get(filepath)) as buffer:
    checkpoint = torch.load(buffer)
```
-fileio
--------
-.. automodule:: mmcv.fileio
-    :members:
 image
 ------
 .. automodule:: mmcv.image
......
@@ -18,7 +18,6 @@
 understand_mmcv/config.md
 understand_mmcv/registry.md
 understand_mmcv/runner.md
-understand_mmcv/io.md
 understand_mmcv/data_process.md
 understand_mmcv/data_transform.md
 understand_mmcv/visualization.md
......
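## File IO
The file IO module provides two universal APIs for loading and dumping files of different formats.
```{note}
Since v1.3.16, the IO module supports loading data from different backends and dumping data to different backends. More details are in PR [#1330](https://github.com/open-mmlab/mmcv/pull/1330).
```
### Load and dump data
`mmcv` provides a universal API for loading and dumping data. The currently supported formats are json, yaml and pickle.
#### Load from disk or dump to disk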
```python
import mmcv

# load data from a file
data = mmcv.load('test.json')
data = mmcv.load('test.yaml')
data = mmcv.load('test.pkl')

# load data from a file-like object
with open('test.json', 'r') as f:
    data = mmcv.load(f, file_format='json')

# serialize data to a string
json_str = mmcv.dump(data, file_format='json')

# dump data to a file (infer format from the file extension)
mmcv.dump(data, 'out.pkl')

# dump data to a file-like object
# (do not rebind `data` to the return value of `dump`)
with open('test.yaml', 'w') as f:
    mmcv.dump(data, f, file_format='yaml')
```
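#### Load from other backends or dump to other backends
```python
import mmcv

# load data from an s3 file
data = mmcv.load('s3://bucket-name/test.json')
data = mmcv.load('s3://bucket-name/test.yaml')
data = mmcv.load('s3://bucket-name/test.pkl')

# dump data to an s3 file (infer format from the file extension)
mmcv.dump(data, 's3://bucket-name/out.pkl')
```
It is easy to extend the API to support more file formats. All we need to do is create a file handler class
that inherits from `BaseFileHandler` and register it in `mmcv`. The handler class must override at least three methods.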
```python
import mmcv


# A handler class can be registered for multiple file formats
# @mmcv.register_handler(['txt', 'log'])
@mmcv.register_handler('txt')
class TxtHandler1(mmcv.BaseFileHandler):

    def load_from_fileobj(self, file):
        return file.read()

    def dump_to_fileobj(self, obj, file):
        file.write(str(obj))

    def dump_to_str(self, obj, **kwargs):
        return str(obj)
```
Take `PickleHandler` as an example.
```python
import pickle

import mmcv


class PickleHandler(mmcv.BaseFileHandler):

    def load_from_fileobj(self, file, **kwargs):
        return pickle.load(file, **kwargs)

    def load_from_path(self, filepath, **kwargs):
        # pickle files must be opened in binary mode
        return super(PickleHandler, self).load_from_path(
            filepath, mode='rb', **kwargs)

    def dump_to_str(self, obj, **kwargs):
        kwargs.setdefault('protocol', 2)
        return pickle.dumps(obj, **kwargs)

    def dump_to_fileobj(self, obj, file, **kwargs):
        kwargs.setdefault('protocol', 2)
        pickle.dump(obj, file, **kwargs)

    def dump_to_path(self, obj, filepath, **kwargs):
        super(PickleHandler, self).dump_to_path(
            obj, filepath, mode='wb', **kwargs)
```
### Load a text file as a list or dict
For example, `a.txt` is a text file with 5 lines.
```
a
b
c
d
e
```
#### Load from disk
Use `list_from_file` to load `a.txt`.
```python
>>> mmcv.list_from_file('a.txt')
['a', 'b', 'c', 'd', 'e']
>>> mmcv.list_from_file('a.txt', offset=2)
['c', 'd', 'e']
>>> mmcv.list_from_file('a.txt', max_num=2)
['a', 'b']
>>> mmcv.list_from_file('a.txt', prefix='/mnt/')
['/mnt/a', '/mnt/b', '/mnt/c', '/mnt/d', '/mnt/e']
```
Similarly, `b.txt` is a text file with 3 lines.
```
1 cat
2 dog cow
3 panda
```
Use `dict_from_file` to load `b.txt`.
```python
>>> mmcv.dict_from_file('b.txt')
{'1': 'cat', '2': ['dog', 'cow'], '3': 'panda'}
>>> mmcv.dict_from_file('b.txt', key_type=int)
{1: 'cat', 2: ['dog', 'cow'], 3: 'panda'}
```
#### Load from other backends
Use `list_from_file` to load `s3://bucket-name/a.txt`.
```python
>>> mmcv.list_from_file('s3://bucket-name/a.txt')
['a', 'b', 'c', 'd', 'e']
>>> mmcv.list_from_file('s3://bucket-name/a.txt', offset=2)
['c', 'd', 'e']
>>> mmcv.list_from_file('s3://bucket-name/a.txt', max_num=2)
['a', 'b']
>>> mmcv.list_from_file('s3://bucket-name/a.txt', prefix='/mnt/')
['/mnt/a', '/mnt/b', '/mnt/c', '/mnt/d', '/mnt/e']
```
Use `dict_from_file` to load `s3://bucket-name/b.txt`.
```python
>>> mmcv.dict_from_file('s3://bucket-name/b.txt')
{'1': 'cat', '2': ['dog', 'cow'], '3': 'panda'}
>>> mmcv.dict_from_file('s3://bucket-name/b.txt', key_type=int)
{1: 'cat', 2: ['dog', 'cow'], 3: 'panda'}
```
### Load and dump checkpoints
#### Load checkpoints from disk or save to disk
Checkpoints can be read from disk or saved to disk as follows.
```python
import torch

filepath1 = '/path/of/your/checkpoint1.pth'
filepath2 = '/path/of/your/checkpoint2.pth'

# read the checkpoint from filepath1
checkpoint = torch.load(filepath1)

# save the checkpoint to filepath2
torch.save(checkpoint, filepath2)
```
MMCV provides many backends. `HardDiskBackend` is one of them, and we can use it to read or save checkpoints.
```python
import io

from mmcv.fileio.file_client import HardDiskBackend

disk_backend = HardDiskBackend()
with io.BytesIO(disk_backend.get(filepath1)) as buffer:
    checkpoint = torch.load(buffer)
with io.BytesIO() as buffer:
    # write to `buffer`, the name bound by the `with` statement
    torch.save(checkpoint, buffer)
    disk_backend.put(buffer.getvalue(), filepath2)
```
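To implement an interface that automatically selects the corresponding backend based on the file path, we can use `FileClient`.
For example, suppose we want two functions, one for reading checkpoints and one for saving them, that both accept different types of file paths: disk paths, network paths or other paths.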
```python
import io

import torch
from mmcv.fileio.file_client import FileClient


def load_checkpoint(path):
    # infer the backend from the path
    file_client = FileClient.infer_client(uri=path)
    with io.BytesIO(file_client.get(path)) as buffer:
        checkpoint = torch.load(buffer)
    return checkpoint


def save_checkpoint(checkpoint, path):
    # infer the backend here too, rather than relying on a global client
    file_client = FileClient.infer_client(uri=path)
    with io.BytesIO() as buffer:
        torch.save(checkpoint, buffer)
        file_client.put(buffer.getvalue(), path)


checkpoint = load_checkpoint(filepath1)
save_checkpoint(checkpoint, filepath2)
```
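#### Load checkpoints from the Internet
```{note}
Currently, checkpoints can only be read from the Internet; saving checkpoints to the Internet is not supported.
```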
```python
import io

import torch
from mmcv.fileio.file_client import HTTPBackend, FileClient

filepath = 'http://path/of/your/checkpoint.pth'
checkpoint = torch.utils.model_zoo.load_url(filepath)

http_backend = HTTPBackend()
with io.BytesIO(http_backend.get(filepath)) as buffer:
    checkpoint = torch.load(buffer)

file_client = FileClient.infer_client(uri=filepath)
with io.BytesIO(file_client.get(filepath)) as buffer:
    checkpoint = torch.load(buffer)
```
 # Copyright (c) OpenMMLab. All rights reserved.
 # flake8: noqa
 from .arraymisc import *
-from .fileio import *
 from .image import *
 from .transforms import *
 from .utils import *
......
@@ -6,6 +6,7 @@ import tempfile
 import time
 from typing import Optional
 
+import mmengine
 import torch
 import torch.distributed as dist
 import torch.nn as nn
@@ -135,7 +136,7 @@ def collect_results_cpu(result_part: list,
         mmcv.mkdir_or_exist(tmpdir)
     # dump the part result to the dir
     part_file = osp.join(tmpdir, f'part_{rank}.pkl')  # type: ignore
-    mmcv.dump(result_part, part_file)
+    mmengine.dump(result_part, part_file)
     dist.barrier()
     # collect all parts
     if rank != 0:
@@ -145,7 +146,7 @@ def collect_results_cpu(result_part: list,
         part_list = []
         for i in range(world_size):
             part_file = osp.join(tmpdir, f'part_{i}.pkl')  # type: ignore
-            part_result = mmcv.load(part_file)
+            part_result = mmengine.load(part_file)
             # When data is severely insufficient, an empty part_result
             # on a certain gpu could makes the overall outputs empty.
             if part_result:
......
# Copyright (c) OpenMMLab. All rights reserved.
from .file_client import BaseStorageBackend, FileClient
from .handlers import BaseFileHandler, JsonHandler, PickleHandler, YamlHandler
from .io import dump, load, register_handler
from .parse import dict_from_file, list_from_file
__all__ = [
'BaseStorageBackend', 'FileClient', 'load', 'dump', 'register_handler',
'BaseFileHandler', 'JsonHandler', 'PickleHandler', 'YamlHandler',
'list_from_file', 'dict_from_file'
]
# Copyright (c) OpenMMLab. All rights reserved.
import inspect
import os
import os.path as osp
import re
import tempfile
import warnings
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Generator, Iterator, Optional, Tuple, Union
from urllib.request import urlopen
import mmcv
from mmcv.utils.misc import has_method
from mmcv.utils.path import is_filepath
class BaseStorageBackend(metaclass=ABCMeta):
    """Abstract class of storage backends.

    All backends need to implement two apis: ``get()`` and ``get_text()``.
    ``get()`` reads the file as a byte stream and ``get_text()`` reads the file
    as texts.
    """

    # a flag to indicate whether the backend can create a symlink for a file
    _allow_symlink = False

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def allow_symlink(self):
        return self._allow_symlink

    @abstractmethod
    def get(self, filepath):
        pass

    @abstractmethod
    def get_text(self, filepath):
        pass
class CephBackend(BaseStorageBackend):
    """Ceph storage backend (for internal use).

    Args:
        path_mapping (dict|None): path mapping dict from local path to Petrel
            path. When ``path_mapping={'src': 'dst'}``, ``src`` in ``filepath``
            will be replaced by ``dst``. Default: None.

    .. warning::
        :class:`mmcv.fileio.file_client.CephBackend` will be deprecated,
        please use :class:`mmcv.fileio.file_client.PetrelBackend` instead.
    """

    def __init__(self, path_mapping=None):
        try:
            import ceph
        except ImportError:
            raise ImportError('Please install ceph to enable CephBackend.')

        warnings.warn(
            'CephBackend will be deprecated, please use PetrelBackend instead',
            DeprecationWarning)
        self._client = ceph.S3Client()
        assert isinstance(path_mapping, dict) or path_mapping is None
        self.path_mapping = path_mapping

    def get(self, filepath):
        filepath = str(filepath)
        if self.path_mapping is not None:
            for k, v in self.path_mapping.items():
                filepath = filepath.replace(k, v)
        value = self._client.Get(filepath)
        value_buf = memoryview(value)
        return value_buf

    def get_text(self, filepath, encoding=None):
        raise NotImplementedError
class PetrelBackend(BaseStorageBackend):
"""Petrel storage backend (for internal use).
PetrelBackend supports reading and writing data to multiple clusters.
If the file path contains the cluster name, PetrelBackend will read data
from specified cluster or write data to it. Otherwise, PetrelBackend will
access the default cluster.
Args:
path_mapping (dict, optional): Path mapping dict from local path to
Petrel path. When ``path_mapping={'src': 'dst'}``, ``src`` in
``filepath`` will be replaced by ``dst``. Default: None.
enable_mc (bool, optional): Whether to enable memcached support.
Default: True.
Examples:
>>> filepath1 = 's3://path/of/file'
>>> filepath2 = 'cluster-name:s3://path/of/file'
>>> client = PetrelBackend()
>>> client.get(filepath1) # get data from default cluster
>>> client.get(filepath2) # get data from 'cluster-name' cluster
"""
def __init__(self,
path_mapping: Optional[dict] = None,
enable_mc: bool = True):
try:
from petrel_client import client
except ImportError:
raise ImportError('Please install petrel_client to enable '
'PetrelBackend.')
self._client = client.Client(enable_mc=enable_mc)
assert isinstance(path_mapping, dict) or path_mapping is None
self.path_mapping = path_mapping
def _map_path(self, filepath: Union[str, Path]) -> str:
"""Map ``filepath`` to a string path whose prefix will be replaced by
:attr:`self.path_mapping`.
Args:
filepath (str): Path to be mapped.
"""
filepath = str(filepath)
if self.path_mapping is not None:
for k, v in self.path_mapping.items():
filepath = filepath.replace(k, v)
return filepath
def _format_path(self, filepath: str) -> str:
"""Convert a ``filepath`` to standard format of petrel oss.
If the ``filepath`` is concatenated by ``os.path.join``, in a Windows
environment, the ``filepath`` will be the format of
's3://bucket_name\\image.jpg'. By invoking :meth:`_format_path`, the
above ``filepath`` will be converted to 's3://bucket_name/image.jpg'.
Args:
filepath (str): Path to be formatted.
"""
return re.sub(r'\\+', '/', filepath)
def get(self, filepath: Union[str, Path]) -> memoryview:
"""Read data from a given ``filepath`` with 'rb' mode.
Args:
filepath (str or Path): Path to read data.
Returns:
memoryview: A memory view of expected bytes object to avoid
copying. The memoryview object can be converted to bytes by
``value_buf.tobytes()``.
"""
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
value = self._client.Get(filepath)
value_buf = memoryview(value)
return value_buf
def get_text(self,
filepath: Union[str, Path],
encoding: str = 'utf-8') -> str:
"""Read data from a given ``filepath`` with 'r' mode.
Args:
filepath (str or Path): Path to read data.
encoding (str): The encoding format used to open the ``filepath``.
Default: 'utf-8'.
Returns:
str: Expected text reading from ``filepath``.
"""
return str(self.get(filepath), encoding=encoding)
def put(self, obj: bytes, filepath: Union[str, Path]) -> None:
"""Save data to a given ``filepath``.
Args:
obj (bytes): Data to be saved.
filepath (str or Path): Path to write data.
"""
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
self._client.put(filepath, obj)
def put_text(self,
obj: str,
filepath: Union[str, Path],
encoding: str = 'utf-8') -> None:
"""Save data to a given ``filepath``.
Args:
obj (str): Data to be written.
filepath (str or Path): Path to write data.
encoding (str): The encoding format used to encode the ``obj``.
Default: 'utf-8'.
"""
self.put(bytes(obj, encoding=encoding), filepath)
def remove(self, filepath: Union[str, Path]) -> None:
"""Remove a file.
Args:
filepath (str or Path): Path to be removed.
"""
if not has_method(self._client, 'delete'):
raise NotImplementedError(
'Current version of Petrel Python SDK has not supported '
'the `delete` method, please use a higher version or dev'
' branch instead.')
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
self._client.delete(filepath)
def exists(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path exists.
Args:
filepath (str or Path): Path to be checked whether exists.
Returns:
bool: Return ``True`` if ``filepath`` exists, ``False`` otherwise.
"""
if not (has_method(self._client, 'contains')
and has_method(self._client, 'isdir')):
raise NotImplementedError(
'Current version of Petrel Python SDK has not supported '
'the `contains` and `isdir` methods, please use a higher '
'version or dev branch instead.')
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
return self._client.contains(filepath) or self._client.isdir(filepath)
def isdir(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a directory.
Args:
filepath (str or Path): Path to be checked whether it is a
directory.
Returns:
bool: Return ``True`` if ``filepath`` points to a directory,
``False`` otherwise.
"""
if not has_method(self._client, 'isdir'):
raise NotImplementedError(
'Current version of Petrel Python SDK has not supported '
'the `isdir` method, please use a higher version or dev'
' branch instead.')
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
return self._client.isdir(filepath)
def isfile(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a file.
Args:
filepath (str or Path): Path to be checked whether it is a file.
Returns:
bool: Return ``True`` if ``filepath`` points to a file, ``False``
otherwise.
"""
if not has_method(self._client, 'contains'):
raise NotImplementedError(
'Current version of Petrel Python SDK has not supported '
'the `contains` method, please use a higher version or '
'dev branch instead.')
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
return self._client.contains(filepath)
def join_path(self, filepath: Union[str, Path],
*filepaths: Union[str, Path]) -> str:
"""Concatenate all file paths.
Args:
filepath (str or Path): Path to be concatenated.
Returns:
str: The result after concatenation.
"""
filepath = self._format_path(self._map_path(filepath))
if filepath.endswith('/'):
filepath = filepath[:-1]
formatted_paths = [filepath]
for path in filepaths:
formatted_paths.append(self._format_path(self._map_path(path)))
return '/'.join(formatted_paths)
@contextmanager
def get_local_path(
self,
filepath: Union[str,
Path]) -> Generator[Union[str, Path], None, None]:
"""Download a file from ``filepath`` and return a temporary path.
``get_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
can be called with ``with`` statement, and when exiting from the
``with`` statement, the temporary path will be released.
Args:
filepath (str | Path): Download a file from ``filepath``.
Examples:
>>> client = PetrelBackend()
>>> # After exiting from the ``with`` clause,
>>> # the path will be removed
>>> with client.get_local_path('s3://path/of/your/file') as path:
... # do something here
Yields:
Iterable[str]: Only yield one temporary path.
"""
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
assert self.isfile(filepath)
try:
f = tempfile.NamedTemporaryFile(delete=False)
f.write(self.get(filepath))
f.close()
yield f.name
finally:
os.remove(f.name)
def list_dir_or_file(self,
dir_path: Union[str, Path],
list_dir: bool = True,
list_file: bool = True,
suffix: Optional[Union[str, Tuple[str]]] = None,
recursive: bool = False) -> Iterator[str]:
"""Scan a directory to find the interested directories or files in
arbitrary order.
Note:
Petrel has no concept of directories but it simulates the directory
hierarchy in the filesystem through public prefixes. In addition,
if the returned path ends with '/', it means the path is a public
prefix which is a logical directory.
Note:
:meth:`list_dir_or_file` returns the path relative to ``dir_path``.
In addition, the returned path of directory will not contain the
suffix '/' which is consistent with other backends.
Args:
dir_path (str | Path): Path of the directory.
list_dir (bool): List the directories. Default: True.
list_file (bool): List the path of files. Default: True.
suffix (str or tuple[str], optional): File suffix
that we are interested in. Default: None.
recursive (bool): If set to True, recursively scan the
directory. Default: False.
Yields:
Iterable[str]: A relative path to ``dir_path``.
"""
if not has_method(self._client, 'list'):
raise NotImplementedError(
'Current version of Petrel Python SDK has not supported '
'the `list` method, please use a higher version or dev'
' branch instead.')
dir_path = self._map_path(dir_path)
dir_path = self._format_path(dir_path)
if list_dir and suffix is not None:
raise TypeError(
'`list_dir` should be False when `suffix` is not None')
if (suffix is not None) and not isinstance(suffix, (str, tuple)):
raise TypeError('`suffix` must be a string or tuple of strings')
# Petrel's simulated directory hierarchy assumes that directory paths
# should end with `/`
if not dir_path.endswith('/'):
dir_path += '/'
root = dir_path
def _list_dir_or_file(dir_path, list_dir, list_file, suffix,
recursive):
for path in self._client.list(dir_path):
# the `self.isdir` is not used here to determine whether path
# is a directory, because `self.isdir` relies on
# `self._client.list`
if path.endswith('/'): # a directory path
next_dir_path = self.join_path(dir_path, path)
if list_dir:
# get the relative path and exclude the last
# character '/'
rel_dir = next_dir_path[len(root):-1]
yield rel_dir
if recursive:
yield from _list_dir_or_file(next_dir_path, list_dir,
list_file, suffix,
recursive)
else: # a file path
absolute_path = self.join_path(dir_path, path)
rel_path = absolute_path[len(root):]
if (suffix is None
or rel_path.endswith(suffix)) and list_file:
yield rel_path
return _list_dir_or_file(dir_path, list_dir, list_file, suffix,
recursive)
class MemcachedBackend(BaseStorageBackend):
    """Memcached storage backend.

    Attributes:
        server_list_cfg (str): Config file for memcached server list.
        client_cfg (str): Config file for memcached client.
        sys_path (str | None): Additional path to be appended to `sys.path`.
            Default: None.
    """

    def __init__(self, server_list_cfg, client_cfg, sys_path=None):
        if sys_path is not None:
            import sys
            sys.path.append(sys_path)
        try:
            import mc
        except ImportError:
            raise ImportError(
                'Please install memcached to enable MemcachedBackend.')

        self.server_list_cfg = server_list_cfg
        self.client_cfg = client_cfg
        self._client = mc.MemcachedClient.GetInstance(self.server_list_cfg,
                                                      self.client_cfg)
        # mc.pyvector serves as a pointer which points to a memory cache
        self._mc_buffer = mc.pyvector()

    def get(self, filepath):
        filepath = str(filepath)
        import mc
        self._client.Get(filepath, self._mc_buffer)
        value_buf = mc.ConvertBuffer(self._mc_buffer)
        return value_buf

    def get_text(self, filepath, encoding=None):
        raise NotImplementedError
class LmdbBackend(BaseStorageBackend):
    """Lmdb storage backend.

    Args:
        db_path (str): Lmdb database path.
        readonly (bool, optional): Lmdb environment parameter. If True,
            disallow any write operations. Default: True.
        lock (bool, optional): Lmdb environment parameter. If False, when
            concurrent access occurs, do not lock the database. Default: False.
        readahead (bool, optional): Lmdb environment parameter. If False,
            disable the OS filesystem readahead mechanism, which may improve
            random read performance when a database is larger than RAM.
            Default: False.

    Attributes:
        db_path (str): Lmdb database path.
    """

    def __init__(self,
                 db_path,
                 readonly=True,
                 lock=False,
                 readahead=False,
                 **kwargs):
        try:
            import lmdb  # NOQA
        except ImportError:
            raise ImportError('Please install lmdb to enable LmdbBackend.')

        self.db_path = str(db_path)
        self.readonly = readonly
        self.lock = lock
        self.readahead = readahead
        self.kwargs = kwargs
        self._client = None

    def get(self, filepath):
        """Get values according to the filepath.

        Args:
            filepath (str | obj:`Path`): Here, filepath is the lmdb key.
        """
        if self._client is None:
            self._client = self._get_client()

        with self._client.begin(write=False) as txn:
            value_buf = txn.get(str(filepath).encode('utf-8'))
        return value_buf

    def get_text(self, filepath, encoding=None):
        raise NotImplementedError

    def _get_client(self):
        import lmdb

        return lmdb.open(
            self.db_path,
            readonly=self.readonly,
            lock=self.lock,
            readahead=self.readahead,
            **self.kwargs)

    def __del__(self):
        # the client is created lazily, so it may never have been opened
        client = getattr(self, '_client', None)
        if client is not None:
            client.close()
class HardDiskBackend(BaseStorageBackend):
"""Raw hard disks storage backend."""
_allow_symlink = True
def get(self, filepath: Union[str, Path]) -> bytes:
"""Read data from a given ``filepath`` with 'rb' mode.
Args:
filepath (str or Path): Path to read data.
Returns:
bytes: Expected bytes object.
"""
with open(filepath, 'rb') as f:
value_buf = f.read()
return value_buf
def get_text(self,
filepath: Union[str, Path],
encoding: str = 'utf-8') -> str:
"""Read data from a given ``filepath`` with 'r' mode.
Args:
filepath (str or Path): Path to read data.
encoding (str): The encoding format used to open the ``filepath``.
Default: 'utf-8'.
Returns:
str: Expected text reading from ``filepath``.
"""
with open(filepath, encoding=encoding) as f:
value_buf = f.read()
return value_buf
def put(self, obj: bytes, filepath: Union[str, Path]) -> None:
"""Write data to a given ``filepath`` with 'wb' mode.
Note:
``put`` will create a directory if the directory of ``filepath``
does not exist.
Args:
obj (bytes): Data to be written.
filepath (str or Path): Path to write data.
"""
mmcv.mkdir_or_exist(osp.dirname(filepath))
with open(filepath, 'wb') as f:
f.write(obj)
def put_text(self,
obj: str,
filepath: Union[str, Path],
encoding: str = 'utf-8') -> None:
"""Write data to a given ``filepath`` with 'w' mode.
Note:
``put_text`` will create a directory if the directory of
``filepath`` does not exist.
Args:
obj (str): Data to be written.
filepath (str or Path): Path to write data.
encoding (str): The encoding format used to open the ``filepath``.
Default: 'utf-8'.
"""
mmcv.mkdir_or_exist(osp.dirname(filepath))
with open(filepath, 'w', encoding=encoding) as f:
f.write(obj)
def remove(self, filepath: Union[str, Path]) -> None:
"""Remove a file.
Args:
filepath (str or Path): Path to be removed.
"""
os.remove(filepath)
def exists(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path exists.
Args:
filepath (str or Path): Path to be checked whether exists.
Returns:
bool: Return ``True`` if ``filepath`` exists, ``False`` otherwise.
"""
return osp.exists(filepath)
def isdir(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a directory.
Args:
filepath (str or Path): Path to be checked whether it is a
directory.
Returns:
bool: Return ``True`` if ``filepath`` points to a directory,
``False`` otherwise.
"""
return osp.isdir(filepath)
def isfile(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a file.
Args:
filepath (str or Path): Path to be checked whether it is a file.
Returns:
bool: Return ``True`` if ``filepath`` points to a file, ``False``
otherwise.
"""
return osp.isfile(filepath)
def join_path(self, filepath: Union[str, Path],
*filepaths: Union[str, Path]) -> str:
"""Concatenate all file paths.
Join one or more filepath components intelligently. The return value
is the concatenation of filepath and any members of *filepaths.
Args:
filepath (str or Path): Path to be concatenated.
Returns:
str: The result of concatenation.
"""
return osp.join(filepath, *filepaths)
@contextmanager
def get_local_path(
self,
filepath: Union[str,
Path]) -> Generator[Union[str, Path], None, None]:
"""Only for unified API and do nothing."""
yield filepath
def list_dir_or_file(self,
dir_path: Union[str, Path],
list_dir: bool = True,
list_file: bool = True,
suffix: Optional[Union[str, Tuple[str]]] = None,
recursive: bool = False) -> Iterator[str]:
"""Scan a directory to find the directories or files of interest, in
arbitrary order.
Note:
:meth:`list_dir_or_file` returns the path relative to ``dir_path``.
Args:
dir_path (str | Path): Path of the directory.
list_dir (bool): List the directories. Default: True.
list_file (bool): List the path of files. Default: True.
suffix (str or tuple[str], optional): File suffix
that we are interested in. Default: None.
recursive (bool): If set to True, recursively scan the
directory. Default: False.
Yields:
Iterable[str]: A relative path to ``dir_path``.
"""
if list_dir and suffix is not None:
raise TypeError('`suffix` should be None when `list_dir` is True')
if (suffix is not None) and not isinstance(suffix, (str, tuple)):
raise TypeError('`suffix` must be a string or tuple of strings')
root = dir_path
def _list_dir_or_file(dir_path, list_dir, list_file, suffix,
recursive):
for entry in os.scandir(dir_path):
if not entry.name.startswith('.') and entry.is_file():
rel_path = osp.relpath(entry.path, root)
if (suffix is None
or rel_path.endswith(suffix)) and list_file:
yield rel_path
elif osp.isdir(entry.path):
if list_dir:
rel_dir = osp.relpath(entry.path, root)
yield rel_dir
if recursive:
yield from _list_dir_or_file(entry.path, list_dir,
list_file, suffix,
recursive)
return _list_dir_or_file(dir_path, list_dir, list_file, suffix,
recursive)
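The directory scan above can be tried out without mmcv. The following is a minimal sketch that assumes a local filesystem; `scan_dir` is a hypothetical stand-in for `list_dir_or_file`, not part of the library. It keeps the same rules: hidden files are skipped, paths are yielded relative to the root, and `suffix` is only allowed when directories are not listed.

```python
import os
import os.path as osp
import tempfile


def scan_dir(root, list_dir=True, list_file=True, suffix=None,
             recursive=False):
    """Yield paths relative to ``root`` (hypothetical helper, not mmcv API)."""
    if list_dir and suffix is not None:
        raise TypeError('`suffix` should be None when `list_dir` is True')

    def _scan(current):
        for entry in os.scandir(current):
            if not entry.name.startswith('.') and entry.is_file():
                rel_path = osp.relpath(entry.path, root)
                if list_file and (suffix is None
                                  or rel_path.endswith(suffix)):
                    yield rel_path
            elif entry.is_dir():
                if list_dir:
                    yield osp.relpath(entry.path, root)
                if recursive:
                    # descend lazily; nothing is read until iterated
                    yield from _scan(entry.path)

    return _scan(root)


with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(osp.join(tmp, 'sub'))
    for name in ('a.txt', 'b.jpg', osp.join('sub', 'c.txt'), '.hidden'):
        with open(osp.join(tmp, name), 'w') as f:
            f.write('x')
    # the scan order is arbitrary, so sort before comparing
    found_txt = sorted(
        scan_dir(tmp, list_dir=False, suffix='.txt', recursive=True))
    found_top = sorted(scan_dir(tmp))
```

Because the result is a generator, large directory trees are walked on demand rather than collected up front.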
class HTTPBackend(BaseStorageBackend):
"""HTTP and HTTPS storage backend."""
def get(self, filepath):
value_buf = urlopen(filepath).read()
return value_buf
def get_text(self, filepath, encoding='utf-8'):
value_buf = urlopen(filepath).read()
return value_buf.decode(encoding)
    @contextmanager
    def get_local_path(
            self, filepath: str) -> Generator[Union[str, Path], None, None]:
        """Download a file from ``filepath``.

        ``get_local_path`` is decorated by :meth:`contextlib.contextmanager`.
        It can be called with a ``with`` statement, and on exit from the
        ``with`` statement, the temporary path will be released.

        Args:
            filepath (str): Download a file from ``filepath``.

        Examples:
            >>> client = HTTPBackend()
            >>> # After exiting from the ``with`` clause,
            >>> # the path will be removed
            >>> with client.get_local_path('http://path/of/your/file') as path:
            ...     # do something here
        """
        # create the file before entering ``try`` so that ``f`` is always
        # defined when the ``finally`` clause runs
        f = tempfile.NamedTemporaryFile(delete=False)
        try:
            f.write(self.get(filepath))
            f.close()
            yield f.name
        finally:
            # closing twice is harmless and covers a failed download
            f.close()
            os.remove(f.name)
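The temporary-file pattern used by `get_local_path` can be sketched on its own. This is a minimal illustration that replaces the HTTP download with an in-memory payload; `local_copy` is a hypothetical name and not part of mmcv.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def local_copy(payload: bytes):
    """Write ``payload`` to a temporary file and remove it on exit."""
    # delete=False keeps the file on disk after close() so that the
    # caller can reopen it by name inside the ``with`` block
    f = tempfile.NamedTemporaryFile(delete=False)
    try:
        f.write(payload)
        f.close()
        yield f.name
    finally:
        f.close()
        os.remove(f.name)


with local_copy(b'hello') as path:
    existed_inside = os.path.exists(path)
    with open(path, 'rb') as fh:
        content = fh.read()
existed_after = os.path.exists(path)
```

The file is removed even if the body of the `with` statement raises, because the cleanup lives in the `finally` clause.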
class FileClient:
"""A general file client to access files in different backends.
The client loads a file or text in a specified backend from its path
and returns it as a binary or text file. There are two ways to choose a
backend: the name of the backend and the prefix of the path. Although both
can be used to choose a storage backend, ``backend`` has a higher priority:
if both are set, the storage backend is chosen by the ``backend`` argument.
If both are ``None``, the disk backend is chosen.
Other backend accessors can also be registered with a given name,
prefixes, and backend class. In addition, the singleton pattern is used to
avoid repeated object creation: if the arguments are the same, the same
object is returned.
Args:
backend (str, optional): The storage backend type. Options are "disk",
"ceph", "memcached", "lmdb", "http" and "petrel". Default: None.
prefix (str, optional): The prefix of the registered storage backend.
Options are "s3", "http", "https". Default: None.
Examples:
>>> # only set backend
>>> file_client = FileClient(backend='petrel')
>>> # only set prefix
>>> file_client = FileClient(prefix='s3')
>>> # set both backend and prefix but use backend to choose client
>>> file_client = FileClient(backend='petrel', prefix='s3')
>>> # if the arguments are the same, the same object is returned
>>> file_client1 = FileClient(backend='petrel')
>>> file_client2 = FileClient(backend='petrel')
>>> file_client1 is file_client2
True
Attributes:
client (:obj:`BaseStorageBackend`): The backend object.
"""
_backends = {
'disk': HardDiskBackend,
'ceph': CephBackend,
'memcached': MemcachedBackend,
'lmdb': LmdbBackend,
'petrel': PetrelBackend,
'http': HTTPBackend,
}
_prefix_to_backends = {
's3': PetrelBackend,
'http': HTTPBackend,
'https': HTTPBackend,
}
_instances: dict = {}
client: Any
def __new__(cls, backend=None, prefix=None, **kwargs):
if backend is None and prefix is None:
backend = 'disk'
if backend is not None and backend not in cls._backends:
raise ValueError(
f'Backend {backend} is not supported. Currently supported ones'
f' are {list(cls._backends.keys())}')
if prefix is not None and prefix not in cls._prefix_to_backends:
raise ValueError(
f'prefix {prefix} is not supported. Currently supported ones '
f'are {list(cls._prefix_to_backends.keys())}')
# concatenate the arguments to a unique key for determining whether
# objects with the same arguments were created
arg_key = f'{backend}:{prefix}'
for key, value in kwargs.items():
arg_key += f':{key}:{value}'
if arg_key in cls._instances:
_instance = cls._instances[arg_key]
else:
# create a new object and put it to _instance
_instance = super().__new__(cls)
if backend is not None:
_instance.client = cls._backends[backend](**kwargs)
else:
_instance.client = cls._prefix_to_backends[prefix](**kwargs)
cls._instances[arg_key] = _instance
return _instance
@property
def name(self):
return self.client.name
@property
def allow_symlink(self):
return self.client.allow_symlink
    @staticmethod
    def parse_uri_prefix(uri: Union[str, Path]) -> Optional[str]:
        """Parse the prefix of a uri.

        Args:
            uri (str | Path): Uri to be parsed that contains the file prefix.

        Examples:
            >>> FileClient.parse_uri_prefix('s3://path/of/your/file')
            's3'

        Returns:
            str | None: Return the prefix of uri if the uri contains '://' else
            ``None``.
        """
        assert is_filepath(uri)
        uri = str(uri)
        if '://' not in uri:
            return None
        else:
            # split only on the first '://' so that a uri containing the
            # separator more than once does not raise ValueError
            prefix, _ = uri.split('://', 1)
            # In the case of PetrelBackend, the prefix may contain the cluster
            # name like clusterName:s3
            if ':' in prefix:
                _, prefix = prefix.split(':')
            return prefix
@classmethod
def infer_client(cls,
file_client_args: Optional[dict] = None,
uri: Optional[Union[str, Path]] = None) -> 'FileClient':
"""Infer a suitable file client based on the URI and arguments.
Args:
file_client_args (dict, optional): Arguments to instantiate a
FileClient. Default: None.
uri (str | Path, optional): Uri to be parsed that contains the file
prefix. Default: None.
Examples:
>>> uri = 's3://path/of/your/file'
>>> file_client = FileClient.infer_client(uri=uri)
>>> file_client_args = {'backend': 'petrel'}
>>> file_client = FileClient.infer_client(file_client_args)
Returns:
FileClient: Instantiated FileClient object.
"""
assert file_client_args is not None or uri is not None
if file_client_args is None:
file_prefix = cls.parse_uri_prefix(uri) # type: ignore
return cls(prefix=file_prefix)
else:
return cls(**file_client_args)
@classmethod
def _register_backend(cls, name, backend, force=False, prefixes=None):
if not isinstance(name, str):
raise TypeError('the backend name should be a string, '
f'but got {type(name)}')
if not inspect.isclass(backend):
raise TypeError(
f'backend should be a class but got {type(backend)}')
if not issubclass(backend, BaseStorageBackend):
raise TypeError(
f'backend {backend} is not a subclass of BaseStorageBackend')
if not force and name in cls._backends:
raise KeyError(
f'{name} is already registered as a storage backend, '
'add "force=True" if you want to override it')
if name in cls._backends and force:
for arg_key, instance in list(cls._instances.items()):
if isinstance(instance.client, cls._backends[name]):
cls._instances.pop(arg_key)
cls._backends[name] = backend
if prefixes is not None:
if isinstance(prefixes, str):
prefixes = [prefixes]
else:
assert isinstance(prefixes, (list, tuple))
for prefix in prefixes:
if prefix not in cls._prefix_to_backends:
cls._prefix_to_backends[prefix] = backend
elif (prefix in cls._prefix_to_backends) and force:
overridden_backend = cls._prefix_to_backends[prefix]
if isinstance(overridden_backend, list):
overridden_backend = tuple(overridden_backend)
for arg_key, instance in list(cls._instances.items()):
if isinstance(instance.client, overridden_backend):
cls._instances.pop(arg_key)
cls._prefix_to_backends[prefix] = backend
else:
raise KeyError(
f'{prefix} is already registered as a storage backend,'
' add "force=True" if you want to override it')
@classmethod
def register_backend(cls, name, backend=None, force=False, prefixes=None):
"""Register a backend to FileClient.
This method can be used as a normal class method or a decorator.
.. code-block:: python
class NewBackend(BaseStorageBackend):
def get(self, filepath):
return filepath
def get_text(self, filepath):
return filepath
FileClient.register_backend('new', NewBackend)
or
.. code-block:: python
@FileClient.register_backend('new')
class NewBackend(BaseStorageBackend):
def get(self, filepath):
return filepath
def get_text(self, filepath):
return filepath
Args:
name (str): The name of the registered backend.
backend (class, optional): The backend class to be registered,
which must be a subclass of :class:`BaseStorageBackend`.
When this method is used as a decorator, backend is None.
Defaults to None.
force (bool, optional): Whether to override the backend if the name
has already been registered. Defaults to False.
prefixes (str or list[str] or tuple[str], optional): The prefixes
of the registered storage backend. Default: None.
`New in version 1.3.15.`
"""
if backend is not None:
cls._register_backend(
name, backend, force=force, prefixes=prefixes)
return
def _register(backend_cls):
cls._register_backend(
name, backend_cls, force=force, prefixes=prefixes)
return backend_cls
return _register
def get(self, filepath: Union[str, Path]) -> Union[bytes, memoryview]:
"""Read data from a given ``filepath`` with 'rb' mode.
Note:
There are two types of return values for ``get``, one is ``bytes``
and the other is ``memoryview``. The advantage of using memoryview
is that you can avoid copying, and if you want to convert it to
``bytes``, you can use ``.tobytes()``.
Args:
filepath (str or Path): Path to read data.
Returns:
bytes | memoryview: Expected bytes object or a memory view of the
bytes object.
"""
return self.client.get(filepath)
def get_text(self, filepath: Union[str, Path], encoding='utf-8') -> str:
"""Read data from a given ``filepath`` with 'r' mode.
Args:
filepath (str or Path): Path to read data.
encoding (str): The encoding format used to open the ``filepath``.
Default: 'utf-8'.
Returns:
str: Expected text reading from ``filepath``.
"""
return self.client.get_text(filepath, encoding)
def put(self, obj: bytes, filepath: Union[str, Path]) -> None:
"""Write data to a given ``filepath`` with 'wb' mode.
Note:
``put`` should create a directory if the directory of ``filepath``
does not exist.
Args:
obj (bytes): Data to be written.
filepath (str or Path): Path to write data.
"""
self.client.put(obj, filepath)
    def put_text(self,
                 obj: str,
                 filepath: Union[str, Path],
                 encoding: str = 'utf-8') -> None:
        """Write data to a given ``filepath`` with 'w' mode.

        Note:
            ``put_text`` should create a directory if the directory of
            ``filepath`` does not exist.

        Args:
            obj (str): Data to be written.
            filepath (str or Path): Path to write data.
            encoding (str, optional): The encoding format used to open the
                ``filepath``. Default: 'utf-8'.
        """
        # forward the documented ``encoding`` argument to the backend
        self.client.put_text(obj, filepath, encoding)
def remove(self, filepath: Union[str, Path]) -> None:
"""Remove a file.
Args:
filepath (str or Path): Path to be removed.
"""
self.client.remove(filepath)
def exists(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path exists.
Args:
filepath (str or Path): Path to be checked whether exists.
Returns:
bool: Return ``True`` if ``filepath`` exists, ``False`` otherwise.
"""
return self.client.exists(filepath)
def isdir(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a directory.
Args:
filepath (str or Path): Path to be checked whether it is a
directory.
Returns:
bool: Return ``True`` if ``filepath`` points to a directory,
``False`` otherwise.
"""
return self.client.isdir(filepath)
def isfile(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a file.
Args:
filepath (str or Path): Path to be checked whether it is a file.
Returns:
bool: Return ``True`` if ``filepath`` points to a file, ``False``
otherwise.
"""
return self.client.isfile(filepath)
def join_path(self, filepath: Union[str, Path],
*filepaths: Union[str, Path]) -> str:
"""Concatenate all file paths.
Join one or more filepath components intelligently. The return value
is the concatenation of filepath and any members of *filepaths.
Args:
filepath (str or Path): Path to be concatenated.
Returns:
str: The result of concatenation.
"""
return self.client.join_path(filepath, *filepaths)
@contextmanager
def get_local_path(
self,
filepath: Union[str,
Path]) -> Generator[Union[str, Path], None, None]:
"""Download data from ``filepath`` and write the data to local path.
``get_local_path`` is decorated by :meth:`contextlib.contextmanager`. It
can be called with a ``with`` statement, and on exit from the
``with`` statement, the temporary path will be released.
Note:
If the ``filepath`` is a local path, just return itself.
.. warning::
``get_local_path`` is an experimental interface that may change in
the future.
Args:
filepath (str or Path): Path to read data from.
Examples:
>>> file_client = FileClient(prefix='s3')
>>> with file_client.get_local_path('s3://bucket/abc.jpg') as path:
... # do something here
Yields:
Iterable[str]: Only yield one path.
"""
with self.client.get_local_path(str(filepath)) as local_path:
yield local_path
def list_dir_or_file(self,
dir_path: Union[str, Path],
list_dir: bool = True,
list_file: bool = True,
suffix: Optional[Union[str, Tuple[str]]] = None,
recursive: bool = False) -> Iterator[str]:
"""Scan a directory to find the directories or files of interest, in
arbitrary order.
Note:
:meth:`list_dir_or_file` returns the path relative to ``dir_path``.
Args:
dir_path (str | Path): Path of the directory.
list_dir (bool): List the directories. Default: True.
list_file (bool): List the path of files. Default: True.
suffix (str or tuple[str], optional): File suffix
that we are interested in. Default: None.
recursive (bool): If set to True, recursively scan the
directory. Default: False.
Yields:
Iterable[str]: A relative path to ``dir_path``.
"""
yield from self.client.list_dir_or_file(dir_path, list_dir, list_file,
suffix, recursive)
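Two ideas in `FileClient` are easy to miss when reading the full class: instances are cached by their constructor arguments, and the backend can be inferred from a URI prefix. The sketch below isolates both under simplified assumptions; `MiniClient`, `DiskBackend` and `WebBackend` are hypothetical names and the backends do no real I/O.

```python
from typing import Optional


class DiskBackend:
    name = 'disk'


class WebBackend:
    name = 'web'


class MiniClient:
    """Hypothetical, stripped-down analogue of ``FileClient``."""

    _backends = {'disk': DiskBackend, 'http': WebBackend}
    _prefix_to_backends = {'http': WebBackend, 'https': WebBackend}
    _instances: dict = {}

    def __new__(cls, backend=None, prefix=None, **kwargs):
        if backend is None and prefix is None:
            backend = 'disk'
        # the cache key is built from every argument, so different
        # argument sets never share an instance
        arg_key = f'{backend}:{prefix}'
        for key, value in kwargs.items():
            arg_key += f':{key}:{value}'
        if arg_key not in cls._instances:
            instance = super().__new__(cls)
            if backend is not None:
                # ``backend`` takes priority over ``prefix``
                instance.client = cls._backends[backend](**kwargs)
            else:
                instance.client = cls._prefix_to_backends[prefix](**kwargs)
            cls._instances[arg_key] = instance
        return cls._instances[arg_key]

    @staticmethod
    def parse_uri_prefix(uri: str) -> Optional[str]:
        if '://' not in uri:
            return None
        prefix = uri.split('://', 1)[0]
        # 'clusterName:s3' keeps only the part after the last colon
        return prefix.split(':')[-1]


a = MiniClient(backend='http')
b = MiniClient(backend='http')
c = MiniClient(prefix='https')
same_object = a is b
different_key = a is not c
prefixes = [
    MiniClient.parse_uri_prefix('s3://bucket/file'),
    MiniClient.parse_uri_prefix('cluster:s3://bucket/file'),
    MiniClient.parse_uri_prefix('/tmp/file'),
]
```

Note that `a` and `c` wrap the same backend class but are distinct objects, because the cache key includes both `backend` and `prefix`.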
# Copyright (c) OpenMMLab. All rights reserved.
from .base import BaseFileHandler
from .json_handler import JsonHandler
from .pickle_handler import PickleHandler
from .yaml_handler import YamlHandler
__all__ = ['BaseFileHandler', 'JsonHandler', 'PickleHandler', 'YamlHandler']
# Copyright (c) OpenMMLab. All rights reserved.
from abc import ABCMeta, abstractmethod
class BaseFileHandler(metaclass=ABCMeta):
# `str_like` is a flag to indicate whether the type of file object is
# str-like object or bytes-like object. Pickle only processes bytes-like
# objects but json only processes str-like object. If it is str-like
# object, `StringIO` will be used to process the buffer.
str_like = True
@abstractmethod
def load_from_fileobj(self, file, **kwargs):
pass
@abstractmethod
def dump_to_fileobj(self, obj, file, **kwargs):
pass
@abstractmethod
def dump_to_str(self, obj, **kwargs):
pass
def load_from_path(self, filepath: str, mode: str = 'r', **kwargs):
with open(filepath, mode) as f:
return self.load_from_fileobj(f, **kwargs)
def dump_to_path(self, obj, filepath: str, mode: str = 'w', **kwargs):
with open(filepath, mode) as f:
self.dump_to_fileobj(obj, f, **kwargs)
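A new format only needs the three abstract methods. The sketch below re-declares a reduced base class so that it runs without mmcv; `BaseHandler` and `LinesHandler` are hypothetical names used for illustration only.

```python
from abc import ABCMeta, abstractmethod
from io import StringIO


class BaseHandler(metaclass=ABCMeta):
    """Reduced stand-in for ``BaseFileHandler`` (assumption, not mmcv)."""

    str_like = True  # text buffers; a binary format would set False

    @abstractmethod
    def load_from_fileobj(self, file, **kwargs):
        pass

    @abstractmethod
    def dump_to_fileobj(self, obj, file, **kwargs):
        pass

    @abstractmethod
    def dump_to_str(self, obj, **kwargs):
        pass


class LinesHandler(BaseHandler):
    """Store a list of items as one item per line."""

    def load_from_fileobj(self, file, **kwargs):
        return [line.rstrip('\n') for line in file]

    def dump_to_fileobj(self, obj, file, **kwargs):
        file.write(self.dump_to_str(obj, **kwargs))

    def dump_to_str(self, obj, **kwargs):
        return '\n'.join(str(item) for item in obj) + '\n'


handler = LinesHandler()
text = handler.dump_to_str(['a', 'b'])
round_trip = handler.load_from_fileobj(StringIO(text))
```

Instantiating `BaseHandler` directly raises `TypeError`, which is how the abstract base class enforces that every handler implements all three methods.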
# Copyright (c) OpenMMLab. All rights reserved.
import json
import numpy as np
from .base import BaseFileHandler
def set_default(obj):
"""Set default json values for non-serializable values.
It helps convert ``set``, ``range`` and ``np.ndarray`` data types to list.
It also converts ``np.generic`` (including ``np.int32``, ``np.float32``,
etc.) into plain Python built-in number types.
"""
if isinstance(obj, (set, range)):
return list(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, np.generic):
return obj.item()
raise TypeError(f'{type(obj)} is unsupported for json dump')
class JsonHandler(BaseFileHandler):

    def load_from_fileobj(self, file, **kwargs):
        # accept keyword arguments like the other handlers so that
        # ``load(file, **kwargs)`` does not fail for json files
        return json.load(file, **kwargs)

    def dump_to_fileobj(self, obj, file, **kwargs):
        kwargs.setdefault('default', set_default)
        json.dump(obj, file, **kwargs)

    def dump_to_str(self, obj, **kwargs):
        kwargs.setdefault('default', set_default)
        return json.dumps(obj, **kwargs)
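The `default` hook is what lets `JsonHandler` serialize objects the `json` module rejects. The sketch below shows the mechanism with the standard library only; `to_serializable` is a hypothetical reduction of `set_default` that leaves out the NumPy branches.

```python
import json


def to_serializable(obj):
    """Fallback called by ``json`` for objects it cannot encode."""
    if isinstance(obj, (set, range)):
        return list(obj)
    raise TypeError(f'{type(obj)} is unsupported for json dump')


# ``range`` and ``set`` are not JSON types, so both go through the hook
encoded = json.dumps({'ids': range(3), 'tags': {'x'}},
                     default=to_serializable)
decoded = json.loads(encoded)
```

Raising `TypeError` for anything else matters: it keeps unsupported objects from being written silently in a wrong form.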
# Copyright (c) OpenMMLab. All rights reserved.
import pickle
from .base import BaseFileHandler
class PickleHandler(BaseFileHandler):
str_like = False
def load_from_fileobj(self, file, **kwargs):
return pickle.load(file, **kwargs)
def load_from_path(self, filepath, **kwargs):
return super().load_from_path(filepath, mode='rb', **kwargs)
def dump_to_str(self, obj, **kwargs):
kwargs.setdefault('protocol', 2)
return pickle.dumps(obj, **kwargs)
def dump_to_fileobj(self, obj, file, **kwargs):
kwargs.setdefault('protocol', 2)
pickle.dump(obj, file, **kwargs)
def dump_to_path(self, obj, filepath, **kwargs):
super().dump_to_path(obj, filepath, mode='wb', **kwargs)
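`PickleHandler` sets `str_like = False` because pickle reads and writes bytes, which is why it overrides the path helpers to use `'rb'` and `'wb'`. A minimal round trip through an in-memory binary buffer, using only the standard library, looks like this:

```python
import pickle
from io import BytesIO

data = {'a': [1, 2, 3]}

# dump into a bytes buffer, with the same default protocol as above
buf = BytesIO()
pickle.dump(data, buf, protocol=2)
raw = buf.getvalue()

# load it back from a fresh buffer over the same bytes
restored = pickle.load(BytesIO(raw))
```

Trying the same with `StringIO` fails, since `pickle.dump` writes `bytes` objects to the file.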
# Copyright (c) OpenMMLab. All rights reserved.
import yaml
try:
from yaml import CDumper as Dumper
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader, Dumper # type: ignore
from .base import BaseFileHandler # isort:skip
class YamlHandler(BaseFileHandler):
def load_from_fileobj(self, file, **kwargs):
kwargs.setdefault('Loader', Loader)
return yaml.load(file, **kwargs)
def dump_to_fileobj(self, obj, file, **kwargs):
kwargs.setdefault('Dumper', Dumper)
yaml.dump(obj, file, **kwargs)
def dump_to_str(self, obj, **kwargs):
kwargs.setdefault('Dumper', Dumper)
return yaml.dump(obj, **kwargs)
# Copyright (c) OpenMMLab. All rights reserved.
from io import BytesIO, StringIO
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TextIO, Union
from ..utils import is_list_of
from .file_client import FileClient
from .handlers import BaseFileHandler, JsonHandler, PickleHandler, YamlHandler
FileLikeObject = Union[TextIO, StringIO, BytesIO]
file_handlers = {
'json': JsonHandler(),
'yaml': YamlHandler(),
'yml': YamlHandler(),
'pickle': PickleHandler(),
'pkl': PickleHandler()
}
def load(file: Union[str, Path, FileLikeObject],
file_format: Optional[str] = None,
file_client_args: Optional[Dict] = None,
**kwargs):
"""Load data from json/yaml/pickle files.
This method provides a unified api for loading data from serialized files.
Note:
In v1.3.16 and later, ``load`` supports loading data from serialized
files that can be stored in different backends.
Args:
file (str or :obj:`Path` or file-like object): Filename or a file-like
object.
file_format (str, optional): If not specified, the file format will be
inferred from the file extension, otherwise use the specified one.
Currently supported formats include "json", "yaml/yml" and
"pickle/pkl".
file_client_args (dict, optional): Arguments to instantiate a
FileClient. See :class:`mmcv.fileio.FileClient` for details.
Default: None.
Examples:
>>> load('/path/of/your/file') # file is stored on disk
>>> load('https://path/of/your/file') # file is stored on the Internet
>>> load('s3://path/of/your/file') # file is stored in petrel
Returns:
The content from the file.
"""
if isinstance(file, Path):
file = str(file)
if file_format is None and isinstance(file, str):
file_format = file.split('.')[-1]
if file_format not in file_handlers:
raise TypeError(f'Unsupported format: {file_format}')
handler = file_handlers[file_format]
f: FileLikeObject
if isinstance(file, str):
file_client = FileClient.infer_client(file_client_args, file)
if handler.str_like:
with StringIO(file_client.get_text(file)) as f:
obj = handler.load_from_fileobj(f, **kwargs)
else:
with BytesIO(file_client.get(file)) as f:
obj = handler.load_from_fileobj(f, **kwargs)
elif hasattr(file, 'read'):
obj = handler.load_from_fileobj(file, **kwargs)
else:
raise TypeError('"file" must be a filepath str or a file-object')
return obj
def dump(obj: Any,
file: Optional[Union[str, Path, FileLikeObject]] = None,
file_format: Optional[str] = None,
file_client_args: Optional[Dict] = None,
**kwargs):
"""Dump data to json/yaml/pickle strings or files.
This method provides a unified api for dumping data as strings or to files,
and also supports custom arguments for each file format.
Note:
In v1.3.16 and later, ``dump`` supports dumping data as strings or to
files that are saved to different backends.
Args:
obj (any): The python object to be dumped.
file (str or :obj:`Path` or file-like object, optional): If not
specified, then the object is dumped to a str, otherwise to a file
specified by the filename or file-like object.
file_format (str, optional): Same as :func:`load`.
file_client_args (dict, optional): Arguments to instantiate a
FileClient. See :class:`mmcv.fileio.FileClient` for details.
Default: None.
Examples:
>>> dump('hello world', '/path/of/your/file') # disk
>>> dump('hello world', 's3://path/of/your/file') # ceph or petrel
Returns:
str | bytes | None: The serialized data if ``file`` is not specified,
otherwise ``None``.
"""
if isinstance(file, Path):
file = str(file)
if file_format is None:
if isinstance(file, str):
file_format = file.split('.')[-1]
elif file is None:
raise ValueError(
'file_format must be specified since file is None')
if file_format not in file_handlers:
raise TypeError(f'Unsupported format: {file_format}')
f: FileLikeObject
handler = file_handlers[file_format]
if file is None:
return handler.dump_to_str(obj, **kwargs)
elif isinstance(file, str):
file_client = FileClient.infer_client(file_client_args, file)
if handler.str_like:
with StringIO() as f:
handler.dump_to_fileobj(obj, f, **kwargs)
file_client.put_text(f.getvalue(), file)
else:
with BytesIO() as f:
handler.dump_to_fileobj(obj, f, **kwargs)
file_client.put(f.getvalue(), file)
elif hasattr(file, 'write'):
handler.dump_to_fileobj(obj, file, **kwargs)
else:
raise TypeError('"file" must be a filename str or a file-object')
def _register_handler(handler: BaseFileHandler,
file_formats: Union[str, List[str]]) -> None:
"""Register a handler for some file extensions.
Args:
handler (:obj:`BaseFileHandler`): Handler to be registered.
file_formats (str or list[str]): File formats to be handled by this
handler.
"""
if not isinstance(handler, BaseFileHandler):
raise TypeError(
f'handler must be a child of BaseFileHandler, not {type(handler)}')
if isinstance(file_formats, str):
file_formats = [file_formats]
if not is_list_of(file_formats, str):
raise TypeError('file_formats must be a str or a list of str')
for ext in file_formats:
file_handlers[ext] = handler
def register_handler(file_formats: Union[str, list], **kwargs) -> Callable:
def wrap(cls):
_register_handler(cls(**kwargs), file_formats)
return cls
return wrap
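The registration decorator and the extension-based dispatch in `load` and `dump` can be sketched together. This is a simplified illustration; `handlers`, `register`, `infer_format` and `dump_str` are hypothetical names, and the base-class check done by `_register_handler` is left out.

```python
import json

handlers = {}


def register(file_formats):
    """Class decorator that maps one or more extensions to a handler."""
    if isinstance(file_formats, str):
        file_formats = [file_formats]

    def wrap(cls):
        instance = cls()  # one shared handler object per class
        for ext in file_formats:
            handlers[ext] = instance
        return cls

    return wrap


@register(['txt', 'text'])
class TextHandler:

    def dump_to_str(self, obj):
        return str(obj)


@register('json')
class JsonTextHandler:

    def dump_to_str(self, obj):
        return json.dumps(obj)


def infer_format(filename):
    # same rule as ``load``/``dump``: the text after the last dot
    return filename.split('.')[-1]


def dump_str(obj, file_format):
    if file_format not in handlers:
        raise TypeError(f'Unsupported format: {file_format}')
    return handlers[file_format].dump_to_str(obj)


as_json = dump_str([1, 2], infer_format('out.json'))
as_text = dump_str([1, 2], infer_format('notes.txt'))
shared = handlers['txt'] is handlers['text']
```

Because the decorator returns the class unchanged, a registered handler can still be subclassed or instantiated directly.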
# Copyright (c) OpenMMLab. All rights reserved.
from io import StringIO
from pathlib import Path
from typing import Dict, List, Optional, Union
from .file_client import FileClient
def list_from_file(filename: Union[str, Path],
prefix: str = '',
offset: int = 0,
max_num: int = 0,
encoding: str = 'utf-8',
file_client_args: Optional[Dict] = None) -> List:
"""Load a text file and parse the content as a list of strings.
Note:
In v1.3.16 and later, ``list_from_file`` supports loading a text file
that can be stored in different backends and parsing the content as
a list of strings.
Args:
filename (str or Path): Filename.
prefix (str): The prefix to be inserted to the beginning of each item.
offset (int): The offset of lines.
max_num (int): The maximum number of lines to be read,
zeros and negatives mean no limitation.
encoding (str): Encoding used to open the file. Default utf-8.
file_client_args (dict, optional): Arguments to instantiate a
FileClient. See :class:`mmcv.fileio.FileClient` for details.
Default: None.
Examples:
>>> list_from_file('/path/of/your/file') # disk
['hello', 'world']
>>> list_from_file('s3://path/of/your/file') # ceph or petrel
['hello', 'world']
Returns:
list[str]: A list of strings.
"""
cnt = 0
item_list = []
file_client = FileClient.infer_client(file_client_args, filename)
with StringIO(file_client.get_text(filename, encoding)) as f:
for _ in range(offset):
f.readline()
for line in f:
if 0 < max_num <= cnt:
break
item_list.append(prefix + line.rstrip('\n\r'))
cnt += 1
return item_list
def dict_from_file(filename: Union[str, Path],
key_type: type = str,
encoding: str = 'utf-8',
file_client_args: Optional[Dict] = None) -> Dict:
"""Load a text file and parse the content as a dict.
Each line of the text file must contain two or more columns separated by
whitespace (spaces or tabs). The first column is parsed as the dict key.
If there is one further column, it is the value; if there are several, the
value is a list of them.
Note:
In v1.3.16 and later, ``dict_from_file`` supports loading a text file
that can be stored in different backends and parsing the content as
a dict.
Args:
filename (str or Path): Filename.
key_type (type): Type of the dict keys. str is used by default and
type conversion will be performed if specified.
encoding (str): Encoding used to open the file. Default utf-8.
file_client_args (dict, optional): Arguments to instantiate a
FileClient. See :class:`mmcv.fileio.FileClient` for details.
Default: None.
Examples:
>>> dict_from_file('/path/of/your/file') # disk
{'key1': 'value1', 'key2': 'value2'}
>>> dict_from_file('s3://path/of/your/file') # ceph or petrel
{'key1': 'value1', 'key2': 'value2'}
Returns:
dict: The parsed contents.
"""
mapping = {}
file_client = FileClient.infer_client(file_client_args, filename)
with StringIO(file_client.get_text(filename, encoding)) as f:
for line in f:
items = line.rstrip('\n').split()
assert len(items) >= 2
key = key_type(items[0])
val = items[1:] if len(items) > 2 else items[1]
mapping[key] = val
return mapping
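The parsing rules of `list_from_file` and `dict_from_file` can be checked on in-memory text. The sketch below assumes the file content is already available as a string; `parse_list` and `parse_dict` are hypothetical names that mirror the loops above without the `FileClient` step.

```python
from io import StringIO


def parse_list(text, prefix='', offset=0, max_num=0):
    items = []
    with StringIO(text) as f:
        for _ in range(offset):
            f.readline()  # skip the first ``offset`` lines
        for line in f:
            # zero or negative ``max_num`` means no limit
            if 0 < max_num <= len(items):
                break
            items.append(prefix + line.rstrip('\n\r'))
    return items


def parse_dict(text, key_type=str):
    mapping = {}
    for line in StringIO(text):
        items = line.split()
        if len(items) < 2:
            raise ValueError('each line needs a key and a value')
        # one value stays a string, several values become a list
        value = items[1:] if len(items) > 2 else items[1]
        mapping[key_type(items[0])] = value
    return mapping


lines = parse_list('a\nb\nc\nd\n', prefix='img/', offset=1, max_num=2)
table = parse_dict('1 cat\n2 dog small\n', key_type=int)
```

Here `lines` skips the first entry and stops after two items, and `table` shows the mixed value types: a string for one column and a list for more.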
@@ -8,8 +8,8 @@ import cv2
 import numpy as np
 from cv2 import (IMREAD_COLOR, IMREAD_GRAYSCALE, IMREAD_IGNORE_ORIENTATION,
 IMREAD_UNCHANGED)
+from mmengine.fileio import FileClient
-from mmcv.fileio import FileClient
 from mmcv.utils import is_filepath, is_str
 try:
@@ -167,7 +167,7 @@ def imread(img_or_path,
 If backend is None, the global imread_backend specified by
 ``mmcv.use_backend()`` will be used. Default: None.
 file_client_args (dict | None): Arguments to instantiate a
-FileClient. See :class:`mmcv.fileio.FileClient` for details.
+FileClient. See :class:`mmengine.fileio.FileClient` for details.
 Default: None.
 Returns:
@@ -283,7 +283,7 @@ def imwrite(img,
 auto_mkdir (bool): If the parent folder of `file_path` does not exist,
 whether to create it automatically. It will be deprecated.
 file_client_args (dict | None): Arguments to instantiate a
-FileClient. See :class:`mmcv.fileio.FileClient` for details.
+FileClient. See :class:`mmengine.fileio.FileClient` for details.
 Default: None.
 Returns:
......
@@ -12,14 +12,15 @@ from importlib import import_module
 from tempfile import TemporaryDirectory
 from typing import Callable, Dict, List, Optional, Tuple, Union
+import mmengine
 import torch
 import torch.nn as nn
 import torchvision
+from mmengine.fileio import FileClient
+from mmengine.fileio import load as load_file
 from torch.optim import Optimizer
 import mmcv
-from ..fileio import FileClient
-from ..fileio import load as load_file
 from ..parallel import is_module_wrapper
 from ..utils import digit_version, load_url, mkdir_or_exist
 from .dist_utils import get_dist_info
@@ -136,7 +137,7 @@ def get_torchvision_models():
 # 'resnet50' or 'ResNet50_Weights.IMAGENET1K_V1' in the config.
 json_path = osp.join(mmcv.__path__[0],
 'model_zoo/torchvision_0.12.json')
-model_urls = mmcv.load(json_path)
+model_urls = mmengine.load(json_path)
 for cls_name, cls in torchvision.models.__dict__.items():
 # The name of torchvision model weights classes ends with
 # `_Weights` such as `ResNet18_Weights`. However, some model weight
@@ -409,8 +410,8 @@ def load_from_ceph(filename: str,
 'petrel'. Default: 'petrel'.
 .. warning::
-:class:`mmcv.fileio.file_client.CephBackend` will be deprecated,
-please use :class:`mmcv.fileio.file_client.PetrelBackend` instead.
+:class:`mmengine.fileio.file_client.CephBackend` will be deprecated,
+please use :class:`mmengine.fileio.file_client.PetrelBackend` instead.
 Returns:
 dict or OrderedDict: The loaded checkpoint.
@@ -751,7 +752,7 @@ def save_checkpoint(model: torch.nn.Module,
 optimizer (:obj:`Optimizer`, optional): Optimizer to be saved.
 meta (dict, optional): Metadata to be saved in checkpoint.
 file_client_args (dict, optional): Arguments to instantiate a
-FileClient. See :class:`mmcv.fileio.FileClient` for details.
+FileClient. See :class:`mmengine.fileio.FileClient` for details.
 Default: None.
 `New in version 1.3.16.`
 """
......
@@ -3,7 +3,8 @@ import os.path as osp
 import warnings
 from typing import Optional
-from mmcv.fileio import FileClient
+from mmengine.fileio import FileClient
 from ..dist_utils import allreduce_params, master_only
 from .hook import HOOKS, Hook
@@ -35,7 +36,7 @@ class CheckpointHook(Hook):
 sync_buffer (bool, optional): Whether to synchronize buffers in
 different gpus. Default: False.
 file_client_args (dict, optional): Arguments to instantiate a
-FileClient. See :class:`mmcv.fileio.FileClient` for details.
+FileClient. See :class:`mmengine.fileio.FileClient` for details.
 Default: None.
 `New in version 1.3.16.`
......