"vscode:/vscode.git/clone" did not exist on "ba84b8728a8d0a766a636b30661836c30b17fbe6"
Commit 6f3c5f1c authored by limm's avatar limm
Browse files

support v1.4.0

parent 6f674c7e
# Copyright (c) OpenMMLab. All rights reserved.
import os
from typing import Dict, Optional
import mmengine
import torch # noqa
import torch.nn as nn
from mmengine.hooks import Hook
from mmengine.logging import print_log
from mmengine.registry import HOOKS
from .operator import BaseConvRFSearchOp, Conv2dRFSearchOp # noqa
from .utils import get_single_padding, write_to_json
@HOOKS.register_module()
class RFSearchHook(Hook):
"""Rcecptive field search via dilation rates.
Please refer to `RF-Next: Efficient Receptive Field
Search for Convolutional Neural Networks
<https://arxiv.org/abs/2206.06637>`_ for more details.
Args:
mode (str, optional): It can be set to the following types:
'search', 'fixed_single_branch', or 'fixed_multi_branch'.
Defaults to 'search'.
config (Dict, optional): config dict of search.
By default this config contains "search",
and config["search"] must include:
- "step": recording the current searching step.
- "max_step": The maximum number of searching steps
to update the structures.
- "search_interval": The interval (epoch/iteration)
between two updates.
- "exp_rate": The controller of the sparsity of search space.
- "init_alphas": The value for initializing weights of each branch.
- "mmin": The minimum dilation rate.
- "mmax": The maximum dilation rate.
- "num_branches": The controller of the size of
search space (the number of branches).
- "skip_layer": The modules in skip_layer will be ignored
during the receptive field search.
rfstructure_file (str, optional): Path to load searched receptive
fields of the model. Defaults to None.
by_epoch (bool, optional): Whether to perform the step by epoch or
by iteration. If set to True, it will step by epoch. Otherwise, by
iteration. Defaults to True.
verbose (bool): Determines whether to print rf-next related logging
messages. Defaults to True.
"""
def __init__(self,
mode: str = 'search',
config: Dict = {},
rfstructure_file: Optional[str] = None,
by_epoch: bool = True,
verbose: bool = True):
assert mode in ['search', 'fixed_single_branch', 'fixed_multi_branch']
assert config is not None
self.config = config
self.config['structure'] = {}
self.verbose = verbose
if rfstructure_file is not None:
rfstructure = mmengine.load(rfstructure_file)['structure']
self.config['structure'] = rfstructure
self.mode = mode
self.num_branches = self.config['search']['num_branches']
self.by_epoch = by_epoch
def init_model(self, model: nn.Module):
"""init model with search ability.
Args:
model (nn.Module): pytorch model
Raises:
NotImplementedError: only supports three modes:
search/fixed_single_branch/fixed_multi_branch
"""
if self.verbose:
print_log('RFSearch init begin.', 'current')
if self.mode == 'search':
if self.config['structure']:
self.set_model(model, search_op='Conv2d')
self.wrap_model(model, search_op='Conv2d')
elif self.mode == 'fixed_single_branch':
self.set_model(model, search_op='Conv2d')
elif self.mode == 'fixed_multi_branch':
self.set_model(model, search_op='Conv2d')
self.wrap_model(model, search_op='Conv2d')
else:
raise NotImplementedError
if self.verbose:
print_log('RFSearch init end.', 'current')
def after_train_epoch(self, runner):
"""Performs a dilation searching step after one training epoch."""
if self.by_epoch and self.mode == 'search':
self.step(runner.model, runner.work_dir)
def after_train_iter(self, runner, batch_idx, data_batch, outputs):
"""Performs a dilation searching step after one training iteration."""
if not self.by_epoch and self.mode == 'search':
self.step(runner.model, runner.work_dir)
def step(self, model: nn.Module, work_dir: str) -> None:
"""Performs a dilation searching step.
Args:
model (nn.Module): pytorch model
work_dir (str): Directory to save the searching results.
"""
self.config['search']['step'] += 1
if (self.config['search']['step']
) % self.config['search']['search_interval'] == 0 and (self.config[
'search']['step']) < self.config['search']['max_step']:
self.estimate_and_expand(model)
for name, module in model.named_modules():
if isinstance(module, BaseConvRFSearchOp):
self.config['structure'][name] = module.op_layer.dilation
write_to_json(
self.config,
os.path.join(
work_dir,
'local_search_config_step%d.json' %
self.config['search']['step'],
),
)
def estimate_and_expand(self, model: nn.Module) -> None:
"""estimate and search for RFConvOp.
Args:
model (nn.Module): pytorch model
"""
for module in model.modules():
if isinstance(module, BaseConvRFSearchOp):
module.estimate_rates()
module.expand_rates()
def wrap_model(self,
model: nn.Module,
search_op: str = 'Conv2d',
prefix: str = '') -> None:
"""wrap model to support searchable conv op.
Args:
model (nn.Module): pytorch model
search_op (str): The module that uses RF search.
Defaults to 'Conv2d'.
prefix (str): Prefix for function recursion. Defaults to ''.
"""
op = 'torch.nn.' + search_op
for name, module in model.named_children():
if prefix == '':
fullname = 'module.' + name
else:
fullname = prefix + '.' + name
if self.config['search']['skip_layer'] is not None:
if any(layer in fullname
for layer in self.config['search']['skip_layer']):
continue
if isinstance(module, eval(op)):
if 1 < module.kernel_size[0] and \
0 != module.kernel_size[0] % 2 or \
1 < module.kernel_size[1] and \
0 != module.kernel_size[1] % 2:
moduleWrap = eval(search_op + 'RFSearchOp')(
module, self.config['search'], self.verbose)
moduleWrap = moduleWrap.to(module.weight.device)
if self.verbose:
print_log(
'Wrap model %s to %s.' %
(str(module), str(moduleWrap)), 'current')
setattr(model, name, moduleWrap)
elif not isinstance(module, BaseConvRFSearchOp):
self.wrap_model(module, search_op, fullname)
def set_model(self,
model: nn.Module,
search_op: str = 'Conv2d',
init_rates: Optional[int] = None,
prefix: str = '') -> None:
"""set model based on config.
Args:
model (nn.Module): pytorch model
search_op (str): The module that uses RF search.
Defaults to 'Conv2d'.
init_rates (int, optional): Set to other initial dilation rates.
Defaults to None.
prefix (str): Prefix for function recursion. Defaults to ''.
"""
op = 'torch.nn.' + search_op
for name, module in model.named_children():
if prefix == '':
fullname = 'module.' + name
else:
fullname = prefix + '.' + name
if self.config['search']['skip_layer'] is not None:
if any(layer in fullname
for layer in self.config['search']['skip_layer']):
continue
if isinstance(module, eval(op)):
if 1 < module.kernel_size[0] and \
0 != module.kernel_size[0] % 2 or \
1 < module.kernel_size[1] and \
0 != module.kernel_size[1] % 2:
if isinstance(self.config['structure'][fullname], int):
self.config['structure'][fullname] = [
self.config['structure'][fullname],
self.config['structure'][fullname]
]
module.dilation = (
self.config['structure'][fullname][0],
self.config['structure'][fullname][1],
)
module.padding = (
get_single_padding(
module.kernel_size[0], module.stride[0],
self.config['structure'][fullname][0]),
get_single_padding(
module.kernel_size[1], module.stride[1],
self.config['structure'][fullname][1]))
setattr(model, name, module)
if self.verbose:
print_log(
'Set module %s dilation as: [%d %d]' %
(fullname, module.dilation[0], module.dilation[1]),
'current')
elif not isinstance(module, BaseConvRFSearchOp):
self.set_model(module, search_op, init_rates, fullname)
# Copyright (c) OpenMMLab. All rights reserved.
import mmengine
import numpy as np
def write_to_json(config: dict, filename: str):
"""save config to json file.
Args:
config (dict): Config to be saved.
filename (str): Path to save config.
"""
with open(filename, 'w', encoding='utf-8') as f:
mmengine.dump(config, f, file_format='json')
def expand_rates(dilation: tuple, config: dict) -> list:
"""expand dilation rate according to config.
Args:
dilation (tuple): dilation rates to be expanded.
config (dict): config dict
Returns:
list: list of expanded dilation rates
"""
exp_rate = config['exp_rate']
large_rates = []
small_rates = []
for _ in range(config['num_branches'] // 2):
large_rates.append(
tuple([
np.clip(
int(round((1 + exp_rate) * dilation[0])), config['mmin'],
config['mmax']).item(),
np.clip(
int(round((1 + exp_rate) * dilation[1])), config['mmin'],
config['mmax']).item()
]))
small_rates.append(
tuple([
np.clip(
int(round((1 - exp_rate) * dilation[0])), config['mmin'],
config['mmax']).item(),
np.clip(
int(round((1 - exp_rate) * dilation[1])), config['mmin'],
config['mmax']).item()
]))
small_rates.reverse()
if config['num_branches'] % 2 == 0:
rate_list = small_rates + large_rates
else:
rate_list = small_rates + [dilation] + large_rates
unique_rate_list = list(set(rate_list))
unique_rate_list.sort(key=rate_list.index)
return unique_rate_list
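# Worked example (a sketch under assumed config values): with
# config = dict(exp_rate=0.5, num_branches=3, mmin=1, mmax=24),
# expand_rates((2, 2), config) builds one smaller and one larger branch:
#   small: round(0.5 * 2) = 1 -> (1, 1)
#   large: round(1.5 * 2) = 3 -> (3, 3)
# num_branches is odd, so the current rate stays in the middle and the
# result is [(1, 1), (2, 2), (3, 3)].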
def get_single_padding(kernel_size: int,
stride: int = 1,
dilation: int = 1) -> int:
"""Compute the padding that keeps the output aligned for the given
kernel size, stride and dilation."""
padding = ((stride - 1) + dilation * (kernel_size - 1)) // 2
return padding
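# Worked example: a 3x3 kernel with stride 1 and dilation 2 gives
# padding = ((1 - 1) + 2 * (3 - 1)) // 2 = 2, i.e. the 'same'-style
# padding that set_model() installs when it rewrites module.padding.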
# Copyright (c) OpenMMLab. All rights reserved.
from .flops_counter import get_model_complexity_info
from .fuse_conv_bn import fuse_conv_bn
from .sync_bn import revert_sync_batchnorm
from .weight_init import (INITIALIZERS, Caffe2XavierInit, ConstantInit,
KaimingInit, NormalInit, PretrainedInit,
TruncNormalInit, UniformInit, XavierInit,
bias_init_with_prob, caffe2_xavier_init,
constant_init, initialize, kaiming_init, normal_init,
trunc_normal_init, uniform_init, xavier_init)
__all__ = ['get_model_complexity_info', 'fuse_conv_bn']
__all__ = [
'get_model_complexity_info', 'bias_init_with_prob', 'caffe2_xavier_init',
'constant_init', 'kaiming_init', 'normal_init', 'trunc_normal_init',
'uniform_init', 'xavier_init', 'fuse_conv_bn', 'initialize',
'INITIALIZERS', 'ConstantInit', 'XavierInit', 'NormalInit',
'TruncNormalInit', 'UniformInit', 'KaimingInit', 'PretrainedInit',
'Caffe2XavierInit', 'revert_sync_batchnorm'
]
@@ -24,25 +24,22 @@
# SOFTWARE.
import sys
import warnings
from functools import partial
from typing import Any, Callable, Dict, Optional, TextIO, Tuple
import numpy as np
import torch
import torch.nn as nn
from mmcv.cnn.bricks import (Conv2d, Conv3d, ConvTranspose2d, Linear,
MaxPool2d, MaxPool3d)
import mmcv
def get_model_complexity_info(model: nn.Module,
input_shape: tuple,
print_per_layer_stat: bool = True,
as_strings: bool = True,
input_constructor: Optional[Callable] = None,
flush: bool = False,
ost: TextIO = sys.stdout) -> tuple:
def get_model_complexity_info(model,
input_shape,
print_per_layer_stat=True,
as_strings=True,
input_constructor=None,
flush=False,
ost=sys.stdout):
"""Get complexity information of a model.
This method can calculate FLOPs and parameter counts of a model with
@@ -51,16 +48,16 @@ def get_model_complexity_info(model: nn.Module,
Supported layers are listed as below:
- Convolutions: ``nn.Conv1d``, ``nn.Conv2d``, ``nn.Conv3d``.
- Activations: ``nn.ReLU``, ``nn.PReLU``, ``nn.ELU``, ``nn.LeakyReLU``,
``nn.ReLU6``.
- Poolings: ``nn.MaxPool1d``, ``nn.MaxPool2d``, ``nn.MaxPool3d``,
``nn.AvgPool1d``, ``nn.AvgPool2d``, ``nn.AvgPool3d``,
``nn.AdaptiveMaxPool1d``, ``nn.AdaptiveMaxPool2d``,
``nn.AdaptiveMaxPool3d``, ``nn.AdaptiveAvgPool1d``,
``nn.AdaptiveAvgPool2d``, ``nn.AdaptiveAvgPool3d``.
- BatchNorms: ``nn.BatchNorm1d``, ``nn.BatchNorm2d``,
``nn.BatchNorm3d``, ``nn.GroupNorm``, ``nn.InstanceNorm1d``,
``InstanceNorm2d``, ``InstanceNorm3d``, ``nn.LayerNorm``.
- Linear: ``nn.Linear``.
- Deconvolution: ``nn.ConvTranspose2d``.
- Upsample: ``nn.Upsample``.
@@ -81,8 +78,8 @@ def get_model_complexity_info(model: nn.Module,
Returns:
tuple[float | str]: If ``as_strings`` is set to True, it will return
FLOPs and parameter counts in a string format. Otherwise, it will
return those in a float number format.
"""
assert type(input_shape) is tuple
assert len(input_shape) >= 1
@@ -118,9 +115,7 @@ def get_model_complexity_info(model: nn.Module,
return flops_count, params_count
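# A hedged usage sketch (torchvision is an assumption here, not a
# dependency of this file); with as_strings=True the results are strings:
# >>> from torchvision.models import resnet18
# >>> flops, params = get_model_complexity_info(resnet18(), (3, 224, 224))
# >>> flops, params
# ('1.82 GFLOPs', '11.69 M')  # approximate values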
def flops_to_string(flops: float,
units: Optional[str] = 'GFLOPs',
precision: int = 2) -> str:
def flops_to_string(flops, units='GFLOPs', precision=2):
"""Convert FLOPs number into a string.
Note that here we count one multiply-add as one FLOP.
@@ -163,9 +158,7 @@ def flops_to_string(flops: float,
return str(flops) + ' FLOPs'
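# Hedged examples, consistent with the conversion described above:
# >>> flops_to_string(1e9)
# '1.0 GFLOPs'
# >>> flops_to_string(2e5, 'MFLOPs')
# '0.2 MFLOPs'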
def params_to_string(num_params: float,
units: Optional[str] = None,
precision: int = 2) -> str:
def params_to_string(num_params, units=None, precision=2):
"""Convert parameter number into a string.
Args:
@@ -202,13 +195,13 @@ def params_to_string(num_params: float,
return str(num_params)
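# Hedged examples; with units=None the scale is picked automatically:
# >>> params_to_string(1e9)
# '1000.0 M'
# >>> params_to_string(2e5)
# '200.0 k'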
def print_model_with_flops(model: nn.Module,
total_flops: float,
total_params: float,
units: Optional[str] = 'GFLOPs',
precision: int = 3,
ost: TextIO = sys.stdout,
flush: bool = False) -> None:
def print_model_with_flops(model,
total_flops,
total_params,
units='GFLOPs',
precision=3,
ost=sys.stdout,
flush=False):
"""Print a model with FLOPs for each layer.
Args:
@@ -283,10 +276,10 @@ def print_model_with_flops(model: nn.Module,
return ', '.join([
params_to_string(
accumulated_num_params, units='M', precision=precision),
f'{accumulated_num_params / total_params:.3%} Params',
'{:.3%} Params'.format(accumulated_num_params / total_params),
flops_to_string(
accumulated_flops_cost, units=units, precision=precision),
f'{accumulated_flops_cost / total_flops:.3%} FLOPs',
'{:.3%} FLOPs'.format(accumulated_flops_cost / total_flops),
self.original_extra_repr()
])
@@ -311,7 +304,7 @@ def print_model_with_flops(model: nn.Module,
model.apply(del_extra_repr)
def get_model_parameters_number(model: nn.Module) -> float:
def get_model_parameters_number(model):
"""Calculate parameter number of a model.
Args:
@@ -324,16 +317,16 @@ def get_model_parameters_number(model: nn.Module) -> float:
return num_params
def add_flops_counting_methods(net_main_module: nn.Module) -> nn.Module:
def add_flops_counting_methods(net_main_module):
# adding additional methods to the existing module object,
# this is done this way so that each function has access to self object
net_main_module.start_flops_count = start_flops_count.__get__( # type: ignore # noqa E501
net_main_module.start_flops_count = start_flops_count.__get__(
net_main_module)
net_main_module.stop_flops_count = stop_flops_count.__get__( # type: ignore # noqa E501
net_main_module.stop_flops_count = stop_flops_count.__get__(
net_main_module)
net_main_module.reset_flops_count = reset_flops_count.__get__( # type: ignore # noqa E501
net_main_module.reset_flops_count = reset_flops_count.__get__(
net_main_module)
net_main_module.compute_average_flops_cost = compute_average_flops_cost.__get__( # type: ignore # noqa E501
net_main_module.compute_average_flops_cost = compute_average_flops_cost.__get__( # noqa: E501
net_main_module)
net_main_module.reset_flops_count()
@@ -341,7 +334,7 @@ def add_flops_counting_methods(net_main_module: nn.Module) -> nn.Module:
return net_main_module
def compute_average_flops_cost(self) -> Tuple[float, float]:
def compute_average_flops_cost(self):
"""Compute average FLOPs cost.
A method to compute average FLOPs cost, which will be available after
@@ -359,7 +352,7 @@ def compute_average_flops_cost(self) -> Tuple[float, float]:
return flops_sum / batches_count, params_sum
def start_flops_count(self) -> None:
def start_flops_count(self):
"""Activate the computation of mean flops consumption per image.
A method to activate the computation of mean flops consumption per image.
@@ -368,7 +361,7 @@ def start_flops_count(self) -> None:
"""
add_batch_counter_hook_function(self)
def add_flops_counter_hook_function(module: nn.Module) -> None:
def add_flops_counter_hook_function(module):
if is_supported_instance(module):
if hasattr(module, '__flops_handle__'):
return
@@ -382,7 +375,7 @@ def start_flops_count(self) -> None:
self.apply(partial(add_flops_counter_hook_function))
def stop_flops_count(self) -> None:
def stop_flops_count(self):
"""Stop computing the mean flops consumption per image.
A method to stop computing the mean flops consumption per image, which will
@@ -393,7 +386,7 @@ def stop_flops_count(self) -> None:
self.apply(remove_flops_counter_hook_function)
def reset_flops_count(self) -> None:
def reset_flops_count(self):
"""Reset statistics computed so far.
A method to reset computed statistics, which will be available after
@@ -404,13 +397,11 @@ def reset_flops_count(self) -> None:
# ---- Internal functions
def empty_flops_counter_hook(module: nn.Module, input: tuple,
output: Any) -> None:
def empty_flops_counter_hook(module, input, output):
module.__flops__ += 0
def upsample_flops_counter_hook(module: nn.Module, input: tuple,
output: torch.Tensor) -> None:
def upsample_flops_counter_hook(module, input, output):
output_size = output[0]
batch_size = output_size.shape[0]
output_elements_count = batch_size
@@ -419,38 +410,39 @@ def upsample_flops_counter_hook(module: nn.Module, input: tuple,
module.__flops__ += int(output_elements_count)
def relu_flops_counter_hook(module: nn.Module, input: tuple,
output: torch.Tensor) -> None:
def relu_flops_counter_hook(module, input, output):
active_elements_count = output.numel()
module.__flops__ += int(active_elements_count)
def linear_flops_counter_hook(module: nn.Module, input: tuple,
output: torch.Tensor) -> None:
def linear_flops_counter_hook(module, input, output):
input = input[0]
output_last_dim = output.shape[
-1] # pytorch checks dimensions, so here we don't care much
module.__flops__ += int(np.prod(input[0].shape) * output_last_dim)
module.__flops__ += int(np.prod(input.shape) * output_last_dim)
def pool_flops_counter_hook(module: nn.Module, input: tuple,
output: torch.Tensor) -> None:
module.__flops__ += int(np.prod(input[0].shape))
def pool_flops_counter_hook(module, input, output):
input = input[0]
module.__flops__ += int(np.prod(input.shape))
def norm_flops_counter_hook(module: nn.Module, input: tuple,
output: torch.Tensor) -> None:
batch_flops = np.prod(input[0].shape)
def norm_flops_counter_hook(module, input, output):
input = input[0]
batch_flops = np.prod(input.shape)
if (getattr(module, 'affine', False)
or getattr(module, 'elementwise_affine', False)):
batch_flops *= 2
module.__flops__ += int(batch_flops)
def deconv_flops_counter_hook(conv_module: nn.Module, input: tuple,
output: torch.Tensor) -> None:
def deconv_flops_counter_hook(conv_module, input, output):
# Can have multiple inputs, getting the first one
batch_size = input[0].shape[0]
input_height, input_width = input[0].shape[2:]
input = input[0]
batch_size = input.shape[0]
input_height, input_width = input.shape[2:]
kernel_height, kernel_width = conv_module.kernel_size
in_channels = conv_module.in_channels
@@ -466,16 +458,17 @@ def deconv_flops_counter_hook(conv_module: nn.Module, input: tuple,
bias_flops = 0
if conv_module.bias is not None:
output_height, output_width = output.shape[2:]
bias_flops = out_channels * batch_size * output_height * output_width
bias_flops = out_channels * batch_size * output_height * output_width
overall_flops = overall_conv_flops + bias_flops
conv_module.__flops__ += int(overall_flops)
def conv_flops_counter_hook(conv_module: nn.Module, input: tuple,
output: torch.Tensor) -> None:
def conv_flops_counter_hook(conv_module, input, output):
# Can have multiple inputs, getting the first one
batch_size = input[0].shape[0]
input = input[0]
batch_size = input.shape[0]
output_dims = list(output.shape[2:])
kernel_dims = list(conv_module.kernel_size)
@@ -502,23 +495,25 @@ def conv_flops_counter_hook(conv_module: nn.Module, input: tuple,
conv_module.__flops__ += int(overall_flops)
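# Worked example (hedged; the elided body above follows the usual ptflops
# accounting): for Conv2d(3, 64, kernel_size=3, groups=1) on a
# 1x3x224x224 input with a 224x224 output,
#   per-position FLOPs:  3 * 3 * 3 * 64 = 1728
#   active elements:     1 * 224 * 224 = 50176
#   overall conv FLOPs:  1728 * 50176 = 86,704,128
# plus out_channels * active-elements bias FLOPs when bias is not None.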
def batch_counter_hook(module: nn.Module, input: tuple, output: Any) -> None:
def batch_counter_hook(module, input, output):
batch_size = 1
if len(input) > 0:
# Can have multiple inputs, getting the first one
batch_size = len(input[0])
input = input[0]
batch_size = len(input)
else:
warnings.warn('No positional inputs found for a module, '
'assuming batch size is 1.')
pass
print('Warning! No positional inputs found for a module, '
'assuming batch size is 1.')
module.__batch_counter__ += batch_size
def add_batch_counter_variables_or_reset(module: nn.Module) -> None:
def add_batch_counter_variables_or_reset(module):
module.__batch_counter__ = 0
def add_batch_counter_hook_function(module: nn.Module) -> None:
def add_batch_counter_hook_function(module):
if hasattr(module, '__batch_counter_handle__'):
return
@@ -526,43 +521,43 @@ def add_batch_counter_hook_function(module: nn.Module) -> None:
module.__batch_counter_handle__ = handle
def remove_batch_counter_hook_function(module: nn.Module) -> None:
def remove_batch_counter_hook_function(module):
if hasattr(module, '__batch_counter_handle__'):
module.__batch_counter_handle__.remove()
del module.__batch_counter_handle__
def add_flops_counter_variable_or_reset(module: nn.Module) -> None:
def add_flops_counter_variable_or_reset(module):
if is_supported_instance(module):
if hasattr(module, '__flops__') or hasattr(module, '__params__'):
warnings.warn('variables __flops__ or __params__ are already '
'defined for the module ' + type(module).__name__ +
' ptflops can affect your code!')
print('Warning: variables __flops__ or __params__ are already '
'defined for the module ' + type(module).__name__ +
' ptflops can affect your code!')
module.__flops__ = 0
module.__params__ = get_model_parameters_number(module)
def is_supported_instance(module: nn.Module) -> bool:
def is_supported_instance(module):
if type(module) in get_modules_mapping():
return True
return False
def remove_flops_counter_hook_function(module: nn.Module) -> None:
def remove_flops_counter_hook_function(module):
if is_supported_instance(module):
if hasattr(module, '__flops_handle__'):
module.__flops_handle__.remove()
del module.__flops_handle__
def get_modules_mapping() -> Dict:
def get_modules_mapping():
return {
# convolutions
nn.Conv1d: conv_flops_counter_hook,
nn.Conv2d: conv_flops_counter_hook,
Conv2d: conv_flops_counter_hook,
mmcv.cnn.bricks.Conv2d: conv_flops_counter_hook,
nn.Conv3d: conv_flops_counter_hook,
Conv3d: conv_flops_counter_hook,
mmcv.cnn.bricks.Conv3d: conv_flops_counter_hook,
# activations
nn.ReLU: relu_flops_counter_hook,
nn.PReLU: relu_flops_counter_hook,
@@ -574,9 +569,9 @@ def get_modules_mapping() -> Dict:
nn.AvgPool1d: pool_flops_counter_hook,
nn.AvgPool2d: pool_flops_counter_hook,
nn.MaxPool2d: pool_flops_counter_hook,
MaxPool2d: pool_flops_counter_hook,
mmcv.cnn.bricks.MaxPool2d: pool_flops_counter_hook,
nn.MaxPool3d: pool_flops_counter_hook,
MaxPool3d: pool_flops_counter_hook,
mmcv.cnn.bricks.MaxPool3d: pool_flops_counter_hook,
nn.AvgPool3d: pool_flops_counter_hook,
nn.AdaptiveMaxPool1d: pool_flops_counter_hook,
nn.AdaptiveAvgPool1d: pool_flops_counter_hook,
@@ -595,10 +590,10 @@ def get_modules_mapping() -> Dict:
nn.LayerNorm: norm_flops_counter_hook,
# FC
nn.Linear: linear_flops_counter_hook,
Linear: linear_flops_counter_hook,
mmcv.cnn.bricks.Linear: linear_flops_counter_hook,
# Upscale
nn.Upsample: upsample_flops_counter_hook,
# Deconvolution
nn.ConvTranspose2d: deconv_flops_counter_hook,
ConvTranspose2d: deconv_flops_counter_hook,
mmcv.cnn.bricks.ConvTranspose2d: deconv_flops_counter_hook,
}
@@ -3,7 +3,7 @@ import torch
import torch.nn as nn
def _fuse_conv_bn(conv: nn.Module, bn: nn.Module) -> nn.Module:
def _fuse_conv_bn(conv, bn):
"""Fuse conv and bn into one module.
Args:
@@ -24,7 +24,7 @@ def _fuse_conv_bn(conv: nn.Module, bn: nn.Module) -> nn.Module:
return conv
def fuse_conv_bn(module: nn.Module) -> nn.Module:
def fuse_conv_bn(module):
"""Recursively fuse conv and bn in a module.
During inference, the functionary of batch norm layers is turned off
......
import torch
import mmcv
class _BatchNormXd(torch.nn.modules.batchnorm._BatchNorm):
"""A general BatchNorm layer without input dimension check.
Reproduced from @kapily's work:
(https://github.com/pytorch/pytorch/issues/41081#issuecomment-783961547)
The only difference between BatchNorm1d, BatchNorm2d, BatchNorm3d, etc
is `_check_input_dim` that is designed for tensor sanity checks.
The check has been bypassed in this class for the convenience of converting
SyncBatchNorm.
"""
def _check_input_dim(self, input):
return
def revert_sync_batchnorm(module):
"""Helper function to convert all `SyncBatchNorm` (SyncBN) and
`mmcv.ops.sync_bn.SyncBatchNorm` (MMSyncBN) layers in the model to
`BatchNormXd` layers.
Adapted from @kapily's work:
(https://github.com/pytorch/pytorch/issues/41081#issuecomment-783961547)
Args:
module (nn.Module): The module containing `SyncBatchNorm` layers.
Returns:
module_output: The converted module with `BatchNormXd` layers.
"""
module_output = module
module_checklist = [torch.nn.modules.batchnorm.SyncBatchNorm]
if hasattr(mmcv, 'ops'):
module_checklist.append(mmcv.ops.SyncBatchNorm)
if isinstance(module, tuple(module_checklist)):
module_output = _BatchNormXd(module.num_features, module.eps,
module.momentum, module.affine,
module.track_running_stats)
if module.affine:
# no_grad() may not be needed here but
# just to be consistent with `convert_sync_batchnorm()`
with torch.no_grad():
module_output.weight = module.weight
module_output.bias = module.bias
module_output.running_mean = module.running_mean
module_output.running_var = module.running_var
module_output.num_batches_tracked = module.num_batches_tracked
module_output.training = module.training
# qconfig exists in quantized models
if hasattr(module, 'qconfig'):
module_output.qconfig = module.qconfig
for name, child in module.named_children():
module_output.add_module(name, revert_sync_batchnorm(child))
del module
return module_output
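# A hedged usage sketch: convert a SyncBN model so it can run on a single
# GPU or CPU (build_model is a hypothetical builder, not defined here):
# >>> model = build_model(cfg)
# >>> model = revert_sync_batchnorm(model)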
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import math
import warnings
import numpy as np
import torch
import torch.nn as nn
from torch import Tensor
from mmcv.utils import Registry, build_from_cfg, get_logger, print_log
INITIALIZERS = Registry('initializer')
def update_init_info(module, init_info):
"""Update the `_params_init_info` in the module if the value of parameters
are changed.
Args:
module (obj:`nn.Module`): The module of PyTorch with a user-defined
attribute `_params_init_info` which records the initialization
information.
init_info (str): The string that describes the initialization.
"""
assert hasattr(
module,
'_params_init_info'), f'Can not find `_params_init_info` in {module}'
for name, param in module.named_parameters():
assert param in module._params_init_info, (
f'Find a new :obj:`Parameter` '
f'named `{name}` during executing the '
f'`init_weights` of '
f'`{module.__class__.__name__}`. '
f'Please do not add or '
f'replace parameters during executing '
f'the `init_weights`. ')
# The parameter has been changed during executing the
# `init_weights` of module
mean_value = param.data.mean()
if module._params_init_info[param]['tmp_mean_value'] != mean_value:
module._params_init_info[param]['init_info'] = init_info
module._params_init_info[param]['tmp_mean_value'] = mean_value
def constant_init(module, val, bias=0):
if hasattr(module, 'weight') and module.weight is not None:
nn.init.constant_(module.weight, val)
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def xavier_init(module, gain=1, bias=0, distribution='normal'):
assert distribution in ['uniform', 'normal']
if hasattr(module, 'weight') and module.weight is not None:
if distribution == 'uniform':
nn.init.xavier_uniform_(module.weight, gain=gain)
else:
nn.init.xavier_normal_(module.weight, gain=gain)
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def normal_init(module, mean=0, std=1, bias=0):
if hasattr(module, 'weight') and module.weight is not None:
nn.init.normal_(module.weight, mean, std)
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def trunc_normal_init(module: nn.Module,
mean: float = 0,
std: float = 1,
a: float = -2,
b: float = 2,
bias: float = 0) -> None:
if hasattr(module, 'weight') and module.weight is not None:
trunc_normal_(module.weight, mean, std, a, b) # type: ignore
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias) # type: ignore
def uniform_init(module, a=0, b=1, bias=0):
if hasattr(module, 'weight') and module.weight is not None:
nn.init.uniform_(module.weight, a, b)
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def kaiming_init(module,
a=0,
mode='fan_out',
nonlinearity='relu',
bias=0,
distribution='normal'):
assert distribution in ['uniform', 'normal']
if hasattr(module, 'weight') and module.weight is not None:
if distribution == 'uniform':
nn.init.kaiming_uniform_(
module.weight, a=a, mode=mode, nonlinearity=nonlinearity)
else:
nn.init.kaiming_normal_(
module.weight, a=a, mode=mode, nonlinearity=nonlinearity)
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def caffe2_xavier_init(module, bias=0):
# `XavierFill` in Caffe2 corresponds to `kaiming_uniform_` in PyTorch
# Acknowledgment to FAIR's internal code
kaiming_init(
module,
a=1,
mode='fan_in',
nonlinearity='leaky_relu',
bias=bias,
distribution='uniform')
def bias_init_with_prob(prior_prob):
"""initialize conv/fc bias value according to a given probability value."""
bias_init = float(-np.log((1 - prior_prob) / prior_prob))
return bias_init
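# Worked example: for a focal-loss style prior of 0.01,
# bias_init_with_prob(0.01) = -log((1 - 0.01) / 0.01) = -log(99) ≈ -4.595,
# so the layer initially predicts positives with probability ~0.01.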
def _get_bases_name(m):
return [b.__name__ for b in m.__class__.__bases__]
class BaseInit(object):
def __init__(self, *, bias=0, bias_prob=None, layer=None):
self.wholemodule = False
if not isinstance(bias, (int, float)):
raise TypeError(f'bias must be a number, but got a {type(bias)}')
if bias_prob is not None:
if not isinstance(bias_prob, float):
raise TypeError(f'bias_prob type must be float, \
but got {type(bias_prob)}')
if layer is not None:
if not isinstance(layer, (str, list)):
raise TypeError(f'layer must be a str or a list of str, \
but got a {type(layer)}')
else:
layer = []
if bias_prob is not None:
self.bias = bias_init_with_prob(bias_prob)
else:
self.bias = bias
self.layer = [layer] if isinstance(layer, str) else layer
def _get_init_info(self):
info = f'{self.__class__.__name__}, bias={self.bias}'
return info
@INITIALIZERS.register_module(name='Constant')
class ConstantInit(BaseInit):
"""Initialize module parameters with constant values.
Args:
val (int | float): the value to fill the weights in the module with
bias (int | float): the value to fill the bias. Defaults to 0.
bias_prob (float, optional): the probability for bias initialization.
Defaults to None.
layer (str | list[str], optional): the layer will be initialized.
Defaults to None.
"""
def __init__(self, val, **kwargs):
super().__init__(**kwargs)
self.val = val
def __call__(self, module):
def init(m):
if self.wholemodule:
constant_init(m, self.val, self.bias)
else:
layername = m.__class__.__name__
basesname = _get_bases_name(m)
if len(set(self.layer) & set([layername] + basesname)):
constant_init(m, self.val, self.bias)
module.apply(init)
if hasattr(module, '_params_init_info'):
update_init_info(module, init_info=self._get_init_info())
def _get_init_info(self):
info = f'{self.__class__.__name__}: val={self.val}, bias={self.bias}'
return info
@INITIALIZERS.register_module(name='Xavier')
class XavierInit(BaseInit):
r"""Initialize module parameters with values according to the method
described in `Understanding the difficulty of training deep feedforward
neural networks - Glorot, X. & Bengio, Y. (2010).
<http://proceedings.mlr.press/v9/glorot10a/glorot10a.pdf>`_
Args:
gain (int | float): an optional scaling factor. Defaults to 1.
bias (int | float): the value to fill the bias. Defaults to 0.
bias_prob (float, optional): the probability for bias initialization.
Defaults to None.
distribution (str): distribution either be ``'normal'``
or ``'uniform'``. Defaults to ``'normal'``.
layer (str | list[str], optional): the layer will be initialized.
Defaults to None.
"""
def __init__(self, gain=1, distribution='normal', **kwargs):
super().__init__(**kwargs)
self.gain = gain
self.distribution = distribution
def __call__(self, module):
def init(m):
if self.wholemodule:
xavier_init(m, self.gain, self.bias, self.distribution)
else:
layername = m.__class__.__name__
basesname = _get_bases_name(m)
if len(set(self.layer) & set([layername] + basesname)):
xavier_init(m, self.gain, self.bias, self.distribution)
module.apply(init)
if hasattr(module, '_params_init_info'):
update_init_info(module, init_info=self._get_init_info())
def _get_init_info(self):
info = f'{self.__class__.__name__}: gain={self.gain}, ' \
f'distribution={self.distribution}, bias={self.bias}'
return info
@INITIALIZERS.register_module(name='Normal')
class NormalInit(BaseInit):
r"""Initialize module parameters with the values drawn from the normal
distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)`.
Args:
mean (int | float):the mean of the normal distribution. Defaults to 0.
std (int | float): the standard deviation of the normal distribution.
Defaults to 1.
bias (int | float): the value to fill the bias. Defaults to 0.
bias_prob (float, optional): the probability for bias initialization.
Defaults to None.
layer (str | list[str], optional): the layer will be initialized.
Defaults to None.
"""
def __init__(self, mean=0, std=1, **kwargs):
super().__init__(**kwargs)
self.mean = mean
self.std = std
def __call__(self, module):
def init(m):
if self.wholemodule:
normal_init(m, self.mean, self.std, self.bias)
else:
layername = m.__class__.__name__
basesname = _get_bases_name(m)
if len(set(self.layer) & set([layername] + basesname)):
normal_init(m, self.mean, self.std, self.bias)
module.apply(init)
if hasattr(module, '_params_init_info'):
update_init_info(module, init_info=self._get_init_info())
def _get_init_info(self):
info = f'{self.__class__.__name__}: mean={self.mean},' \
f' std={self.std}, bias={self.bias}'
return info
@INITIALIZERS.register_module(name='TruncNormal')
class TruncNormalInit(BaseInit):
r"""Initialize module parameters with the values drawn from the normal
distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)` with values
outside :math:`[a, b]`.
Args:
mean (float): the mean of the normal distribution. Defaults to 0.
std (float): the standard deviation of the normal distribution.
Defaults to 1.
a (float): The minimum cutoff value.
b (float): The maximum cutoff value.
bias (float): the value to fill the bias. Defaults to 0.
bias_prob (float, optional): the probability for bias initialization.
Defaults to None.
layer (str | list[str], optional): the layer will be initialized.
Defaults to None.
"""
def __init__(self,
mean: float = 0,
std: float = 1,
a: float = -2,
b: float = 2,
**kwargs) -> None:
super().__init__(**kwargs)
self.mean = mean
self.std = std
self.a = a
self.b = b
def __call__(self, module: nn.Module) -> None:
def init(m):
if self.wholemodule:
trunc_normal_init(m, self.mean, self.std, self.a, self.b,
self.bias)
else:
layername = m.__class__.__name__
basesname = _get_bases_name(m)
if len(set(self.layer) & set([layername] + basesname)):
trunc_normal_init(m, self.mean, self.std, self.a, self.b,
self.bias)
module.apply(init)
if hasattr(module, '_params_init_info'):
update_init_info(module, init_info=self._get_init_info())
def _get_init_info(self):
info = f'{self.__class__.__name__}: a={self.a}, b={self.b},' \
f' mean={self.mean}, std={self.std}, bias={self.bias}'
return info
@INITIALIZERS.register_module(name='Uniform')
class UniformInit(BaseInit):
r"""Initialize module parameters with values drawn from the uniform
distribution :math:`\mathcal{U}(a, b)`.
Args:
a (int | float): the lower bound of the uniform distribution.
Defaults to 0.
b (int | float): the upper bound of the uniform distribution.
Defaults to 1.
bias (int | float): the value to fill the bias. Defaults to 0.
bias_prob (float, optional): the probability for bias initialization.
Defaults to None.
layer (str | list[str], optional): the layer will be initialized.
Defaults to None.
"""
def __init__(self, a=0, b=1, **kwargs):
super().__init__(**kwargs)
self.a = a
self.b = b
def __call__(self, module):
def init(m):
if self.wholemodule:
uniform_init(m, self.a, self.b, self.bias)
else:
layername = m.__class__.__name__
basesname = _get_bases_name(m)
if len(set(self.layer) & set([layername] + basesname)):
uniform_init(m, self.a, self.b, self.bias)
module.apply(init)
if hasattr(module, '_params_init_info'):
update_init_info(module, init_info=self._get_init_info())
def _get_init_info(self):
info = f'{self.__class__.__name__}: a={self.a},' \
f' b={self.b}, bias={self.bias}'
return info
@INITIALIZERS.register_module(name='Kaiming')
class KaimingInit(BaseInit):
r"""Initialize module parameters with the values according to the method
described in `Delving deep into rectifiers: Surpassing human-level
performance on ImageNet classification - He, K. et al. (2015).
<https://www.cv-foundation.org/openaccess/content_iccv_2015/
papers/He_Delving_Deep_into_ICCV_2015_paper.pdf>`_
Args:
a (int | float): the negative slope of the rectifier used after this
layer (only used with ``'leaky_relu'``). Defaults to 0.
mode (str): either ``'fan_in'`` or ``'fan_out'``. Choosing
``'fan_in'`` preserves the magnitude of the variance of the weights
in the forward pass. Choosing ``'fan_out'`` preserves the
magnitudes in the backwards pass. Defaults to ``'fan_out'``.
nonlinearity (str): the non-linear function (`nn.functional` name),
recommended to use only with ``'relu'`` or ``'leaky_relu'`` .
Defaults to 'relu'.
bias (int | float): the value to fill the bias. Defaults to 0.
bias_prob (float, optional): the probability for bias initialization.
Defaults to None.
distribution (str): distribution either be ``'normal'`` or
``'uniform'``. Defaults to ``'normal'``.
layer (str | list[str], optional): the layer will be initialized.
Defaults to None.
"""
def __init__(self,
a=0,
mode='fan_out',
nonlinearity='relu',
distribution='normal',
**kwargs):
super().__init__(**kwargs)
self.a = a
self.mode = mode
self.nonlinearity = nonlinearity
self.distribution = distribution
def __call__(self, module):
def init(m):
if self.wholemodule:
kaiming_init(m, self.a, self.mode, self.nonlinearity,
self.bias, self.distribution)
else:
layername = m.__class__.__name__
basesname = _get_bases_name(m)
if len(set(self.layer) & set([layername] + basesname)):
kaiming_init(m, self.a, self.mode, self.nonlinearity,
self.bias, self.distribution)
module.apply(init)
if hasattr(module, '_params_init_info'):
update_init_info(module, init_info=self._get_init_info())
def _get_init_info(self):
info = f'{self.__class__.__name__}: a={self.a}, mode={self.mode}, ' \
f'nonlinearity={self.nonlinearity}, ' \
f'distribution ={self.distribution}, bias={self.bias}'
return info
@INITIALIZERS.register_module(name='Caffe2Xavier')
class Caffe2XavierInit(KaimingInit):
# `XavierFill` in Caffe2 corresponds to `kaiming_uniform_` in PyTorch
# Acknowledgment to FAIR's internal code
def __init__(self, **kwargs):
super().__init__(
a=1,
mode='fan_in',
nonlinearity='leaky_relu',
distribution='uniform',
**kwargs)
def __call__(self, module):
super().__call__(module)
@INITIALIZERS.register_module(name='Pretrained')
class PretrainedInit(object):
"""Initialize module by loading a pretrained model.
Args:
checkpoint (str): the checkpoint file of the pretrained model to
be loaded.
prefix (str, optional): the prefix of a sub-module in the pretrained
model. It is for loading a part of the pretrained model to
initialize. For example, if we would like to only load the
backbone of a detector model, we can set ``prefix='backbone.'``.
Defaults to None.
map_location (str): map tensors into proper locations.
"""
def __init__(self, checkpoint, prefix=None, map_location=None):
self.checkpoint = checkpoint
self.prefix = prefix
self.map_location = map_location
def __call__(self, module):
from mmcv.runner import (_load_checkpoint_with_prefix, load_checkpoint,
load_state_dict)
logger = get_logger('mmcv')
if self.prefix is None:
print_log(f'load model from: {self.checkpoint}', logger=logger)
load_checkpoint(
module,
self.checkpoint,
map_location=self.map_location,
strict=False,
logger=logger)
else:
print_log(
f'load {self.prefix} in model from: {self.checkpoint}',
logger=logger)
state_dict = _load_checkpoint_with_prefix(
self.prefix, self.checkpoint, map_location=self.map_location)
load_state_dict(module, state_dict, strict=False, logger=logger)
if hasattr(module, '_params_init_info'):
update_init_info(module, init_info=self._get_init_info())
def _get_init_info(self):
info = f'{self.__class__.__name__}: load from {self.checkpoint}'
return info
def _initialize(module, cfg, wholemodule=False):
func = build_from_cfg(cfg, INITIALIZERS)
# wholemodule flag is for override mode, there is no layer key in override
# and initializer will give init values for the whole module with the name
# in override.
func.wholemodule = wholemodule
func(module)
def _initialize_override(module, override, cfg):
if not isinstance(override, (dict, list)):
raise TypeError(f'override must be a dict or a list of dict, \
but got {type(override)}')
override = [override] if isinstance(override, dict) else override
for override_ in override:
cp_override = copy.deepcopy(override_)
name = cp_override.pop('name', None)
if name is None:
raise ValueError('`override` must contain the key "name", '
f'but got {cp_override}')
# if override only has name key, it means use args in init_cfg
if not cp_override:
cp_override.update(cfg)
# if override has name key and other args except type key, it will
# raise error
elif 'type' not in cp_override.keys():
raise ValueError(
f'`override` need "type" key, but got {cp_override}')
if hasattr(module, name):
_initialize(getattr(module, name), cp_override, wholemodule=True)
else:
raise RuntimeError(f'module did not have attribute {name}, '
f'but init_cfg is {cp_override}.')
def initialize(module, init_cfg):
"""Initialize a module.
Args:
module (``torch.nn.Module``): the module will be initialized.
init_cfg (dict | list[dict]): initialization configuration dict to
define initializer. OpenMMLab has implemented 6 initializers
including ``Constant``, ``Xavier``, ``Normal``, ``Uniform``,
``Kaiming``, and ``Pretrained``.
Example:
>>> module = nn.Linear(2, 3, bias=True)
>>> init_cfg = dict(type='Constant', layer='Linear', val=1, bias=2)
>>> initialize(module, init_cfg)
>>> module = nn.Sequential(nn.Conv1d(3, 1, 3), nn.Linear(1,2))
>>> # define key ``'layer'`` for initializing layer with different
>>> # configuration
>>> init_cfg = [dict(type='Constant', layer='Conv1d', val=1),
dict(type='Constant', layer='Linear', val=2)]
>>> initialize(module, init_cfg)
>>> # define key``'override'`` to initialize some specific part in
>>> # module
>>> class FooNet(nn.Module):
>>> def __init__(self):
>>> super().__init__()
>>> self.feat = nn.Conv2d(3, 16, 3)
>>> self.reg = nn.Conv2d(16, 10, 3)
>>> self.cls = nn.Conv2d(16, 5, 3)
>>> model = FooNet()
>>> init_cfg = dict(type='Constant', val=1, bias=2, layer='Conv2d',
>>> override=dict(type='Constant', name='reg', val=3, bias=4))
>>> initialize(model, init_cfg)
>>> model = ResNet(depth=50)
>>> # Initialize weights with the pretrained model.
>>> init_cfg = dict(type='Pretrained',
checkpoint='torchvision://resnet50')
>>> initialize(model, init_cfg)
>>> # Initialize weights of a sub-module with the specific part of
>>> # a pretrained model by using "prefix".
>>> url = 'http://download.openmmlab.com/mmdetection/v2.0/retinanet/'\
>>> 'retinanet_r50_fpn_1x_coco/'\
>>> 'retinanet_r50_fpn_1x_coco_20200130-c2398f9e.pth'
>>> init_cfg = dict(type='Pretrained',
checkpoint=url, prefix='backbone.')
"""
if not isinstance(init_cfg, (dict, list)):
raise TypeError(f'init_cfg must be a dict or a list of dict, \
but got {type(init_cfg)}')
if isinstance(init_cfg, dict):
init_cfg = [init_cfg]
for cfg in init_cfg:
# should deeply copy the original config because cfg may be used by
# other modules, e.g., one init_cfg shared by multiple bottleneck
# blocks, the expected cfg will be changed after pop and will change
# the initialization behavior of other modules
cp_cfg = copy.deepcopy(cfg)
override = cp_cfg.pop('override', None)
_initialize(module, cp_cfg)
if override is not None:
cp_cfg.pop('layer', None)
_initialize_override(module, override, cp_cfg)
else:
# All attributes in module have same initialization.
pass
def _no_grad_trunc_normal_(tensor: Tensor, mean: float, std: float, a: float,
b: float) -> Tensor:
# Method based on
# https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
# Modified from
# https://github.com/pytorch/pytorch/blob/master/torch/nn/init.py
def norm_cdf(x):
# Computes standard normal cumulative distribution function
return (1. + math.erf(x / math.sqrt(2.))) / 2.
if (mean < a - 2 * std) or (mean > b + 2 * std):
warnings.warn(
'mean is more than 2 std from [a, b] in nn.init.trunc_normal_. '
'The distribution of values may be incorrect.',
stacklevel=2)
with torch.no_grad():
# Values are generated by using a truncated uniform distribution and
# then using the inverse CDF for the normal distribution.
# Get upper and lower cdf values
lower = norm_cdf((a - mean) / std)
upper = norm_cdf((b - mean) / std)
# Uniformly fill tensor with values from [lower, upper], then translate
# to [2lower-1, 2upper-1].
tensor.uniform_(2 * lower - 1, 2 * upper - 1)
# Use inverse cdf transform for normal distribution to get truncated
# standard normal
tensor.erfinv_()
# Transform to proper mean, std
tensor.mul_(std * math.sqrt(2.))
tensor.add_(mean)
# Clamp to ensure it's in the proper range
tensor.clamp_(min=a, max=b)
return tensor
def trunc_normal_(tensor: Tensor,
mean: float = 0.,
std: float = 1.,
a: float = -2.,
b: float = 2.) -> Tensor:
r"""Fills the input Tensor with values drawn from a truncated
normal distribution. The values are effectively drawn from the
normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)`
with values outside :math:`[a, b]` redrawn until they are within
the bounds. The method used for generating the random values works
best when :math:`a \leq \text{mean} \leq b`.
Modified from
https://github.com/pytorch/pytorch/blob/master/torch/nn/init.py
Args:
tensor (``torch.Tensor``): an n-dimensional `torch.Tensor`.
mean (float): the mean of the normal distribution.
std (float): the standard deviation of the normal distribution.
a (float): the minimum cutoff value.
b (float): the maximum cutoff value.
"""
return _no_grad_trunc_normal_(tensor, mean, std, a, b)
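# A hedged usage sketch:
# >>> import torch
# >>> w = torch.empty(3, 5)
# >>> trunc_normal_(w, mean=0., std=0.02, a=-0.04, b=0.04)
# All values of w now lie in [-0.04, 0.04], drawn from N(0, 0.02^2).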
# Copyright (c) OpenMMLab. All rights reserved.
import logging
from typing import List, Optional, Sequence, Tuple, Union
import torch.nn as nn
from mmengine.model import constant_init, kaiming_init, normal_init
from mmengine.runner import load_checkpoint
from torch import Tensor
from .utils import constant_init, kaiming_init, normal_init
def conv3x3(in_planes: int, out_planes: int, dilation: int = 1) -> nn.Module:
def conv3x3(in_planes, out_planes, dilation=1):
"""3x3 convolution with padding."""
return nn.Conv2d(
in_planes,
@@ -18,12 +16,12 @@ def conv3x3(in_planes: int, out_planes: int, dilation: int = 1) -> nn.Module:
dilation=dilation)
def make_vgg_layer(inplanes: int,
planes: int,
num_blocks: int,
dilation: int = 1,
with_bn: bool = False,
ceil_mode: bool = False) -> List[nn.Module]:
def make_vgg_layer(inplanes,
planes,
num_blocks,
dilation=1,
with_bn=False,
ceil_mode=False):
layers = []
for _ in range(num_blocks):
layers.append(conv3x3(inplanes, planes, dilation))
@@ -61,18 +59,18 @@ class VGG(nn.Module):
}
def __init__(self,
depth: int,
with_bn: bool = False,
num_classes: int = -1,
num_stages: int = 5,
dilations: Sequence[int] = (1, 1, 1, 1, 1),
out_indices: Sequence[int] = (0, 1, 2, 3, 4),
frozen_stages: int = -1,
bn_eval: bool = True,
bn_frozen: bool = False,
ceil_mode: bool = False,
with_last_pool: bool = True):
super().__init__()
depth,
with_bn=False,
num_classes=-1,
num_stages=5,
dilations=(1, 1, 1, 1, 1),
out_indices=(0, 1, 2, 3, 4),
frozen_stages=-1,
bn_eval=True,
bn_frozen=False,
ceil_mode=False,
with_last_pool=True):
super(VGG, self).__init__()
if depth not in self.arch_settings:
raise KeyError(f'invalid depth {depth} for vgg')
assert num_stages >= 1 and num_stages <= 5
@@ -124,9 +122,10 @@ class VGG(nn.Module):
nn.Linear(4096, num_classes),
)
def init_weights(self, pretrained: Optional[str] = None) -> None:
def init_weights(self, pretrained=None):
if isinstance(pretrained, str):
logger = logging.getLogger()
from ..runner import load_checkpoint
load_checkpoint(self, pretrained, strict=False, logger=logger)
elif pretrained is None:
for m in self.modules():
@@ -139,7 +138,7 @@ class VGG(nn.Module):
else:
raise TypeError('pretrained must be a str or None')
def forward(self, x: Tensor) -> Union[Tensor, Tuple[Tensor, ...]]:
def forward(self, x):
outs = []
vgg_layers = getattr(self, self.module_name)
for i in range(len(self.stage_blocks)):
@@ -157,8 +156,8 @@ class VGG(nn.Module):
else:
return tuple(outs)
def train(self, mode: bool = True) -> None:
super().train(mode)
def train(self, mode=True):
super(VGG, self).train(mode)
if self.bn_eval:
for m in self.modules():
if isinstance(m, nn.BatchNorm2d):
......
# Copyright (c) OpenMMLab. All rights reserved.
from .test import (collect_results_cpu, collect_results_gpu, multi_gpu_test,
single_gpu_test)
__all__ = [
'collect_results_cpu', 'collect_results_gpu', 'multi_gpu_test',
'single_gpu_test'
]
# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import pickle
import shutil
import tempfile
import time
import torch
import torch.distributed as dist
import mmcv
from mmcv.runner import get_dist_info
def single_gpu_test(model, data_loader):
"""Test model with a single gpu.
This method tests model with a single gpu and displays test progress bar.
Args:
model (nn.Module): Model to be tested.
data_loader (nn.Dataloader): Pytorch data loader.
Returns:
list: The prediction results.
"""
model.eval()
results = []
dataset = data_loader.dataset
prog_bar = mmcv.ProgressBar(len(dataset))
for data in data_loader:
with torch.no_grad():
result = model(return_loss=False, **data)
results.extend(result)
# Assume result has the same length as batch_size
# refer to https://github.com/open-mmlab/mmcv/issues/985
batch_size = len(result)
for _ in range(batch_size):
prog_bar.update()
return results
def multi_gpu_test(model, data_loader, tmpdir=None, gpu_collect=False):
"""Test model with multiple gpus.
This method tests model with multiple gpus and collects the results
under two different modes: gpu and cpu modes. By setting
``gpu_collect=True``, it encodes results to gpu tensors and use gpu
communication for results collection. On cpu mode it saves the results on
different gpus to ``tmpdir`` and collects them by the rank 0 worker.
Args:
model (nn.Module): Model to be tested.
data_loader (nn.Dataloader): Pytorch data loader.
tmpdir (str): Path of directory to save the temporary results from
different gpus under cpu mode.
gpu_collect (bool): Option to use either gpu or cpu to collect results.
Returns:
list: The prediction results.
"""
model.eval()
results = []
dataset = data_loader.dataset
rank, world_size = get_dist_info()
if rank == 0:
prog_bar = mmcv.ProgressBar(len(dataset))
time.sleep(2) # This line can prevent deadlock problem in some cases.
for i, data in enumerate(data_loader):
with torch.no_grad():
result = model(return_loss=False, **data)
results.extend(result)
if rank == 0:
batch_size = len(result)
batch_size_all = batch_size * world_size
if batch_size_all + prog_bar.completed > len(dataset):
batch_size_all = len(dataset) - prog_bar.completed
for _ in range(batch_size_all):
prog_bar.update()
# collect results from all ranks
if gpu_collect:
results = collect_results_gpu(results, len(dataset))
else:
results = collect_results_cpu(results, len(dataset), tmpdir)
return results
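# A hedged usage sketch under torch.distributed (build_dataloader and the
# wrapped model are assumptions, not defined in this file):
# >>> model = MMDistributedDataParallel(model, device_ids=[rank])
# >>> results = multi_gpu_test(model, data_loader, tmpdir='.dist_test')
# Rank 0 receives the ordered result list; other ranks receive None.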
def collect_results_cpu(result_part, size, tmpdir=None):
"""Collect results under cpu mode.
On cpu mode, this function will save the results on different gpus to
``tmpdir`` and collect them by the rank 0 worker.
Args:
result_part (list): Result list containing result parts
to be collected.
size (int): Size of the results, commonly equal to length of
the results.
tmpdir (str | None): temporary directory for collected results to
store. If set to None, it will create a random temporary directory
for it.
Returns:
list: The collected results.
"""
rank, world_size = get_dist_info()
# create a tmp dir if it is not specified
if tmpdir is None:
MAX_LEN = 512
# 32 is whitespace
dir_tensor = torch.full((MAX_LEN, ),
32,
dtype=torch.uint8,
device='cuda')
if rank == 0:
mmcv.mkdir_or_exist('.dist_test')
tmpdir = tempfile.mkdtemp(dir='.dist_test')
tmpdir = torch.tensor(
bytearray(tmpdir.encode()), dtype=torch.uint8, device='cuda')
dir_tensor[:len(tmpdir)] = tmpdir
dist.broadcast(dir_tensor, 0)
tmpdir = dir_tensor.cpu().numpy().tobytes().decode().rstrip()
else:
mmcv.mkdir_or_exist(tmpdir)
# dump the part result to the dir
mmcv.dump(result_part, osp.join(tmpdir, f'part_{rank}.pkl'))
dist.barrier()
# collect all parts
if rank != 0:
return None
else:
# load results of all parts from tmp dir
part_list = []
for i in range(world_size):
part_file = osp.join(tmpdir, f'part_{i}.pkl')
part_result = mmcv.load(part_file)
# When data is severely insufficient, an empty part_result
on a certain gpu could make the overall outputs empty.
if part_result:
part_list.append(part_result)
# sort the results
ordered_results = []
for res in zip(*part_list):
ordered_results.extend(list(res))
# the dataloader may pad some samples
ordered_results = ordered_results[:size]
# remove tmp dir
shutil.rmtree(tmpdir)
return ordered_results
def collect_results_gpu(result_part, size):
"""Collect results under gpu mode.
On gpu mode, this function will encode results to gpu tensors and use gpu
communication for results collection.
Args:
result_part (list): Result list containing result parts
to be collected.
size (int): Size of the results, commonly equal to length of
the results.
Returns:
list: The collected results.
"""
rank, world_size = get_dist_info()
# dump result part to tensor with pickle
part_tensor = torch.tensor(
bytearray(pickle.dumps(result_part)), dtype=torch.uint8, device='cuda')
# gather all result part tensor shape
shape_tensor = torch.tensor(part_tensor.shape, device='cuda')
shape_list = [shape_tensor.clone() for _ in range(world_size)]
dist.all_gather(shape_list, shape_tensor)
# padding result part tensor to max length
shape_max = torch.tensor(shape_list).max()
part_send = torch.zeros(shape_max, dtype=torch.uint8, device='cuda')
part_send[:shape_tensor[0]] = part_tensor
part_recv_list = [
part_tensor.new_zeros(shape_max) for _ in range(world_size)
]
# gather all result part
dist.all_gather(part_recv_list, part_send)
if rank == 0:
part_list = []
for recv, shape in zip(part_recv_list, shape_list):
part_result = pickle.loads(recv[:shape[0]].cpu().numpy().tobytes())
# When data is severely insufficient, an empty part_result
on a certain gpu could make the overall outputs empty.
if part_result:
part_list.append(part_result)
# sort the results
ordered_results = []
for res in zip(*part_list):
ordered_results.extend(list(res))
# the dataloader may pad some samples
ordered_results = ordered_results[:size]
return ordered_results
# Copyright (c) OpenMMLab. All rights reserved.
from .file_client import BaseStorageBackend, FileClient
from .handlers import BaseFileHandler, JsonHandler, PickleHandler, YamlHandler
from .io import dump, load, register_handler
from .parse import dict_from_file, list_from_file
__all__ = [
'BaseStorageBackend', 'FileClient', 'load', 'dump', 'register_handler',
'BaseFileHandler', 'JsonHandler', 'PickleHandler', 'YamlHandler',
'list_from_file', 'dict_from_file'
]
# Copyright (c) OpenMMLab. All rights reserved.
import inspect
import os
import os.path as osp
import re
import tempfile
import warnings
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from pathlib import Path
from typing import Iterable, Iterator, Optional, Tuple, Union
from urllib.request import urlopen
import mmcv
from mmcv.utils.misc import has_method
from mmcv.utils.path import is_filepath
class BaseStorageBackend(metaclass=ABCMeta):
"""Abstract class of storage backends.
    All backends need to implement two APIs: ``get()`` and ``get_text()``.
    ``get()`` reads the file as a byte stream and ``get_text()`` reads the file
    as text.
"""
# a flag to indicate whether the backend can create a symlink for a file
_allow_symlink = False
@property
def name(self):
return self.__class__.__name__
@property
def allow_symlink(self):
return self._allow_symlink
@abstractmethod
def get(self, filepath):
pass
@abstractmethod
def get_text(self, filepath):
pass
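# Hedged sketch: a minimal custom backend. Only ``get`` and ``get_text`` must
# be implemented to satisfy ``BaseStorageBackend``; the class name and the
# in-memory store below are illustrative and not part of mmcv.
class InMemoryBackend(BaseStorageBackend):

    def __init__(self, store=None):
        # maps a filepath string to its content in bytes
        self._store = store if store is not None else {}

    def get(self, filepath):
        return self._store[str(filepath)]

    def get_text(self, filepath, encoding='utf-8'):
        return self._store[str(filepath)].decode(encoding)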
class CephBackend(BaseStorageBackend):
"""Ceph storage backend (for internal use).
Args:
path_mapping (dict|None): path mapping dict from local path to Petrel
path. When ``path_mapping={'src': 'dst'}``, ``src`` in ``filepath``
will be replaced by ``dst``. Default: None.
.. warning::
:class:`mmcv.fileio.file_client.CephBackend` will be deprecated,
please use :class:`mmcv.fileio.file_client.PetrelBackend` instead.
"""
def __init__(self, path_mapping=None):
try:
import ceph
except ImportError:
raise ImportError('Please install ceph to enable CephBackend.')
warnings.warn(
'CephBackend will be deprecated, please use PetrelBackend instead')
self._client = ceph.S3Client()
assert isinstance(path_mapping, dict) or path_mapping is None
self.path_mapping = path_mapping
def get(self, filepath):
filepath = str(filepath)
if self.path_mapping is not None:
for k, v in self.path_mapping.items():
filepath = filepath.replace(k, v)
value = self._client.Get(filepath)
value_buf = memoryview(value)
return value_buf
def get_text(self, filepath, encoding=None):
raise NotImplementedError
class PetrelBackend(BaseStorageBackend):
"""Petrel storage backend (for internal use).
PetrelBackend supports reading and writing data to multiple clusters.
If the file path contains the cluster name, PetrelBackend will read data
from specified cluster or write data to it. Otherwise, PetrelBackend will
access the default cluster.
Args:
path_mapping (dict, optional): Path mapping dict from local path to
Petrel path. When ``path_mapping={'src': 'dst'}``, ``src`` in
``filepath`` will be replaced by ``dst``. Default: None.
enable_mc (bool, optional): Whether to enable memcached support.
Default: True.
Examples:
>>> filepath1 = 's3://path/of/file'
>>> filepath2 = 'cluster-name:s3://path/of/file'
>>> client = PetrelBackend()
>>> client.get(filepath1) # get data from default cluster
>>> client.get(filepath2) # get data from 'cluster-name' cluster
"""
def __init__(self,
path_mapping: Optional[dict] = None,
enable_mc: bool = True):
try:
from petrel_client import client
except ImportError:
raise ImportError('Please install petrel_client to enable '
'PetrelBackend.')
self._client = client.Client(enable_mc=enable_mc)
assert isinstance(path_mapping, dict) or path_mapping is None
self.path_mapping = path_mapping
def _map_path(self, filepath: Union[str, Path]) -> str:
"""Map ``filepath`` to a string path whose prefix will be replaced by
:attr:`self.path_mapping`.
Args:
filepath (str): Path to be mapped.
"""
filepath = str(filepath)
if self.path_mapping is not None:
for k, v in self.path_mapping.items():
filepath = filepath.replace(k, v)
return filepath
def _format_path(self, filepath: str) -> str:
"""Convert a ``filepath`` to standard format of petrel oss.
If the ``filepath`` is concatenated by ``os.path.join``, in a Windows
environment, the ``filepath`` will be the format of
's3://bucket_name\\image.jpg'. By invoking :meth:`_format_path`, the
above ``filepath`` will be converted to 's3://bucket_name/image.jpg'.
Args:
filepath (str): Path to be formatted.
"""
return re.sub(r'\\+', '/', filepath)
def get(self, filepath: Union[str, Path]) -> memoryview:
"""Read data from a given ``filepath`` with 'rb' mode.
Args:
filepath (str or Path): Path to read data.
Returns:
memoryview: A memory view of expected bytes object to avoid
copying. The memoryview object can be converted to bytes by
``value_buf.tobytes()``.
"""
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
value = self._client.Get(filepath)
value_buf = memoryview(value)
return value_buf
def get_text(self,
filepath: Union[str, Path],
encoding: str = 'utf-8') -> str:
"""Read data from a given ``filepath`` with 'r' mode.
Args:
filepath (str or Path): Path to read data.
encoding (str): The encoding format used to open the ``filepath``.
Default: 'utf-8'.
Returns:
str: Expected text reading from ``filepath``.
"""
return str(self.get(filepath), encoding=encoding)
def put(self, obj: bytes, filepath: Union[str, Path]) -> None:
"""Save data to a given ``filepath``.
Args:
obj (bytes): Data to be saved.
filepath (str or Path): Path to write data.
"""
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
self._client.put(filepath, obj)
def put_text(self,
obj: str,
filepath: Union[str, Path],
encoding: str = 'utf-8') -> None:
"""Save data to a given ``filepath``.
Args:
obj (str): Data to be written.
filepath (str or Path): Path to write data.
encoding (str): The encoding format used to encode the ``obj``.
Default: 'utf-8'.
"""
self.put(bytes(obj, encoding=encoding), filepath)
def remove(self, filepath: Union[str, Path]) -> None:
"""Remove a file.
Args:
filepath (str or Path): Path to be removed.
"""
if not has_method(self._client, 'delete'):
raise NotImplementedError(
('Current version of Petrel Python SDK has not supported '
'the `delete` method, please use a higher version or dev'
' branch instead.'))
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
self._client.delete(filepath)
def exists(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path exists.
Args:
filepath (str or Path): Path to be checked whether exists.
Returns:
bool: Return ``True`` if ``filepath`` exists, ``False`` otherwise.
"""
if not (has_method(self._client, 'contains')
and has_method(self._client, 'isdir')):
raise NotImplementedError(
('Current version of Petrel Python SDK has not supported '
'the `contains` and `isdir` methods, please use a higher'
'version or dev branch instead.'))
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
return self._client.contains(filepath) or self._client.isdir(filepath)
def isdir(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a directory.
Args:
filepath (str or Path): Path to be checked whether it is a
directory.
Returns:
bool: Return ``True`` if ``filepath`` points to a directory,
``False`` otherwise.
"""
if not has_method(self._client, 'isdir'):
raise NotImplementedError(
('Current version of Petrel Python SDK has not supported '
'the `isdir` method, please use a higher version or dev'
' branch instead.'))
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
return self._client.isdir(filepath)
def isfile(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a file.
Args:
filepath (str or Path): Path to be checked whether it is a file.
Returns:
bool: Return ``True`` if ``filepath`` points to a file, ``False``
otherwise.
"""
if not has_method(self._client, 'contains'):
raise NotImplementedError(
('Current version of Petrel Python SDK has not supported '
'the `contains` method, please use a higher version or '
'dev branch instead.'))
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
return self._client.contains(filepath)
def join_path(self, filepath: Union[str, Path],
*filepaths: Union[str, Path]) -> str:
"""Concatenate all file paths.
Args:
filepath (str or Path): Path to be concatenated.
Returns:
str: The result after concatenation.
"""
filepath = self._format_path(self._map_path(filepath))
if filepath.endswith('/'):
filepath = filepath[:-1]
formatted_paths = [filepath]
for path in filepaths:
formatted_paths.append(self._format_path(self._map_path(path)))
return '/'.join(formatted_paths)
@contextmanager
def get_local_path(self, filepath: Union[str, Path]) -> Iterable[str]:
"""Download a file from ``filepath`` and return a temporary path.
        ``get_local_path`` is decorated by :meth:`contextlib.contextmanager`.
        It can be called with a ``with`` statement, and when exiting the
        ``with`` statement, the temporary path will be released.
Args:
filepath (str | Path): Download a file from ``filepath``.
Examples:
>>> client = PetrelBackend()
            >>> # After exiting from the ``with`` clause,
>>> # the path will be removed
>>> with client.get_local_path('s3://path/of/your/file') as path:
... # do something here
Yields:
Iterable[str]: Only yield one temporary path.
"""
filepath = self._map_path(filepath)
filepath = self._format_path(filepath)
assert self.isfile(filepath)
try:
f = tempfile.NamedTemporaryFile(delete=False)
f.write(self.get(filepath))
f.close()
yield f.name
finally:
os.remove(f.name)
def list_dir_or_file(self,
dir_path: Union[str, Path],
list_dir: bool = True,
list_file: bool = True,
suffix: Optional[Union[str, Tuple[str]]] = None,
recursive: bool = False) -> Iterator[str]:
"""Scan a directory to find the interested directories or files in
arbitrary order.
Note:
Petrel has no concept of directories but it simulates the directory
hierarchy in the filesystem through public prefixes. In addition,
if the returned path ends with '/', it means the path is a public
prefix which is a logical directory.
Note:
:meth:`list_dir_or_file` returns the path relative to ``dir_path``.
            In addition, the returned directory paths will not contain the
            trailing '/', which is consistent with other backends.
Args:
dir_path (str | Path): Path of the directory.
list_dir (bool): List the directories. Default: True.
list_file (bool): List the path of files. Default: True.
suffix (str or tuple[str], optional): File suffix
that we are interested in. Default: None.
recursive (bool): If set to True, recursively scan the
directory. Default: False.
Yields:
Iterable[str]: A relative path to ``dir_path``.
"""
if not has_method(self._client, 'list'):
raise NotImplementedError(
('Current version of Petrel Python SDK has not supported '
'the `list` method, please use a higher version or dev'
' branch instead.'))
dir_path = self._map_path(dir_path)
dir_path = self._format_path(dir_path)
if list_dir and suffix is not None:
raise TypeError(
'`list_dir` should be False when `suffix` is not None')
if (suffix is not None) and not isinstance(suffix, (str, tuple)):
raise TypeError('`suffix` must be a string or tuple of strings')
# Petrel's simulated directory hierarchy assumes that directory paths
# should end with `/`
if not dir_path.endswith('/'):
dir_path += '/'
root = dir_path
def _list_dir_or_file(dir_path, list_dir, list_file, suffix,
recursive):
for path in self._client.list(dir_path):
# the `self.isdir` is not used here to determine whether path
# is a directory, because `self.isdir` relies on
# `self._client.list`
if path.endswith('/'): # a directory path
next_dir_path = self.join_path(dir_path, path)
if list_dir:
# get the relative path and exclude the last
# character '/'
rel_dir = next_dir_path[len(root):-1]
yield rel_dir
if recursive:
yield from _list_dir_or_file(next_dir_path, list_dir,
list_file, suffix,
recursive)
else: # a file path
absolute_path = self.join_path(dir_path, path)
rel_path = absolute_path[len(root):]
if (suffix is None
or rel_path.endswith(suffix)) and list_file:
yield rel_path
return _list_dir_or_file(dir_path, list_dir, list_file, suffix,
recursive)
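# Hedged sketch: iterating over the files on a Petrel bucket; the path and
# suffixes are illustrative. Note that ``list_dir`` must be False whenever a
# ``suffix`` filter is given.
#
# >>> client = PetrelBackend()
# >>> for rel_path in client.list_dir_or_file('s3://bucket/images',
# ...                                         list_dir=False,
# ...                                         suffix=('.jpg', '.png'),
# ...                                         recursive=True):
# ...     print(rel_path)  # path relative to 's3://bucket/images/'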
class MemcachedBackend(BaseStorageBackend):
"""Memcached storage backend.
Attributes:
server_list_cfg (str): Config file for memcached server list.
client_cfg (str): Config file for memcached client.
sys_path (str | None): Additional path to be appended to `sys.path`.
Default: None.
"""
def __init__(self, server_list_cfg, client_cfg, sys_path=None):
if sys_path is not None:
import sys
sys.path.append(sys_path)
try:
import mc
except ImportError:
raise ImportError(
'Please install memcached to enable MemcachedBackend.')
self.server_list_cfg = server_list_cfg
self.client_cfg = client_cfg
self._client = mc.MemcachedClient.GetInstance(self.server_list_cfg,
self.client_cfg)
        # mc.pyvector serves as a pointer to a memory cache
self._mc_buffer = mc.pyvector()
def get(self, filepath):
filepath = str(filepath)
import mc
self._client.Get(filepath, self._mc_buffer)
value_buf = mc.ConvertBuffer(self._mc_buffer)
return value_buf
def get_text(self, filepath, encoding=None):
raise NotImplementedError
class LmdbBackend(BaseStorageBackend):
"""Lmdb storage backend.
Args:
db_path (str): Lmdb database path.
readonly (bool, optional): Lmdb environment parameter. If True,
disallow any write operations. Default: True.
lock (bool, optional): Lmdb environment parameter. If False, when
concurrent access occurs, do not lock the database. Default: False.
readahead (bool, optional): Lmdb environment parameter. If False,
disable the OS filesystem readahead mechanism, which may improve
random read performance when a database is larger than RAM.
Default: False.
Attributes:
db_path (str): Lmdb database path.
"""
def __init__(self,
db_path,
readonly=True,
lock=False,
readahead=False,
**kwargs):
try:
import lmdb
except ImportError:
raise ImportError('Please install lmdb to enable LmdbBackend.')
self.db_path = str(db_path)
self._client = lmdb.open(
self.db_path,
readonly=readonly,
lock=lock,
readahead=readahead,
**kwargs)
def get(self, filepath):
"""Get values according to the filepath.
Args:
filepath (str | obj:`Path`): Here, filepath is the lmdb key.
"""
filepath = str(filepath)
with self._client.begin(write=False) as txn:
value_buf = txn.get(filepath.encode('ascii'))
return value_buf
def get_text(self, filepath, encoding=None):
raise NotImplementedError
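# Hedged sketch: reading one record from an lmdb database; the database path
# and key are illustrative. ``get`` treats ``filepath`` as the lmdb key and
# returns the raw bytes, or None if the key does not exist.
#
# >>> backend = LmdbBackend('/path/to/data.lmdb')
# >>> img_bytes = backend.get('000001')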
class HardDiskBackend(BaseStorageBackend):
"""Raw hard disks storage backend."""
_allow_symlink = True
def get(self, filepath: Union[str, Path]) -> bytes:
"""Read data from a given ``filepath`` with 'rb' mode.
Args:
filepath (str or Path): Path to read data.
Returns:
bytes: Expected bytes object.
"""
with open(filepath, 'rb') as f:
value_buf = f.read()
return value_buf
def get_text(self,
filepath: Union[str, Path],
encoding: str = 'utf-8') -> str:
"""Read data from a given ``filepath`` with 'r' mode.
Args:
filepath (str or Path): Path to read data.
encoding (str): The encoding format used to open the ``filepath``.
Default: 'utf-8'.
Returns:
str: Expected text reading from ``filepath``.
"""
with open(filepath, 'r', encoding=encoding) as f:
value_buf = f.read()
return value_buf
def put(self, obj: bytes, filepath: Union[str, Path]) -> None:
"""Write data to a given ``filepath`` with 'wb' mode.
Note:
``put`` will create a directory if the directory of ``filepath``
does not exist.
Args:
obj (bytes): Data to be written.
filepath (str or Path): Path to write data.
"""
mmcv.mkdir_or_exist(osp.dirname(filepath))
with open(filepath, 'wb') as f:
f.write(obj)
def put_text(self,
obj: str,
filepath: Union[str, Path],
encoding: str = 'utf-8') -> None:
"""Write data to a given ``filepath`` with 'w' mode.
Note:
``put_text`` will create a directory if the directory of
``filepath`` does not exist.
Args:
obj (str): Data to be written.
filepath (str or Path): Path to write data.
encoding (str): The encoding format used to open the ``filepath``.
Default: 'utf-8'.
"""
mmcv.mkdir_or_exist(osp.dirname(filepath))
with open(filepath, 'w', encoding=encoding) as f:
f.write(obj)
def remove(self, filepath: Union[str, Path]) -> None:
"""Remove a file.
Args:
filepath (str or Path): Path to be removed.
"""
os.remove(filepath)
def exists(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path exists.
Args:
filepath (str or Path): Path to be checked whether exists.
Returns:
bool: Return ``True`` if ``filepath`` exists, ``False`` otherwise.
"""
return osp.exists(filepath)
def isdir(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a directory.
Args:
filepath (str or Path): Path to be checked whether it is a
directory.
Returns:
bool: Return ``True`` if ``filepath`` points to a directory,
``False`` otherwise.
"""
return osp.isdir(filepath)
def isfile(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a file.
Args:
filepath (str or Path): Path to be checked whether it is a file.
Returns:
bool: Return ``True`` if ``filepath`` points to a file, ``False``
otherwise.
"""
return osp.isfile(filepath)
def join_path(self, filepath: Union[str, Path],
*filepaths: Union[str, Path]) -> str:
"""Concatenate all file paths.
Join one or more filepath components intelligently. The return value
is the concatenation of filepath and any members of *filepaths.
Args:
filepath (str or Path): Path to be concatenated.
Returns:
str: The result of concatenation.
"""
return osp.join(filepath, *filepaths)
@contextmanager
def get_local_path(
self, filepath: Union[str, Path]) -> Iterable[Union[str, Path]]:
"""Only for unified API and do nothing."""
yield filepath
def list_dir_or_file(self,
dir_path: Union[str, Path],
list_dir: bool = True,
list_file: bool = True,
suffix: Optional[Union[str, Tuple[str]]] = None,
recursive: bool = False) -> Iterator[str]:
"""Scan a directory to find the interested directories or files in
arbitrary order.
Note:
:meth:`list_dir_or_file` returns the path relative to ``dir_path``.
Args:
dir_path (str | Path): Path of the directory.
list_dir (bool): List the directories. Default: True.
list_file (bool): List the path of files. Default: True.
suffix (str or tuple[str], optional): File suffix
that we are interested in. Default: None.
recursive (bool): If set to True, recursively scan the
directory. Default: False.
Yields:
Iterable[str]: A relative path to ``dir_path``.
"""
if list_dir and suffix is not None:
raise TypeError('`suffix` should be None when `list_dir` is True')
if (suffix is not None) and not isinstance(suffix, (str, tuple)):
raise TypeError('`suffix` must be a string or tuple of strings')
root = dir_path
def _list_dir_or_file(dir_path, list_dir, list_file, suffix,
recursive):
for entry in os.scandir(dir_path):
if not entry.name.startswith('.') and entry.is_file():
rel_path = osp.relpath(entry.path, root)
if (suffix is None
or rel_path.endswith(suffix)) and list_file:
yield rel_path
elif osp.isdir(entry.path):
if list_dir:
rel_dir = osp.relpath(entry.path, root)
yield rel_dir
if recursive:
yield from _list_dir_or_file(entry.path, list_dir,
list_file, suffix,
recursive)
return _list_dir_or_file(dir_path, list_dir, list_file, suffix,
recursive)
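# Hedged sketch: recursively scanning the current directory for Python files;
# arguments are illustrative.
#
# >>> backend = HardDiskBackend()
# >>> for rel_path in backend.list_dir_or_file('.', list_dir=False,
# ...                                          suffix='.py', recursive=True):
# ...     print(rel_path)  # path relative to '.'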
class HTTPBackend(BaseStorageBackend):
"""HTTP and HTTPS storage bachend."""
def get(self, filepath):
value_buf = urlopen(filepath).read()
return value_buf
def get_text(self, filepath, encoding='utf-8'):
value_buf = urlopen(filepath).read()
return value_buf.decode(encoding)
@contextmanager
def get_local_path(self, filepath: str) -> Iterable[str]:
"""Download a file from ``filepath``.
        ``get_local_path`` is decorated by :meth:`contextlib.contextmanager`.
        It can be called with a ``with`` statement, and when exiting the
        ``with`` statement, the temporary path will be released.
Args:
filepath (str): Download a file from ``filepath``.
Examples:
>>> client = HTTPBackend()
            >>> # After exiting from the ``with`` clause,
>>> # the path will be removed
>>> with client.get_local_path('http://path/of/your/file') as path:
... # do something here
"""
try:
f = tempfile.NamedTemporaryFile(delete=False)
f.write(self.get(filepath))
f.close()
yield f.name
finally:
os.remove(f.name)
class FileClient:
"""A general file client to access files in different backends.
    The client loads a file or text in a specified backend from its path
    and returns it as a binary or text file. There are two ways to choose a
    backend: the name of the backend and the prefix of the path. Although both
    can be used to choose a storage backend, ``backend`` has a higher
    priority; that is, if both are set, the storage backend will be chosen by
    the ``backend`` argument. If both are ``None``, the disk backend will be
    chosen. Note that it can also register other backend accessors with a
    given name, prefixes, and backend class. In addition, the singleton
    pattern is used to avoid repeated object creation: if the arguments are
    the same, the same object will be returned.
Args:
backend (str, optional): The storage backend type. Options are "disk",
"ceph", "memcached", "lmdb", "http" and "petrel". Default: None.
prefix (str, optional): The prefix of the registered storage backend.
Options are "s3", "http", "https". Default: None.
Examples:
>>> # only set backend
>>> file_client = FileClient(backend='petrel')
>>> # only set prefix
>>> file_client = FileClient(prefix='s3')
>>> # set both backend and prefix but use backend to choose client
>>> file_client = FileClient(backend='petrel', prefix='s3')
>>> # if the arguments are the same, the same object is returned
>>> file_client1 = FileClient(backend='petrel')
>>> file_client1 is file_client
True
Attributes:
client (:obj:`BaseStorageBackend`): The backend object.
"""
_backends = {
'disk': HardDiskBackend,
'ceph': CephBackend,
'memcached': MemcachedBackend,
'lmdb': LmdbBackend,
'petrel': PetrelBackend,
'http': HTTPBackend,
}
    # This collection is used to record overridden backends. When a backend
    # appears in this collection, the singleton pattern is disabled for that
    # backend, because otherwise the cached object created before the
    # override would be returned.
_overridden_backends = set()
_prefix_to_backends = {
's3': PetrelBackend,
'http': HTTPBackend,
'https': HTTPBackend,
}
_overridden_prefixes = set()
_instances = {}
def __new__(cls, backend=None, prefix=None, **kwargs):
if backend is None and prefix is None:
backend = 'disk'
if backend is not None and backend not in cls._backends:
raise ValueError(
f'Backend {backend} is not supported. Currently supported ones'
f' are {list(cls._backends.keys())}')
if prefix is not None and prefix not in cls._prefix_to_backends:
raise ValueError(
f'prefix {prefix} is not supported. Currently supported ones '
f'are {list(cls._prefix_to_backends.keys())}')
# concatenate the arguments to a unique key for determining whether
# objects with the same arguments were created
arg_key = f'{backend}:{prefix}'
for key, value in kwargs.items():
arg_key += f':{key}:{value}'
# if a backend was overridden, it will create a new object
if (arg_key in cls._instances
and backend not in cls._overridden_backends
and prefix not in cls._overridden_prefixes):
_instance = cls._instances[arg_key]
else:
            # create a new object and put it into ``_instances``
_instance = super().__new__(cls)
if backend is not None:
_instance.client = cls._backends[backend](**kwargs)
else:
_instance.client = cls._prefix_to_backends[prefix](**kwargs)
cls._instances[arg_key] = _instance
return _instance
@property
def name(self):
return self.client.name
@property
def allow_symlink(self):
return self.client.allow_symlink
@staticmethod
def parse_uri_prefix(uri: Union[str, Path]) -> Optional[str]:
"""Parse the prefix of a uri.
Args:
uri (str | Path): Uri to be parsed that contains the file prefix.
Examples:
>>> FileClient.parse_uri_prefix('s3://path/of/your/file')
's3'
Returns:
str | None: Return the prefix of uri if the uri contains '://'
else ``None``.
"""
assert is_filepath(uri)
uri = str(uri)
if '://' not in uri:
return None
else:
prefix, _ = uri.split('://')
        # In the case of PetrelBackend, the prefix may contain the cluster
        # name like clusterName:s3
if ':' in prefix:
_, prefix = prefix.split(':')
return prefix
@classmethod
def infer_client(cls,
file_client_args: Optional[dict] = None,
uri: Optional[Union[str, Path]] = None) -> 'FileClient':
"""Infer a suitable file client based on the URI and arguments.
Args:
file_client_args (dict, optional): Arguments to instantiate a
FileClient. Default: None.
uri (str | Path, optional): Uri to be parsed that contains the file
prefix. Default: None.
Examples:
>>> uri = 's3://path/of/your/file'
>>> file_client = FileClient.infer_client(uri=uri)
>>> file_client_args = {'backend': 'petrel'}
>>> file_client = FileClient.infer_client(file_client_args)
Returns:
FileClient: Instantiated FileClient object.
"""
assert file_client_args is not None or uri is not None
if file_client_args is None:
file_prefix = cls.parse_uri_prefix(uri) # type: ignore
return cls(prefix=file_prefix)
else:
return cls(**file_client_args)
@classmethod
def _register_backend(cls, name, backend, force=False, prefixes=None):
if not isinstance(name, str):
raise TypeError('the backend name should be a string, '
f'but got {type(name)}')
if not inspect.isclass(backend):
raise TypeError(
f'backend should be a class but got {type(backend)}')
if not issubclass(backend, BaseStorageBackend):
raise TypeError(
f'backend {backend} is not a subclass of BaseStorageBackend')
if not force and name in cls._backends:
raise KeyError(
f'{name} is already registered as a storage backend, '
'add "force=True" if you want to override it')
if name in cls._backends and force:
cls._overridden_backends.add(name)
cls._backends[name] = backend
if prefixes is not None:
if isinstance(prefixes, str):
prefixes = [prefixes]
else:
assert isinstance(prefixes, (list, tuple))
for prefix in prefixes:
if prefix not in cls._prefix_to_backends:
cls._prefix_to_backends[prefix] = backend
elif (prefix in cls._prefix_to_backends) and force:
cls._overridden_prefixes.add(prefix)
cls._prefix_to_backends[prefix] = backend
else:
raise KeyError(
f'{prefix} is already registered as a storage backend,'
' add "force=True" if you want to override it')
@classmethod
def register_backend(cls, name, backend=None, force=False, prefixes=None):
"""Register a backend to FileClient.
This method can be used as a normal class method or a decorator.
.. code-block:: python
class NewBackend(BaseStorageBackend):
def get(self, filepath):
return filepath
def get_text(self, filepath):
return filepath
FileClient.register_backend('new', NewBackend)
or
.. code-block:: python
@FileClient.register_backend('new')
class NewBackend(BaseStorageBackend):
def get(self, filepath):
return filepath
def get_text(self, filepath):
return filepath
Args:
name (str): The name of the registered backend.
backend (class, optional): The backend class to be registered,
which must be a subclass of :class:`BaseStorageBackend`.
When this method is used as a decorator, backend is None.
Defaults to None.
force (bool, optional): Whether to override the backend if the name
has already been registered. Defaults to False.
prefixes (str or list[str] or tuple[str], optional): The prefixes
of the registered storage backend. Default: None.
`New in version 1.3.15.`
"""
if backend is not None:
cls._register_backend(
name, backend, force=force, prefixes=prefixes)
return
def _register(backend_cls):
cls._register_backend(
name, backend_cls, force=force, prefixes=prefixes)
return backend_cls
return _register
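    # Hedged sketch: registering a custom backend under both a name and a
    # path prefix, using the illustrative ``InMemoryBackend`` sketched below
    # ``BaseStorageBackend`` above. After registration, paths like
    # 'mem://...' resolve to it.
    #
    # >>> FileClient.register_backend('memory', InMemoryBackend,
    # ...                             prefixes='mem')
    # >>> client = FileClient(prefix='mem')
    # >>> client.name
    # 'InMemoryBackend'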
def get(self, filepath: Union[str, Path]) -> Union[bytes, memoryview]:
"""Read data from a given ``filepath`` with 'rb' mode.
Note:
There are two types of return values for ``get``, one is ``bytes``
and the other is ``memoryview``. The advantage of using memoryview
is that you can avoid copying, and if you want to convert it to
``bytes``, you can use ``.tobytes()``.
Args:
filepath (str or Path): Path to read data.
Returns:
bytes | memoryview: Expected bytes object or a memory view of the
bytes object.
"""
return self.client.get(filepath)
def get_text(self, filepath: Union[str, Path], encoding='utf-8') -> str:
"""Read data from a given ``filepath`` with 'r' mode.
Args:
filepath (str or Path): Path to read data.
encoding (str): The encoding format used to open the ``filepath``.
Default: 'utf-8'.
Returns:
str: Expected text reading from ``filepath``.
"""
return self.client.get_text(filepath, encoding)
def put(self, obj: bytes, filepath: Union[str, Path]) -> None:
"""Write data to a given ``filepath`` with 'wb' mode.
Note:
``put`` should create a directory if the directory of ``filepath``
does not exist.
Args:
obj (bytes): Data to be written.
filepath (str or Path): Path to write data.
"""
self.client.put(obj, filepath)
def put_text(self, obj: str, filepath: Union[str, Path]) -> None:
"""Write data to a given ``filepath`` with 'w' mode.
Note:
``put_text`` should create a directory if the directory of
``filepath`` does not exist.
Args:
obj (str): Data to be written.
filepath (str or Path): Path to write data.
"""
self.client.put_text(obj, filepath)
def remove(self, filepath: Union[str, Path]) -> None:
"""Remove a file.
Args:
filepath (str, Path): Path to be removed.
"""
self.client.remove(filepath)
def exists(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path exists.
Args:
filepath (str or Path): Path to be checked whether exists.
Returns:
bool: Return ``True`` if ``filepath`` exists, ``False`` otherwise.
"""
return self.client.exists(filepath)
def isdir(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a directory.
Args:
filepath (str or Path): Path to be checked whether it is a
directory.
Returns:
bool: Return ``True`` if ``filepath`` points to a directory,
``False`` otherwise.
"""
return self.client.isdir(filepath)
def isfile(self, filepath: Union[str, Path]) -> bool:
"""Check whether a file path is a file.
Args:
filepath (str or Path): Path to be checked whether it is a file.
Returns:
bool: Return ``True`` if ``filepath`` points to a file, ``False``
otherwise.
"""
return self.client.isfile(filepath)
def join_path(self, filepath: Union[str, Path],
*filepaths: Union[str, Path]) -> str:
"""Concatenate all file paths.
Join one or more filepath components intelligently. The return value
is the concatenation of filepath and any members of *filepaths.
Args:
filepath (str or Path): Path to be concatenated.
Returns:
str: The result of concatenation.
"""
return self.client.join_path(filepath, *filepaths)
@contextmanager
def get_local_path(self, filepath: Union[str, Path]) -> Iterable[str]:
"""Download data from ``filepath`` and write the data to local path.
        ``get_local_path`` is decorated by :meth:`contextlib.contextmanager`.
        It can be called with a ``with`` statement, and when exiting the
        ``with`` statement, the temporary path will be released.
Note:
If the ``filepath`` is a local path, just return itself.
.. warning::
``get_local_path`` is an experimental interface that may change in
the future.
Args:
filepath (str or Path): Path to be read data.
Examples:
>>> file_client = FileClient(prefix='s3')
>>> with file_client.get_local_path('s3://bucket/abc.jpg') as path:
... # do something here
Yields:
Iterable[str]: Only yield one path.
"""
with self.client.get_local_path(str(filepath)) as local_path:
yield local_path
def list_dir_or_file(self,
dir_path: Union[str, Path],
list_dir: bool = True,
list_file: bool = True,
suffix: Optional[Union[str, Tuple[str]]] = None,
recursive: bool = False) -> Iterator[str]:
"""Scan a directory to find the interested directories or files in
arbitrary order.
Note:
:meth:`list_dir_or_file` returns the path relative to ``dir_path``.
Args:
dir_path (str | Path): Path of the directory.
list_dir (bool): List the directories. Default: True.
list_file (bool): List the path of files. Default: True.
suffix (str or tuple[str], optional): File suffix
that we are interested in. Default: None.
recursive (bool): If set to True, recursively scan the
directory. Default: False.
Yields:
Iterable[str]: A relative path to ``dir_path``.
"""
yield from self.client.list_dir_or_file(dir_path, list_dir, list_file,
suffix, recursive)
# Copyright (c) OpenMMLab. All rights reserved.
from .base import BaseFileHandler
from .json_handler import JsonHandler
from .pickle_handler import PickleHandler
from .yaml_handler import YamlHandler
__all__ = ['BaseFileHandler', 'JsonHandler', 'PickleHandler', 'YamlHandler']
# Copyright (c) OpenMMLab. All rights reserved.
from abc import ABCMeta, abstractmethod
class BaseFileHandler(metaclass=ABCMeta):
    # `str_like` is a flag to indicate whether the type of file object is
    # str-like or bytes-like. Pickle only processes bytes-like objects while
    # json only processes str-like objects. If it is a str-like object,
    # `StringIO` will be used to process the buffer.
str_like = True
@abstractmethod
def load_from_fileobj(self, file, **kwargs):
pass
@abstractmethod
def dump_to_fileobj(self, obj, file, **kwargs):
pass
@abstractmethod
def dump_to_str(self, obj, **kwargs):
pass
def load_from_path(self, filepath, mode='r', **kwargs):
with open(filepath, mode) as f:
return self.load_from_fileobj(f, **kwargs)
def dump_to_path(self, obj, filepath, mode='w', **kwargs):
with open(filepath, mode) as f:
self.dump_to_fileobj(obj, f, **kwargs)
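# Hedged sketch: a minimal concrete handler for plain-text files. The class
# name is illustrative; a real handler would typically be registered for an
# extension via ``mmcv.fileio.register_handler`` (defined in ``io.py``).
class TxtHandler(BaseFileHandler):

    def load_from_fileobj(self, file, **kwargs):
        # text files are str-like, so the inherited ``str_like = True`` holds
        return file.read()

    def dump_to_fileobj(self, obj, file, **kwargs):
        file.write(str(obj))

    def dump_to_str(self, obj, **kwargs):
        return str(obj)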
# Copyright (c) OpenMMLab. All rights reserved.
import json
import numpy as np
from .base import BaseFileHandler
def set_default(obj):
"""Set default json values for non-serializable values.
It helps convert ``set``, ``range`` and ``np.ndarray`` data types to list.
    It also converts ``np.generic`` (including ``np.int32``, ``np.float32``,
    etc.) into plain numbers of python built-in types.
"""
if isinstance(obj, (set, range)):
return list(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, np.generic):
return obj.item()
raise TypeError(f'{type(obj)} is unsupported for json dump')
class JsonHandler(BaseFileHandler):
def load_from_fileobj(self, file):
return json.load(file)
def dump_to_fileobj(self, obj, file, **kwargs):
kwargs.setdefault('default', set_default)
json.dump(obj, file, **kwargs)
def dump_to_str(self, obj, **kwargs):
kwargs.setdefault('default', set_default)
return json.dumps(obj, **kwargs)
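# Hedged sketch: ``set_default`` is what lets the handler serialize numpy and
# set/range values; without it, the dump below would raise TypeError.
#
# >>> JsonHandler().dump_to_str({'ids': {1, 2}, 'score': np.float32(0.5)})
# '{"ids": [1, 2], "score": 0.5}'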
# Copyright (c) OpenMMLab. All rights reserved.
import pickle
from .base import BaseFileHandler
class PickleHandler(BaseFileHandler):
str_like = False
def load_from_fileobj(self, file, **kwargs):
return pickle.load(file, **kwargs)
def load_from_path(self, filepath, **kwargs):
return super(PickleHandler, self).load_from_path(
filepath, mode='rb', **kwargs)
def dump_to_str(self, obj, **kwargs):
kwargs.setdefault('protocol', 2)
return pickle.dumps(obj, **kwargs)
def dump_to_fileobj(self, obj, file, **kwargs):
kwargs.setdefault('protocol', 2)
pickle.dump(obj, file, **kwargs)
def dump_to_path(self, obj, filepath, **kwargs):
super(PickleHandler, self).dump_to_path(
obj, filepath, mode='wb', **kwargs)
# Copyright (c) OpenMMLab. All rights reserved.
import yaml
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
from .base import BaseFileHandler # isort:skip
class YamlHandler(BaseFileHandler):
def load_from_fileobj(self, file, **kwargs):
kwargs.setdefault('Loader', Loader)
return yaml.load(file, **kwargs)
def dump_to_fileobj(self, obj, file, **kwargs):
kwargs.setdefault('Dumper', Dumper)
yaml.dump(obj, file, **kwargs)
def dump_to_str(self, obj, **kwargs):
kwargs.setdefault('Dumper', Dumper)
return yaml.dump(obj, **kwargs)
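# Hedged sketch: round-tripping a small config dict through YAML; the exact
# output formatting below assumes PyYAML's default block style.
#
# >>> YamlHandler().dump_to_str({'lr': 0.01, 'steps': [8, 11]})
# 'lr: 0.01\nsteps:\n- 8\n- 11\n'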
# Copyright (c) OpenMMLab. All rights reserved.
from io import BytesIO, StringIO
from pathlib import Path
from ..utils import is_list_of, is_str
from .file_client import FileClient
from .handlers import BaseFileHandler, JsonHandler, PickleHandler, YamlHandler
file_handlers = {
'json': JsonHandler(),
'yaml': YamlHandler(),
'yml': YamlHandler(),
'pickle': PickleHandler(),
'pkl': PickleHandler()
}
def load(file, file_format=None, file_client_args=None, **kwargs):
"""Load data from json/yaml/pickle files.
    This method provides a unified API for loading data from serialized files.
    Note:
        In v1.3.16 and later, ``load`` supports loading data from serialized
        files that can be stored in different backends.
Args:
file (str or :obj:`Path` or file-like object): Filename or a file-like
object.
file_format (str, optional): If not specified, the file format will be
inferred from the file extension, otherwise use the specified one.
Currently supported formats include "json", "yaml/yml" and
"pickle/pkl".
file_client_args (dict, optional): Arguments to instantiate a
FileClient. See :class:`mmcv.fileio.FileClient` for details.
Default: None.
Examples:
        >>> load('/path/of/your/file') # file is stored on disk
        >>> load('https://path/of/your/file') # file is stored on the Internet
        >>> load('s3://path/of/your/file') # file is stored in petrel
Returns:
The content from the file.
"""
if isinstance(file, Path):
file = str(file)
if file_format is None and is_str(file):
file_format = file.split('.')[-1]
if file_format not in file_handlers:
raise TypeError(f'Unsupported format: {file_format}')
handler = file_handlers[file_format]
if is_str(file):
file_client = FileClient.infer_client(file_client_args, file)
if handler.str_like:
with StringIO(file_client.get_text(file)) as f:
obj = handler.load_from_fileobj(f, **kwargs)
else:
with BytesIO(file_client.get(file)) as f:
obj = handler.load_from_fileobj(f, **kwargs)
elif hasattr(file, 'read'):
obj = handler.load_from_fileobj(file, **kwargs)
else:
raise TypeError('"file" must be a filepath str or a file-object')
return obj
def dump(obj, file=None, file_format=None, file_client_args=None, **kwargs):
"""Dump data to json/yaml/pickle strings or files.
    This method provides a unified API for dumping data as strings or to
    files, and also supports custom arguments for each file format.
    Note:
        In v1.3.16 and later, ``dump`` supports dumping data as strings or to
        files which can be saved to different backends.
Args:
obj (any): The python object to be dumped.
file (str or :obj:`Path` or file-like object, optional): If not
specified, then the object is dumped to a str, otherwise to a file
specified by the filename or file-like object.
file_format (str, optional): Same as :func:`load`.
file_client_args (dict, optional): Arguments to instantiate a
FileClient. See :class:`mmcv.fileio.FileClient` for details.
Default: None.
Examples:
>>> dump('hello world', '/path/of/your/file') # disk
>>> dump('hello world', 's3://path/of/your/file') # ceph or petrel
    Returns:
        str or None: If ``file`` is None, the dumped string is returned;
            otherwise ``None`` is returned after writing to ``file``.
"""
if isinstance(file, Path):
file = str(file)
if file_format is None:
if is_str(file):
file_format = file.split('.')[-1]
elif file is None:
raise ValueError(
'file_format must be specified since file is None')
if file_format not in file_handlers:
raise TypeError(f'Unsupported format: {file_format}')
handler = file_handlers[file_format]
if file is None:
return handler.dump_to_str(obj, **kwargs)
elif is_str(file):
file_client = FileClient.infer_client(file_client_args, file)
if handler.str_like:
with StringIO() as f:
handler.dump_to_fileobj(obj, f, **kwargs)
file_client.put_text(f.getvalue(), file)
else:
with BytesIO() as f:
handler.dump_to_fileobj(obj, f, **kwargs)
file_client.put(f.getvalue(), file)
elif hasattr(file, 'write'):
handler.dump_to_fileobj(obj, file, **kwargs)
else:
raise TypeError('"file" must be a filename str or a file-object')
def _register_handler(handler, file_formats):
"""Register a handler for some file extensions.
Args:
handler (:obj:`BaseFileHandler`): Handler to be registered.
file_formats (str or list[str]): File formats to be handled by this
handler.
"""
if not isinstance(handler, BaseFileHandler):
raise TypeError(
f'handler must be a child of BaseFileHandler, not {type(handler)}')
if isinstance(file_formats, str):
file_formats = [file_formats]
if not is_list_of(file_formats, str):
raise TypeError('file_formats must be a str or a list of str')
for ext in file_formats:
file_handlers[ext] = handler
def register_handler(file_formats, **kwargs):
def wrap(cls):
_register_handler(cls(**kwargs), file_formats)
return cls
return wrap
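# Hedged sketch: wiring a new extension into ``load``/``dump`` with the
# decorator-factory form, using an illustrative plain-text handler (see the
# ``TxtHandler`` sketched after ``BaseFileHandler`` in ``handlers/base.py``).
#
# >>> register_handler('txt')(TxtHandler)
# >>> dump('hello world', '/tmp/demo.txt')
# >>> load('/tmp/demo.txt')
# 'hello world'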
# Copyright (c) OpenMMLab. All rights reserved.
from io import StringIO
from .file_client import FileClient
def list_from_file(filename,
prefix='',
offset=0,
max_num=0,
encoding='utf-8',
file_client_args=None):
"""Load a text file and parse the content as a list of strings.
Note:
        In v1.3.16 and later, ``list_from_file`` supports loading a text file
        which can be stored in different backends and parsing the content as
        a list of strings.
Args:
filename (str): Filename.
prefix (str): The prefix to be inserted to the beginning of each item.
offset (int): The offset of lines.
        max_num (int): The maximum number of lines to be read; zero or
            negative values mean no limit.
        encoding (str): Encoding used to open the file. Default: 'utf-8'.
file_client_args (dict, optional): Arguments to instantiate a
FileClient. See :class:`mmcv.fileio.FileClient` for details.
Default: None.
Examples:
>>> list_from_file('/path/of/your/file') # disk
['hello', 'world']
>>> list_from_file('s3://path/of/your/file') # ceph or petrel
['hello', 'world']
Returns:
list[str]: A list of strings.
"""
cnt = 0
item_list = []
file_client = FileClient.infer_client(file_client_args, filename)
with StringIO(file_client.get_text(filename, encoding)) as f:
for _ in range(offset):
f.readline()
for line in f:
if 0 < max_num <= cnt:
break
item_list.append(prefix + line.rstrip('\n\r'))
cnt += 1
return item_list
def dict_from_file(filename,
key_type=str,
encoding='utf-8',
file_client_args=None):
"""Load a text file and parse the content as a dict.
Each line of the text file will be two or more columns split by
whitespaces or tabs. The first column will be parsed as dict keys, and
the following columns will be parsed as dict values.
Note:
In v1.3.16 and later, ``dict_from_file`` supports loading a text file
        which can be stored in different backends and parsing the content as
a dict.
Args:
        filename (str): Filename.
        key_type (type): Type of the dict keys. ``str`` is used by default
            and type conversion will be performed if specified.
        encoding (str): Encoding used to open the file. Default: 'utf-8'.
file_client_args (dict, optional): Arguments to instantiate a
FileClient. See :class:`mmcv.fileio.FileClient` for details.
Default: None.
Examples:
>>> dict_from_file('/path/of/your/file') # disk
{'key1': 'value1', 'key2': 'value2'}
>>> dict_from_file('s3://path/of/your/file') # ceph or petrel
{'key1': 'value1', 'key2': 'value2'}
Returns:
dict: The parsed contents.
"""
mapping = {}
file_client = FileClient.infer_client(file_client_args, filename)
with StringIO(file_client.get_text(filename, encoding)) as f:
for line in f:
items = line.rstrip('\n').split()
assert len(items) >= 2
key = key_type(items[0])
val = items[1:] if len(items) > 2 else items[1]
mapping[key] = val
return mapping
@@ -9,10 +9,10 @@ from .geometric import (cutout, imcrop, imflip, imflip_, impad,
 from .io import imfrombytes, imread, imwrite, supported_backends, use_backend
 from .misc import tensor2imgs
 from .photometric import (adjust_brightness, adjust_color, adjust_contrast,
-                          adjust_hue, adjust_lighting, adjust_sharpness,
-                          auto_contrast, clahe, imdenormalize, imequalize,
-                          iminvert, imnormalize, imnormalize_, lut_transform,
-                          posterize, solarize)
+                          adjust_lighting, adjust_sharpness, auto_contrast,
+                          clahe, imdenormalize, imequalize, iminvert,
+                          imnormalize, imnormalize_, lut_transform, posterize,
+                          solarize)
 __all__ = [
     'bgr2gray', 'bgr2hls', 'bgr2hsv', 'bgr2rgb', 'gray2bgr', 'gray2rgb',
@@ -24,6 +24,5 @@ __all__ = [
     'solarize', 'rgb2ycbcr', 'bgr2ycbcr', 'ycbcr2rgb', 'ycbcr2bgr',
     'tensor2imgs', 'imshear', 'imtranslate', 'adjust_color', 'imequalize',
     'adjust_brightness', 'adjust_contrast', 'lut_transform', 'clahe',
-    'adjust_sharpness', 'auto_contrast', 'cutout', 'adjust_lighting',
-    'adjust_hue'
+    'adjust_sharpness', 'auto_contrast', 'cutout', 'adjust_lighting'
 ]