Commit 85529f35 authored by unknown's avatar unknown
Browse files

添加openmmlab测试用例

parent b21b0c01
import warnings
import numpy as np
from mmcls.core import average_performance, mAP
from .base_dataset import BaseDataset
class MultiLabelDataset(BaseDataset):
    """Dataset base class for samples annotated with several labels."""

    def get_cat_ids(self, idx):
        """Look up the categories marked as present for one sample.

        Args:
            idx (int): Position of the sample in ``self.data_infos``.

        Returns:
            np.ndarray: Indices at which the sample's ``gt_label`` equals 1.
        """
        label_vector = self.data_infos[idx]['gt_label']
        return np.where(label_vector == 1)[0]

    def evaluate(self,
                 results,
                 metric='mAP',
                 metric_options=None,
                 logger=None,
                 **deprecated_kwargs):
        """Score predictions against the ground-truth labels.

        Args:
            results (list): Predictions; they are stacked with ``np.vstack``
                before scoring.
            metric (str | list[str]): Metric name(s) to report. Supported
                names are 'mAP', 'CP', 'CR', 'CF1', 'OP', 'OR' and 'OF1'.
                Defaults to 'mAP'.
            metric_options (dict, optional): Keyword arguments forwarded to
                ``average_performance``. ``{'thr': 0.5}`` is used when None.
            logger (logging.Logger | str, optional): Not used by this method.
            deprecated_kwargs (dict): Legacy way of passing metric options.
                When non-empty, a warning is emitted and these values replace
                ``metric_options`` entirely.

        Returns:
            dict: Maps each requested metric name to its value.

        Raises:
            AssertionError: If the stacked results and the ground-truth
                labels differ in length.
            ValueError: If an unsupported metric name is requested.
        """
        if metric_options is None:
            metric_options = {'thr': 0.5}

        if deprecated_kwargs:
            warnings.warn('Option arguments for metrics has been changed to '
                          '`metric_options`.')
            # The legacy keyword arguments take over completely.
            metric_options = dict(deprecated_kwargs)

        metrics = [metric] if isinstance(metric, str) else metric
        allowed_metrics = ['mAP', 'CP', 'CR', 'CF1', 'OP', 'OR', 'OF1']

        eval_results = {}
        results = np.vstack(results)
        gt_labels = self.get_gt_labels()
        num_imgs = len(results)
        assert len(gt_labels) == num_imgs, (
            'dataset testing results should be of the same length as '
            'gt_labels.')

        requested = set(metrics)
        invalid_metrics = requested - set(allowed_metrics)
        if invalid_metrics:
            raise ValueError(f'metric {invalid_metrics} is not supported.')

        if 'mAP' in requested:
            eval_results['mAP'] = mAP(results, gt_labels)

        # The six remaining metrics come out of one shared computation, so
        # it is run only if at least one of them was asked for.
        if requested - {'mAP'}:
            performance_keys = ['CP', 'CR', 'CF1', 'OP', 'OR', 'OF1']
            performance_values = average_performance(results, gt_labels,
                                                     **metric_options)
            eval_results.update({
                name: value
                for name, value in zip(performance_keys, performance_values)
                if name in requested
            })

        return eval_results
from .auto_augment import (AutoAugment, AutoContrast, Brightness,
ColorTransform, Contrast, Cutout, Equalize, Invert,
Posterize, RandAugment, Rotate, Sharpness, Shear,
Solarize, SolarizeAdd, Translate)
from .compose import Compose
from .formating import (Collect, ImageToTensor, ToNumpy, ToPIL, ToTensor,
Transpose, to_tensor)
from .loading import LoadImageFromFile
from .transforms import (CenterCrop, ColorJitter, Lighting, RandomCrop,
RandomErasing, RandomFlip, RandomGrayscale,
RandomResizedCrop, Resize)
# NOTE(review): 'Normalize' used to be listed here although this module never
# imports that name, which makes a star-import of the package fail with
# AttributeError. It is left out until a matching import exists; if
# `.transforms` provides `Normalize`, import it here and add the name back.
__all__ = [
    'Compose', 'to_tensor', 'ToTensor', 'ImageToTensor', 'ToPIL', 'ToNumpy',
    'Transpose', 'Collect', 'LoadImageFromFile', 'Resize', 'CenterCrop',
    'RandomFlip', 'RandomCrop', 'RandomResizedCrop', 'RandomGrayscale',
    'Shear', 'Translate', 'Rotate', 'Invert', 'ColorTransform', 'Solarize',
    'Posterize', 'AutoContrast', 'Equalize', 'Contrast', 'Brightness',
    'Sharpness', 'AutoAugment', 'SolarizeAdd', 'Cutout', 'RandAugment',
    'Lighting', 'ColorJitter', 'RandomErasing'
]
import copy
import random
from numbers import Number
from typing import Sequence
import mmcv
import numpy as np
from ..builder import PIPELINES
from .compose import Compose
def random_negative(value, random_negative_prob):
    """Flip the sign of ``value`` with probability ``random_negative_prob``.

    One sample is drawn from ``np.random.rand()``; the negated value is
    returned when that sample is strictly below ``random_negative_prob``,
    otherwise ``value`` is returned unchanged.
    """
    if np.random.rand() < random_negative_prob:
        return -value
    return value
@PIPELINES.register_module()
class AutoAugment(object):
    """Apply one randomly chosen augmentation policy per call.

    The augmentation is proposed in `AutoAugment: Learning Augmentation
    Policies from Data. <https://arxiv.org/abs/1805.09501>`_.

    Args:
        policies (list[list[dict]]): Candidate policies. Every policy is a
            non-empty list of augmentation configs (dicts holding a ``type``
            key). On each call one policy is picked at random and applied.
    """

    def __init__(self, policies):
        assert isinstance(policies, list) and len(policies) > 0, (
            'Policies must be a non-empty list.')
        for policy in policies:
            assert isinstance(policy, list) and len(policy) > 0, (
                'Each policy in policies must be a non-empty list.')
            for augment in policy:
                assert isinstance(augment, dict) and 'type' in augment, (
                    'Each specific augmentation must be a dict with key'
                    ' "type".')

        # Keep a private copy of the configs and build each pipeline once.
        self.policies = copy.deepcopy(policies)
        self.sub_policy = list(map(Compose, self.policies))

    def __call__(self, results):
        chosen = random.choice(self.sub_policy)
        return chosen(results)

    def __repr__(self):
        return f'{self.__class__.__name__}(policies={self.policies})'
@PIPELINES.register_module()
class RandAugment(object):
    """Random augmentation.

    This data augmentation is proposed in `RandAugment: Practical automated
    data augmentation with a reduced search space.
    <https://arxiv.org/abs/1909.13719>`_.

    Args:
        policies (list[dict]): The policies of random augmentation. Each
            policy is one augmentation config (dict) with at least the key
            ``type``. Augmentations that take a magnitude additionally give
            ``magnitude_key`` (str, name of the magnitude argument) and
            ``magnitude_range`` (a two-element sequence ``(val1, val2)``;
            val1 is not necessarily less than val2).
        num_policies (int): Number of policies drawn (with replacement) from
            ``policies`` on each call. Must be greater than 0.
        magnitude_level (int | float): Magnitude level shared by all selected
            augmentations. Must be no less than 0.
        magnitude_std (Number | str): Deviation of the magnitude noise.
            If a positive number, the level is sampled from a normal
            distribution (mean=magnitude_level, std=magnitude_std) and then
            clamped to ``[0, total_level]``.
            If 0 or a negative number, the level is used unchanged.
            If the str "inf", the level is sampled uniformly between 0 and
            ``magnitude_level``.
            Defaults to 0.
        total_level (int | float): Total level for the magnitude. Must be
            greater than 0. Defaults to 30.

    Note:
        `magnitude_std` will introduce some randomness to policy, modified by
        https://github.com/rwightman/pytorch-image-models

        When magnitude_std=0, the magnitude is calculated as follows:

        .. math::
            magnitude = magnitude_level / total_level * (val2 - val1) + val1
    """

    def __init__(self,
                 policies,
                 num_policies,
                 magnitude_level,
                 magnitude_std=0.,
                 total_level=30):
        assert isinstance(num_policies, int), 'Number of policies must be ' \
            f'of int type, got {type(num_policies)} instead.'
        assert isinstance(magnitude_level, (int, float)), \
            'Magnitude level must be of int or float type, ' \
            f'got {type(magnitude_level)} instead.'
        assert isinstance(total_level, (int, float)), 'Total level must be ' \
            f'of int or float type, got {type(total_level)} instead.'
        assert isinstance(policies, list) and len(policies) > 0, \
            'Policies must be a non-empty list.'

        assert isinstance(magnitude_std, (Number, str)), \
            'Magnitude std must be of number or str type, ' \
            f'got {type(magnitude_std)} instead.'
        if isinstance(magnitude_std, str):
            assert magnitude_std == 'inf', \
                'Magnitude std must be of number or "inf", ' \
                f'got "{magnitude_std}" instead.'

        assert num_policies > 0, 'num_policies must be greater than 0.'
        assert magnitude_level >= 0, 'magnitude_level must be no less than 0.'
        assert total_level > 0, 'total_level must be greater than 0.'

        self.num_policies = num_policies
        self.magnitude_level = magnitude_level
        self.magnitude_std = magnitude_std
        self.total_level = total_level
        self.policies = policies
        self._check_policies(self.policies)

    def _check_policies(self, policies):
        """Assert that every policy is a well-formed config dict."""
        for policy in policies:
            assert isinstance(policy, dict) and 'type' in policy, \
                'Each policy must be a dict with key "type".'
            type_name = policy['type']

            magnitude_key = policy.get('magnitude_key', None)
            if magnitude_key is not None:
                assert 'magnitude_range' in policy, \
                    f'RandAugment policy {type_name} needs `magnitude_range`.'
                magnitude_range = policy['magnitude_range']
                assert (isinstance(magnitude_range, Sequence)
                        and len(magnitude_range) == 2), \
                    f'`magnitude_range` of RandAugment policy {type_name} ' \
                    f'should be a Sequence with two numbers.'

    def _process_policies(self, policies):
        """Turn policy configs into concrete transform configs.

        Each policy is deep-copied, so the stored configs are not modified.
        For policies with a ``magnitude_key``, the helper keys are removed
        and the resolved magnitude is stored under that key.
        """
        processed_policies = []
        for policy in policies:
            processed_policy = copy.deepcopy(policy)
            magnitude_key = processed_policy.pop('magnitude_key', None)
            if magnitude_key is not None:
                magnitude = self.magnitude_level
                # if magnitude_std is positive number or 'inf', move
                # magnitude_value randomly.
                if self.magnitude_std == 'inf':
                    magnitude = random.uniform(0, magnitude)
                elif self.magnitude_std > 0:
                    magnitude = random.gauss(magnitude, self.magnitude_std)
                    magnitude = min(self.total_level, max(0, magnitude))

                # Map the level linearly onto the policy's own value range.
                val1, val2 = processed_policy.pop('magnitude_range')
                magnitude = (magnitude / self.total_level) * (val2 -
                                                              val1) + val1

                processed_policy.update({magnitude_key: magnitude})
            processed_policies.append(processed_policy)
        return processed_policies

    def __call__(self, results):
        if self.num_policies == 0:
            return results
        sub_policy = random.choices(self.policies, k=self.num_policies)
        sub_policy = self._process_policies(sub_policy)
        sub_policy = Compose(sub_policy)
        return sub_policy(results)

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f'(policies={self.policies}, '
        repr_str += f'num_policies={self.num_policies}, '
        repr_str += f'magnitude_level={self.magnitude_level}, '
        # magnitude_std changes the sampling behaviour, so it belongs in the
        # textual description alongside the other constructor arguments.
        repr_str += f'magnitude_std={self.magnitude_std}, '
        repr_str += f'total_level={self.total_level})'
        return repr_str
@PIPELINES.register_module()
class Shear(object):
    """Shear images with a given probability.

    Args:
        magnitude (int | float): The magnitude used for shear.
        pad_val (int, tuple[int]): Fill value for the border. An int is
            repeated three times; a tuple must hold exactly 3 ints.
            Defaults to 128.
        prob (float): Probability of applying the shear, in [0, 1].
            Defaults to 0.5.
        direction (str): 'horizontal' or 'vertical'. Defaults to
            'horizontal'.
        random_negative_prob (float): Probability of negating the magnitude,
            in [0, 1]. Defaults to 0.5.
        interpolation (str): Interpolation method. Options are 'nearest',
            'bilinear', 'bicubic', 'area', 'lanczos'. Defaults to 'bicubic'.
    """

    def __init__(self,
                 magnitude,
                 pad_val=128,
                 prob=0.5,
                 direction='horizontal',
                 random_negative_prob=0.5,
                 interpolation='bicubic'):
        assert isinstance(magnitude, (int, float)), (
            'The magnitude type must be int or float, but got '
            f'{type(magnitude)} instead.')

        if isinstance(pad_val, int):
            # A scalar fill value is replicated into a 3-tuple.
            pad_val = (pad_val, ) * 3
        elif not isinstance(pad_val, tuple):
            raise TypeError('pad_val must be int or tuple with 3 elements.')
        else:
            assert len(pad_val) == 3, (
                'pad_val as a tuple must have 3 elements, got '
                f'{len(pad_val)} instead.')
            assert all(isinstance(item, int) for item in pad_val), (
                'pad_val as a tuple must got elements of int type.')

        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        assert direction in ('horizontal', 'vertical'), (
            'direction must be either "horizontal" or "vertical", got '
            f'{direction} instead.')
        assert 0 <= random_negative_prob <= 1.0, (
            'The random_negative_prob should be in range [0,1], got '
            f'{random_negative_prob} instead.')

        self.magnitude = magnitude
        self.pad_val = pad_val
        self.prob = prob
        self.direction = direction
        self.random_negative_prob = random_negative_prob
        self.interpolation = interpolation

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if skip:
            return results

        # One sign decision is shared by every image field of the sample.
        signed_magnitude = random_negative(self.magnitude,
                                           self.random_negative_prob)
        for field in results.get('img_fields', ['img']):
            source = results[field]
            sheared = mmcv.imshear(
                source,
                signed_magnitude,
                direction=self.direction,
                border_value=self.pad_val,
                interpolation=self.interpolation)
            results[field] = sheared.astype(source.dtype)
        return results

    def __repr__(self):
        fields = ', '.join((
            f'magnitude={self.magnitude}',
            f'pad_val={self.pad_val}',
            f'prob={self.prob}',
            f'direction={self.direction}',
            f'random_negative_prob={self.random_negative_prob}',
            f'interpolation={self.interpolation}',
        ))
        return f'{self.__class__.__name__}({fields})'
@PIPELINES.register_module()
class Translate(object):
    """Translate images with a given probability.

    Args:
        magnitude (int | float): The magnitude used for translate. The pixel
            offset is ``magnitude * width`` for the horizontal direction and
            ``magnitude * height`` otherwise, so a magnitude of 1 moves the
            whole image out of range.
        pad_val (int, tuple[int]): Fill value for the border. An int is
            repeated three times; a tuple must hold exactly 3 ints.
            Defaults to 128.
        prob (float): Probability of applying the translation, in [0, 1].
            Defaults to 0.5.
        direction (str): 'horizontal' or 'vertical'. Defaults to
            'horizontal'.
        random_negative_prob (float): Probability of negating the magnitude,
            in [0, 1]. Defaults to 0.5.
        interpolation (str): Interpolation method. Options are 'nearest',
            'bilinear', 'bicubic', 'area', 'lanczos'. Defaults to 'nearest'.
    """

    def __init__(self,
                 magnitude,
                 pad_val=128,
                 prob=0.5,
                 direction='horizontal',
                 random_negative_prob=0.5,
                 interpolation='nearest'):
        assert isinstance(magnitude, (int, float)), (
            'The magnitude type must be int or float, but got '
            f'{type(magnitude)} instead.')

        if isinstance(pad_val, int):
            # A scalar fill value is replicated into a 3-tuple.
            pad_val = (pad_val, ) * 3
        elif not isinstance(pad_val, tuple):
            raise TypeError('pad_val must be int or tuple with 3 elements.')
        else:
            assert len(pad_val) == 3, (
                'pad_val as a tuple must have 3 elements, got '
                f'{len(pad_val)} instead.')
            assert all(isinstance(item, int) for item in pad_val), (
                'pad_val as a tuple must got elements of int type.')

        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        assert direction in ('horizontal', 'vertical'), (
            'direction must be either "horizontal" or "vertical", got '
            f'{direction} instead.')
        assert 0 <= random_negative_prob <= 1.0, (
            'The random_negative_prob should be in range [0,1], got '
            f'{random_negative_prob} instead.')

        self.magnitude = magnitude
        self.pad_val = pad_val
        self.prob = prob
        self.direction = direction
        self.random_negative_prob = random_negative_prob
        self.interpolation = interpolation

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if skip:
            return results

        signed_magnitude = random_negative(self.magnitude,
                                           self.random_negative_prob)
        for field in results.get('img_fields', ['img']):
            source = results[field]
            height, width = source.shape[:2]
            # The offset scales with the image extent along the chosen axis.
            extent = width if self.direction == 'horizontal' else height
            offset = signed_magnitude * extent
            translated = mmcv.imtranslate(
                source,
                offset,
                direction=self.direction,
                border_value=self.pad_val,
                interpolation=self.interpolation)
            results[field] = translated.astype(source.dtype)
        return results

    def __repr__(self):
        fields = ', '.join((
            f'magnitude={self.magnitude}',
            f'pad_val={self.pad_val}',
            f'prob={self.prob}',
            f'direction={self.direction}',
            f'random_negative_prob={self.random_negative_prob}',
            f'interpolation={self.interpolation}',
        ))
        return f'{self.__class__.__name__}({fields})'
@PIPELINES.register_module()
class Rotate(object):
    """Rotate images.

    Args:
        angle (float): The angle used for rotate. Positive values stand for
            clockwise rotation.
        center (tuple[float], optional): Center point (w, h) of the rotation in
            the source image. If None, the center of the image will be used.
            Defaults to None.
        scale (float): Isotropic scale factor. Defaults to 1.0.
        pad_val (int, tuple[int]): Pixel pad_val value for constant fill. An
            int is repeated three times; a tuple must hold exactly 3 ints.
            Defaults to 128.
        prob (float): The probability for performing Rotate therefore should be
            in range [0, 1]. Defaults to 0.5.
        random_negative_prob (float): The probability that turns the angle
            negative, which should be in range [0,1]. Defaults to 0.5.
        interpolation (str): Interpolation method. Options are 'nearest',
            'bilinear', 'bicubic', 'area', 'lanczos'. Defaults to 'nearest'.
    """

    def __init__(self,
                 angle,
                 center=None,
                 scale=1.0,
                 pad_val=128,
                 prob=0.5,
                 random_negative_prob=0.5,
                 interpolation='nearest'):
        assert isinstance(angle, float), 'The angle type must be float, but ' \
            f'got {type(angle)} instead.'
        if isinstance(center, tuple):
            assert len(center) == 2, 'center as a tuple must have 2 ' \
                f'elements, got {len(center)} elements instead.'
        else:
            # The two message fragments used to be joined without a space
            # ("The center typemust be ...").
            assert center is None, 'The center type ' \
                f'must be tuple or None, got {type(center)} instead.'
        assert isinstance(scale, float), 'the scale type must be float, but ' \
            f'got {type(scale)} instead.'
        if isinstance(pad_val, int):
            pad_val = tuple([pad_val] * 3)
        elif isinstance(pad_val, tuple):
            assert len(pad_val) == 3, 'pad_val as a tuple must have 3 ' \
                f'elements, got {len(pad_val)} instead.'
            assert all(isinstance(i, int) for i in pad_val), 'pad_val as a '\
                'tuple must got elements of int type.'
        else:
            raise TypeError('pad_val must be int or tuple with 3 elements.')
        assert 0 <= prob <= 1.0, 'The prob should be in range [0,1], ' \
            f'got {prob} instead.'
        assert 0 <= random_negative_prob <= 1.0, 'The random_negative_prob ' \
            f'should be in range [0,1], got {random_negative_prob} instead.'

        self.angle = angle
        self.center = center
        self.scale = scale
        self.pad_val = pad_val
        self.prob = prob
        self.random_negative_prob = random_negative_prob
        self.interpolation = interpolation

    def __call__(self, results):
        if np.random.rand() > self.prob:
            return results
        # One sign decision is shared by every image field of the sample.
        angle = random_negative(self.angle, self.random_negative_prob)
        for key in results.get('img_fields', ['img']):
            img = results[key]
            img_rotated = mmcv.imrotate(
                img,
                angle,
                center=self.center,
                scale=self.scale,
                border_value=self.pad_val,
                interpolation=self.interpolation)
            # Cast back so the stored image keeps its original dtype.
            results[key] = img_rotated.astype(img.dtype)
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f'(angle={self.angle}, '
        repr_str += f'center={self.center}, '
        repr_str += f'scale={self.scale}, '
        repr_str += f'pad_val={self.pad_val}, '
        repr_str += f'prob={self.prob}, '
        repr_str += f'random_negative_prob={self.random_negative_prob}, '
        repr_str += f'interpolation={self.interpolation})'
        return repr_str
@PIPELINES.register_module()
class AutoContrast(object):
    """Automatically adjust image contrast with a given probability.

    Args:
        prob (float): Probability of applying the adjustment, in [0, 1].
            Defaults to 0.5.
    """

    def __init__(self, prob=0.5):
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        self.prob = prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if not skip:
            for field in results.get('img_fields', ['img']):
                source = results[field]
                adjusted = mmcv.auto_contrast(source)
                # Cast back so the stored image keeps its original dtype.
                results[field] = adjusted.astype(source.dtype)
        return results

    def __repr__(self):
        return f'{self.__class__.__name__}(prob={self.prob})'
@PIPELINES.register_module()
class Invert(object):
    """Invert images with a given probability.

    Args:
        prob (float): Probability of applying the inversion, in [0, 1].
            Defaults to 0.5.
    """

    def __init__(self, prob=0.5):
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        self.prob = prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if not skip:
            for field in results.get('img_fields', ['img']):
                source = results[field]
                inverted = mmcv.iminvert(source)
                # Cast back so the stored image keeps its original dtype.
                results[field] = inverted.astype(source.dtype)
        return results

    def __repr__(self):
        return f'{self.__class__.__name__}(prob={self.prob})'
@PIPELINES.register_module()
class Equalize(object):
    """Equalize the image histogram with a given probability.

    Args:
        prob (float): Probability of applying the equalization, in [0, 1].
            Defaults to 0.5.
    """

    def __init__(self, prob=0.5):
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        self.prob = prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if not skip:
            for field in results.get('img_fields', ['img']):
                source = results[field]
                equalized = mmcv.imequalize(source)
                # Cast back so the stored image keeps its original dtype.
                results[field] = equalized.astype(source.dtype)
        return results

    def __repr__(self):
        return f'{self.__class__.__name__}(prob={self.prob})'
@PIPELINES.register_module()
class Solarize(object):
    """Solarize images (invert all pixel values above a threshold).

    Args:
        thr (int | float): Threshold above which pixel values are inverted.
        prob (float): Probability of applying the solarization, in [0, 1].
            Defaults to 0.5.
    """

    def __init__(self, thr, prob=0.5):
        assert isinstance(thr, (int, float)), (
            f'The thr type must be int or float, but got {type(thr)} instead.')
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')

        self.thr = thr
        self.prob = prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if not skip:
            for field in results.get('img_fields', ['img']):
                source = results[field]
                solarized = mmcv.solarize(source, thr=self.thr)
                # Cast back so the stored image keeps its original dtype.
                results[field] = solarized.astype(source.dtype)
        return results

    def __repr__(self):
        return f'{self.__class__.__name__}(thr={self.thr}, prob={self.prob})'
@PIPELINES.register_module()
class SolarizeAdd(object):
    """SolarizeAdd images (add a certain value to pixels below a threshold).

    Pixels below ``thr`` are replaced by ``min(pixel + magnitude, 255)``;
    all other pixels are kept. The result is cast back to the input dtype.

    Args:
        magnitude (int | float): The value to be added to pixels below the thr.
        thr (int | float): The threshold below which the pixels value will be
            adjusted. Defaults to 128.
        prob (float): The probability for solarizing therefore should be in
            range [0, 1]. Defaults to 0.5.
    """

    def __init__(self, magnitude, thr=128, prob=0.5):
        # The message used to read "The thr magnitude must ...", which named
        # the wrong argument.
        assert isinstance(magnitude, (int, float)), 'The magnitude type ' \
            f'must be int or float, but got {type(magnitude)} instead.'
        assert isinstance(thr, (int, float)), 'The thr type must '\
            f'be int or float, but got {type(thr)} instead.'
        assert 0 <= prob <= 1.0, 'The prob should be in range [0,1], ' \
            f'got {prob} instead.'

        self.magnitude = magnitude
        self.thr = thr
        self.prob = prob

    def __call__(self, results):
        if np.random.rand() > self.prob:
            return results
        for key in results.get('img_fields', ['img']):
            img = results[key]
            if np.issubdtype(img.dtype, np.integer):
                # Widen integer images before adding: in a narrow dtype such
                # as uint8 the sum can wrap around (e.g. 199 + 100 -> 43)
                # before the clamp to 255 gets a chance to act.
                shifted = img.astype(np.int64) + self.magnitude
            else:
                shifted = img + self.magnitude
            img_solarized = np.where(img < self.thr,
                                     np.minimum(shifted, 255), img)
            results[key] = img_solarized.astype(img.dtype)
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f'(magnitude={self.magnitude}, '
        repr_str += f'thr={self.thr}, '
        repr_str += f'prob={self.prob})'
        return repr_str
@PIPELINES.register_module()
class Posterize(object):
    """Posterize images (reduce the number of bits for each color channel).

    Args:
        bits (int | float): Number of bits for each pixel in the output img,
            which should be less or equal to 8. The value is converted with
            ``int()`` before being stored.
        prob (float): The probability for posterizing therefore should be in
            range [0, 1]. Defaults to 0.5.
    """

    def __init__(self, bits, prob=0.5):
        # 8 itself is accepted, so the message says so (it used to claim
        # "less than 8").
        assert bits <= 8, \
            f'The bits must be less than or equal to 8, got {bits} instead.'
        assert 0 <= prob <= 1.0, 'The prob should be in range [0,1], ' \
            f'got {prob} instead.'

        self.bits = int(bits)
        self.prob = prob

    def __call__(self, results):
        if np.random.rand() > self.prob:
            return results
        for key in results.get('img_fields', ['img']):
            img = results[key]
            img_posterized = mmcv.posterize(img, bits=self.bits)
            # Cast back so the stored image keeps its original dtype.
            results[key] = img_posterized.astype(img.dtype)
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f'(bits={self.bits}, '
        repr_str += f'prob={self.prob})'
        return repr_str
@PIPELINES.register_module()
class Contrast(object):
    """Adjust image contrast with a given probability.

    Args:
        magnitude (int | float): The magnitude used for adjusting contrast.
            The factor passed on is ``1 + magnitude``: a positive magnitude
            enhances the contrast, a negative one makes the image grayer and
            0 gives the original image.
        prob (float): Probability of applying the adjustment, in [0, 1].
            Defaults to 0.5.
        random_negative_prob (float): Probability of negating the magnitude,
            in [0, 1]. Defaults to 0.5.
    """

    def __init__(self, magnitude, prob=0.5, random_negative_prob=0.5):
        assert isinstance(magnitude, (int, float)), (
            'The magnitude type must be int or float, but got '
            f'{type(magnitude)} instead.')
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        assert 0 <= random_negative_prob <= 1.0, (
            'The random_negative_prob should be in range [0,1], got '
            f'{random_negative_prob} instead.')

        self.magnitude = magnitude
        self.prob = prob
        self.random_negative_prob = random_negative_prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if skip:
            return results

        signed_magnitude = random_negative(self.magnitude,
                                           self.random_negative_prob)
        for field in results.get('img_fields', ['img']):
            source = results[field]
            adjusted = mmcv.adjust_contrast(
                source, factor=1 + signed_magnitude)
            results[field] = adjusted.astype(source.dtype)
        return results

    def __repr__(self):
        fields = ', '.join((
            f'magnitude={self.magnitude}',
            f'prob={self.prob}',
            f'random_negative_prob={self.random_negative_prob}',
        ))
        return f'{self.__class__.__name__}({fields})'
@PIPELINES.register_module()
class ColorTransform(object):
    """Adjust image color balance with a given probability.

    Args:
        magnitude (int | float): The magnitude used for color transform. The
            alpha passed on is ``1 + magnitude``: a positive magnitude
            enhances the color, a negative one makes the image grayer and 0
            gives the original image.
        prob (float): Probability of applying the transform, in [0, 1].
            Defaults to 0.5.
        random_negative_prob (float): Probability of negating the magnitude,
            in [0, 1]. Defaults to 0.5.
    """

    def __init__(self, magnitude, prob=0.5, random_negative_prob=0.5):
        assert isinstance(magnitude, (int, float)), (
            'The magnitude type must be int or float, but got '
            f'{type(magnitude)} instead.')
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        assert 0 <= random_negative_prob <= 1.0, (
            'The random_negative_prob should be in range [0,1], got '
            f'{random_negative_prob} instead.')

        self.magnitude = magnitude
        self.prob = prob
        self.random_negative_prob = random_negative_prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if skip:
            return results

        signed_magnitude = random_negative(self.magnitude,
                                           self.random_negative_prob)
        for field in results.get('img_fields', ['img']):
            source = results[field]
            adjusted = mmcv.adjust_color(source, alpha=1 + signed_magnitude)
            results[field] = adjusted.astype(source.dtype)
        return results

    def __repr__(self):
        fields = ', '.join((
            f'magnitude={self.magnitude}',
            f'prob={self.prob}',
            f'random_negative_prob={self.random_negative_prob}',
        ))
        return f'{self.__class__.__name__}({fields})'
@PIPELINES.register_module()
class Brightness(object):
    """Adjust image brightness with a given probability.

    Args:
        magnitude (int | float): The magnitude used for adjusting brightness.
            The factor passed on is ``1 + magnitude``: a positive magnitude
            enhances the brightness, a negative one makes the image darker
            and 0 gives the original image.
        prob (float): Probability of applying the adjustment, in [0, 1].
            Defaults to 0.5.
        random_negative_prob (float): Probability of negating the magnitude,
            in [0, 1]. Defaults to 0.5.
    """

    def __init__(self, magnitude, prob=0.5, random_negative_prob=0.5):
        assert isinstance(magnitude, (int, float)), (
            'The magnitude type must be int or float, but got '
            f'{type(magnitude)} instead.')
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        assert 0 <= random_negative_prob <= 1.0, (
            'The random_negative_prob should be in range [0,1], got '
            f'{random_negative_prob} instead.')

        self.magnitude = magnitude
        self.prob = prob
        self.random_negative_prob = random_negative_prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if skip:
            return results

        signed_magnitude = random_negative(self.magnitude,
                                           self.random_negative_prob)
        for field in results.get('img_fields', ['img']):
            source = results[field]
            adjusted = mmcv.adjust_brightness(
                source, factor=1 + signed_magnitude)
            results[field] = adjusted.astype(source.dtype)
        return results

    def __repr__(self):
        fields = ', '.join((
            f'magnitude={self.magnitude}',
            f'prob={self.prob}',
            f'random_negative_prob={self.random_negative_prob}',
        ))
        return f'{self.__class__.__name__}({fields})'
@PIPELINES.register_module()
class Sharpness(object):
    """Adjust image sharpness with a given probability.

    Args:
        magnitude (int | float): The magnitude used for adjusting sharpness.
            The factor passed on is ``1 + magnitude``: a positive magnitude
            enhances the sharpness, a negative one blurs the image and 0
            gives the original image.
        prob (float): Probability of applying the adjustment, in [0, 1].
            Defaults to 0.5.
        random_negative_prob (float): Probability of negating the magnitude,
            in [0, 1]. Defaults to 0.5.
    """

    def __init__(self, magnitude, prob=0.5, random_negative_prob=0.5):
        assert isinstance(magnitude, (int, float)), (
            'The magnitude type must be int or float, but got '
            f'{type(magnitude)} instead.')
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')
        assert 0 <= random_negative_prob <= 1.0, (
            'The random_negative_prob should be in range [0,1], got '
            f'{random_negative_prob} instead.')

        self.magnitude = magnitude
        self.prob = prob
        self.random_negative_prob = random_negative_prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if skip:
            return results

        signed_magnitude = random_negative(self.magnitude,
                                           self.random_negative_prob)
        for field in results.get('img_fields', ['img']):
            source = results[field]
            adjusted = mmcv.adjust_sharpness(
                source, factor=1 + signed_magnitude)
            results[field] = adjusted.astype(source.dtype)
        return results

    def __repr__(self):
        fields = ', '.join((
            f'magnitude={self.magnitude}',
            f'prob={self.prob}',
            f'random_negative_prob={self.random_negative_prob}',
        ))
        return f'{self.__class__.__name__}({fields})'
@PIPELINES.register_module()
class Cutout(object):
    """Cut a region out of images with a given probability.

    Args:
        shape (int | float | tuple(int | float)): Expected cutout shape
            (h, w). A single value is used for both h and w. Floats are
            converted with ``int()``, element-wise for tuples.
        pad_val (int, tuple[int]): Fill value for the cutout region. If it is
            a tuple, it must have the same length as the image channels.
            Defaults to 128.
        prob (float): Probability of applying the cutout, in [0, 1].
            Defaults to 0.5.
    """

    def __init__(self, shape, pad_val=128, prob=0.5):
        if isinstance(shape, tuple):
            shape = tuple(map(int, shape))
        elif isinstance(shape, float):
            shape = int(shape)
        elif not isinstance(shape, int):
            raise TypeError(
                'shape must be of type int, float or tuple, got '
                f'{type(shape)} instead')
        assert 0 <= prob <= 1.0, (
            f'The prob should be in range [0,1], got {prob} instead.')

        self.shape = shape
        self.pad_val = pad_val
        self.prob = prob

    def __call__(self, results):
        skip = np.random.rand() > self.prob
        if not skip:
            for field in results.get('img_fields', ['img']):
                source = results[field]
                cut = mmcv.cutout(source, self.shape, pad_val=self.pad_val)
                # Cast back so the stored image keeps its original dtype.
                results[field] = cut.astype(source.dtype)
        return results

    def __repr__(self):
        fields = ', '.join((
            f'shape={self.shape}',
            f'pad_val={self.pad_val}',
            f'prob={self.prob}',
        ))
        return f'{self.__class__.__name__}({fields})'
from collections.abc import Sequence
from mmcv.utils import build_from_cfg
from ..builder import PIPELINES
@PIPELINES.register_module()
class Compose(object):
    """Chain several transforms into one data pipeline.

    Args:
        transforms (list[dict | callable]): Each entry is either a config
            dict, which is built through the ``PIPELINES`` registry, or an
            object that is already callable.
    """

    def __init__(self, transforms):
        assert isinstance(transforms, Sequence)
        self.transforms = []
        for item in transforms:
            if isinstance(item, dict):
                item = build_from_cfg(item, PIPELINES)
            elif not callable(item):
                raise TypeError('transform must be callable or a dict, but got'
                                f' {type(item)}')
            self.transforms.append(item)

    def __call__(self, data):
        for step in self.transforms:
            data = step(data)
            # A transform returning None aborts the rest of the pipeline.
            if data is None:
                return None
        return data

    def __repr__(self):
        listing = ''.join(f'\n {step}' for step in self.transforms)
        return f'{self.__class__.__name__}({listing}\n)'
from collections.abc import Sequence
import mmcv
import numpy as np
import torch
from mmcv.parallel import DataContainer as DC
from PIL import Image
from ..builder import PIPELINES
def to_tensor(data):
    """Convert objects of various python types to :obj:`torch.Tensor`.

    Supported types are: :class:`numpy.ndarray`, :class:`torch.Tensor`,
    :class:`Sequence`, :class:`int` and :class:`float`.

    Args:
        data: The object to convert.

    Returns:
        torch.Tensor: ``data`` itself if it already is a tensor; otherwise
        a tensor built from it. Scalars become 1-element tensors
        (``LongTensor`` for int, ``FloatTensor`` for float).

    Raises:
        TypeError: If ``data`` is a string or any other unsupported type.
    """
    if isinstance(data, torch.Tensor):
        return data
    elif isinstance(data, np.ndarray):
        return torch.from_numpy(data)
    # Strings are sequences too, but must not be converted element-wise.
    elif isinstance(data, Sequence) and not isinstance(data, str):
        return torch.tensor(data)
    elif isinstance(data, int):
        return torch.LongTensor([data])
    elif isinstance(data, float):
        return torch.FloatTensor([data])
    else:
        raise TypeError(
            f'Type {type(data)} cannot be converted to tensor. '
            'Supported types are: `numpy.ndarray`, `torch.Tensor`, '
            '`Sequence`, `int` and `float`')
@PIPELINES.register_module()
class ToTensor(object):
    """Convert the values stored under ``keys`` with ``to_tensor``.

    Args:
        keys (Sequence[str]): Keys of ``results`` to convert in place.
    """

    def __init__(self, keys):
        self.keys = keys

    def __call__(self, results):
        for name in self.keys:
            value = results[name]
            results[name] = to_tensor(value)
        return results

    def __repr__(self):
        return f'{self.__class__.__name__}(keys={self.keys})'
@PIPELINES.register_module()
class ImageToTensor(object):
    """Convert images stored under ``keys`` to channel-first tensors.

    Images with fewer than 3 dimensions get a trailing axis added, the axes
    are then permuted with ``transpose(2, 0, 1)`` and the result is passed to
    ``to_tensor``.

    Args:
        keys (Sequence[str]): Keys of ``results`` holding the images.
    """

    def __init__(self, keys):
        self.keys = keys

    def __call__(self, results):
        for name in self.keys:
            array = results[name]
            if len(array.shape) < 3:
                # Give 2-D images an explicit trailing axis.
                array = np.expand_dims(array, -1)
            channel_first = array.transpose(2, 0, 1)
            results[name] = to_tensor(channel_first)
        return results

    def __repr__(self):
        return f'{self.__class__.__name__}(keys={self.keys})'
@PIPELINES.register_module()
class Transpose(object):
    """Permute the axes of the arrays stored under ``keys``.

    Args:
        keys (Sequence[str]): Keys of ``results`` to transpose.
        order (Sequence[int]): Axis order passed to ``.transpose``.
    """

    def __init__(self, keys, order):
        self.keys, self.order = keys, order

    def __call__(self, results):
        for name in self.keys:
            array = results[name]
            results[name] = array.transpose(self.order)
        return results

    def __repr__(self):
        return (f'{self.__class__.__name__}'
                f'(keys={self.keys}, order={self.order})')
@PIPELINES.register_module()
class ToPIL(object):
    """Replace ``results['img']`` with ``Image.fromarray`` of its value."""

    def __init__(self):
        pass

    def __call__(self, results):
        array = results['img']
        results['img'] = Image.fromarray(array)
        return results
@PIPELINES.register_module()
class ToNumpy(object):
    """Replace ``results['img']`` with a ``float32`` numpy array of it."""

    def __init__(self):
        pass

    def __call__(self, results):
        source = results['img']
        results['img'] = np.array(source, dtype=np.float32)
        return results
@PIPELINES.register_module()
class Collect(object):
    """Collect data from the loader relevant to the specific task.

    This is usually the last stage of the data loader pipeline. Typically keys
    is set to some subset of "img" and "gt_label".

    Args:
        keys (Sequence[str]): Keys of results to be collected in ``data``.
        meta_keys (Sequence[str], optional): Meta keys gathered into a dict,
            wrapped in ``mmcv.DataContainer`` and stored in
            ``data['img_metas']``. Meta keys missing from the results are
            skipped.
            Default: ``('filename', 'ori_filename', 'ori_shape',
            'img_shape', 'flip', 'flip_direction', 'img_norm_cfg')``

    Returns:
        dict: A new dict containing

        - ``img_metas``: the collected meta information
        - every key in ``self.keys``
    """

    def __init__(self,
                 keys,
                 meta_keys=('filename', 'ori_filename', 'ori_shape',
                            'img_shape', 'flip', 'flip_direction',
                            'img_norm_cfg')):
        self.keys, self.meta_keys = keys, meta_keys

    def __call__(self, results):
        # Only meta keys that are actually present are kept.
        img_meta = {
            name: results[name]
            for name in self.meta_keys if name in results
        }
        data = {'img_metas': DC(img_meta, cpu_only=True)}
        for name in self.keys:
            data[name] = results[name]
        return data

    def __repr__(self):
        return (f'{self.__class__.__name__}'
                f'(keys={self.keys}, meta_keys={self.meta_keys})')
@PIPELINES.register_module()
class WrapFieldsToLists(object):
    """Wrap every field of the data dictionary into a one-element list.

    This class can be used as a last step of a test or validation
    pipeline for single image evaluation or inference.

    Example:
        >>> test_pipeline = [
        >>>    dict(type='LoadImageFromFile'),
        >>>    dict(type='Normalize',
                    mean=[123.675, 116.28, 103.53],
                    std=[58.395, 57.12, 57.375],
                    to_rgb=True),
        >>>    dict(type='ImageToTensor', keys=['img']),
        >>>    dict(type='Collect', keys=['img']),
        >>>    dict(type='WrapFieldsToLists')
        >>> ]
    """

    def __call__(self, results):
        # Snapshot the keys, then replace each value by ``[value]`` in place.
        for name in list(results):
            results[name] = [results[name]]
        return results

    def __repr__(self):
        return self.__class__.__name__ + '()'
import os.path as osp
import mmcv
import numpy as np
from ..builder import PIPELINES
@PIPELINES.register_module()
class LoadImageFromFile(object):
    """Load an image from file.

    Required keys are "img_prefix" and "img_info" (a dict that must contain the
    key "filename"). Added or updated keys are "filename", "ori_filename",
    "img", "img_shape", "ori_shape" (same as `img_shape`) and "img_norm_cfg"
    (means=0 and stds=1).

    Args:
        to_float32 (bool): Whether to convert the loaded image to a float32
            numpy array. Defaults to False.
        color_type (str): The flag argument for :func:`mmcv.imfrombytes()`.
            Defaults to 'color'.
        file_client_args (dict): Arguments to instantiate a FileClient.
            A copy of the dict is stored.
            Defaults to ``dict(backend='disk')``.
    """

    def __init__(self,
                 to_float32=False,
                 color_type='color',
                 file_client_args=dict(backend='disk')):
        self.to_float32 = to_float32
        self.color_type = color_type
        # Copy so the shared default / caller's dict is never mutated.
        self.file_client_args = file_client_args.copy()
        # The client is created on first use in ``__call__``.
        self.file_client = None

    def __call__(self, results):
        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)

        prefix = results['img_prefix']
        relative_name = results['img_info']['filename']
        if prefix is None:
            filename = relative_name
        else:
            filename = osp.join(prefix, relative_name)

        raw = self.file_client.get(filename)
        img = mmcv.imfrombytes(raw, flag=self.color_type)
        if self.to_float32:
            img = img.astype(np.float32)

        # Images without a channel axis count as single-channel.
        num_channels = img.shape[2] if len(img.shape) >= 3 else 1
        results['filename'] = filename
        results['ori_filename'] = relative_name
        results['img'] = img
        results['img_shape'] = img.shape
        results['ori_shape'] = img.shape
        # Identity normalisation config: zero mean, unit std.
        results['img_norm_cfg'] = dict(
            mean=np.zeros(num_channels, dtype=np.float32),
            std=np.ones(num_channels, dtype=np.float32),
            to_rgb=False)
        return results

    def __repr__(self):
        return (f'{self.__class__.__name__}('
                f'to_float32={self.to_float32}, '
                f"color_type='{self.color_type}', "
                f'file_client_args={self.file_client_args})')
import inspect
import math
import random
from numbers import Number
from typing import Sequence
import mmcv
import numpy as np
from ..builder import PIPELINES
from .compose import Compose
try:
import albumentations
except ImportError:
albumentations = None
@PIPELINES.register_module()
class RandomCrop(object):
    """Crop the given Image at a random location.

    Args:
        size (sequence or int): Desired output size of the crop. If size is an
            int instead of sequence like (h, w), a square crop (size, size) is
            made.
        padding (int or sequence, optional): Optional padding on each border
            of the image. If a sequence of length 4 is provided, it is used to
            pad left, top, right, bottom borders respectively. If a sequence
            of length 2 is provided, it is used to pad left/right, top/bottom
            borders, respectively. Default: None, which means no padding.
        pad_if_needed (boolean): It will pad the image if smaller than the
            desired size to avoid raising an exception. Since cropping is done
            after padding, the padding seems to be done at a random offset.
            Default: False.
        pad_val (Number | Sequence[Number]): Pixel pad_val value for constant
            fill. If a tuple of length 3, it is used to pad_val R, G, B
            channels respectively. Default: 0.
        padding_mode (str): Type of padding, used for both the explicit
            ``padding`` and the ``pad_if_needed`` padding. Should be:
            constant, edge, reflect or symmetric. Default: constant.

            - constant: Pads with a constant value, this value is specified
              with pad_val.
            - edge: pads with the last value at the edge of the image.
            - reflect: Pads with reflection of image without repeating the
              last value on the edge. For example, padding [1, 2, 3, 4]
              with 2 elements on both sides in reflect mode will result
              in [3, 2, 1, 2, 3, 4, 3, 2].
            - symmetric: Pads with reflection of image repeating the last
              value on the edge. For example, padding [1, 2, 3, 4] with
              2 elements on both sides in symmetric mode will result in
              [2, 1, 1, 2, 3, 4, 4, 3].
    """

    def __init__(self,
                 size,
                 padding=None,
                 pad_if_needed=False,
                 pad_val=0,
                 padding_mode='constant'):
        if isinstance(size, (tuple, list)):
            self.size = size
        else:
            self.size = (size, size)
        # check padding mode
        assert padding_mode in ['constant', 'edge', 'reflect', 'symmetric']
        self.padding = padding
        self.pad_if_needed = pad_if_needed
        self.pad_val = pad_val
        self.padding_mode = padding_mode

    @staticmethod
    def get_params(img, output_size):
        """Get parameters for ``crop`` for a random crop.

        Args:
            img (ndarray): Image to be cropped.
            output_size (tuple): Expected output size (h, w) of the crop.

        Returns:
            tuple: Params (ymin, xmin, target_height, target_width) to be
            used for the random crop. When the image already has the
            target size, ``(0, 0, height, width)`` is returned.

        Raises:
            ValueError: From ``random.randint`` if the image is smaller than
                the requested size.
        """
        height = img.shape[0]
        width = img.shape[1]
        target_height, target_width = output_size
        if width == target_width and height == target_height:
            return 0, 0, height, width

        # randint is inclusive on both ends, so every valid offset is possible.
        ymin = random.randint(0, height - target_height)
        xmin = random.randint(0, width - target_width)
        return ymin, xmin, target_height, target_width

    def __call__(self, results):
        """Pad (if configured) and randomly crop every image field.

        Args:
            results (dict): Pipeline dict. The keys listed under
                ``'img_fields'`` (default ``['img']``) are cropped.

        Returns:
            dict: The same dict with the cropped images.
        """
        for key in results.get('img_fields', ['img']):
            img = results[key]
            if self.padding is not None:
                # Honour the configured padding mode for explicit padding too;
                # previously it was silently ignored here.
                img = mmcv.impad(
                    img,
                    padding=self.padding,
                    pad_val=self.pad_val,
                    padding_mode=self.padding_mode)

            # pad the height if needed (same amount on top and bottom)
            if self.pad_if_needed and img.shape[0] < self.size[0]:
                pad_h = self.size[0] - img.shape[0]
                img = mmcv.impad(
                    img,
                    padding=(0, pad_h, 0, pad_h),
                    pad_val=self.pad_val,
                    padding_mode=self.padding_mode)

            # pad the width if needed (same amount on left and right)
            if self.pad_if_needed and img.shape[1] < self.size[1]:
                pad_w = self.size[1] - img.shape[1]
                img = mmcv.impad(
                    img,
                    padding=(pad_w, 0, pad_w, 0),
                    pad_val=self.pad_val,
                    padding_mode=self.padding_mode)

            ymin, xmin, height, width = self.get_params(img, self.size)
            # The crop box is given as inclusive (x1, y1, x2, y2) corners.
            results[key] = mmcv.imcrop(
                img,
                np.array([
                    xmin,
                    ymin,
                    xmin + width - 1,
                    ymin + height - 1,
                ]))
        return results

    def __repr__(self):
        return (self.__class__.__name__ +
                f'(size={self.size}, padding={self.padding})')
@PIPELINES.register_module()
class RandomResizedCrop(object):
    """Crop the given image to random size and aspect ratio.

    A crop of random size (default: of 0.08 to 1.0) of the original size and a
    random aspect ratio (default: of 3/4 to 4/3) of the original aspect ratio
    is made. This crop is finally resized to given size.

    Args:
        size (sequence | int): Desired output size of the crop. If size is an
            int instead of sequence like (h, w), a square crop (size, size) is
            made.
        scale (tuple): Range of the random size of the cropped image compared
            to the original image. Defaults to (0.08, 1.0).
        ratio (tuple): Range of the random aspect ratio of the cropped image
            compared to the original image. Defaults to (3. / 4., 4. / 3.).
        max_attempts (int): Maximum number of attempts before falling back to
            Central Crop. Defaults to 10.
        efficientnet_style (bool): Whether to use efficientnet style Random
            ResizedCrop. Defaults to False.
        min_covered (Number): Minimum ratio of the cropped area to the original
            area. Only valid if efficientnet_style is true. Defaults to 0.1.
        crop_padding (int): The crop padding parameter in efficientnet style
            center crop. Only valid if efficientnet_style is true.
            Defaults to 32.
        interpolation (str): Interpolation method, accepted values are
            'nearest', 'bilinear', 'bicubic', 'area', 'lanczos'. Defaults to
            'bilinear'.
        backend (str): The image resize backend type, accepted values are
            `cv2` and `pillow`. Defaults to `cv2`.

    Raises:
        ValueError: If ``scale``/``ratio`` are not (min, max) ordered or the
            backend is unsupported.
    """

    def __init__(self,
                 size,
                 scale=(0.08, 1.0),
                 ratio=(3. / 4., 4. / 3.),
                 max_attempts=10,
                 efficientnet_style=False,
                 min_covered=0.1,
                 crop_padding=32,
                 interpolation='bilinear',
                 backend='cv2'):
        if efficientnet_style:
            # EfficientNet style only supports square outputs.
            assert isinstance(size, int)
            self.size = (size, size)
            assert crop_padding >= 0
        elif isinstance(size, (tuple, list)):
            self.size = size
        else:
            self.size = (size, size)
        if (scale[0] > scale[1]) or (ratio[0] > ratio[1]):
            raise ValueError('range should be of kind (min, max). '
                             f'But received scale {scale} and ratio {ratio}.')
        assert min_covered >= 0, 'min_covered should be no less than 0.'
        assert isinstance(max_attempts, int) and max_attempts >= 0, \
            'max_attempts must be of type int and no less than 0.'
        assert interpolation in ('nearest', 'bilinear', 'bicubic', 'area',
                                 'lanczos')
        if backend not in ['cv2', 'pillow']:
            raise ValueError(f'backend: {backend} is not supported for resize. '
                             'Supported backends are "cv2", "pillow"')

        self.scale = scale
        self.ratio = ratio
        self.max_attempts = max_attempts
        self.efficientnet_style = efficientnet_style
        self.min_covered = min_covered
        self.crop_padding = crop_padding
        self.interpolation = interpolation
        self.backend = backend

    @staticmethod
    def get_params(img, scale, ratio, max_attempts=10):
        """Get parameters for ``crop`` for a random sized crop.

        Args:
            img (ndarray): Image to be cropped.
            scale (tuple): Range of the random size of the cropped image
                compared to the original image size.
            ratio (tuple): Range of the random aspect ratio of the cropped
                image compared to the original image aspect ratio.
            max_attempts (int): Maximum number of attempts before falling back
                to central crop. Defaults to 10.

        Returns:
            tuple: Params (ymin, xmin, ymax, xmax) to be passed to `crop` for
            a random sized crop. The max coordinates are inclusive.
        """
        height = img.shape[0]
        width = img.shape[1]
        area = height * width

        if max_attempts > 0:
            # Loop invariant: sample the aspect ratio uniformly in log space.
            log_ratio = (math.log(ratio[0]), math.log(ratio[1]))

        for _ in range(max_attempts):
            target_area = random.uniform(*scale) * area
            aspect_ratio = math.exp(random.uniform(*log_ratio))
            target_width = int(round(math.sqrt(target_area * aspect_ratio)))
            target_height = int(round(math.sqrt(target_area / aspect_ratio)))

            if 0 < target_width <= width and 0 < target_height <= height:
                ymin = random.randint(0, height - target_height)
                xmin = random.randint(0, width - target_width)
                ymax = ymin + target_height - 1
                xmax = xmin + target_width - 1
                return ymin, xmin, ymax, xmax

        # Fallback to central crop
        in_ratio = float(width) / float(height)
        if in_ratio < min(ratio):
            target_width = width
            target_height = int(round(target_width / min(ratio)))
        elif in_ratio > max(ratio):
            target_height = height
            target_width = int(round(target_height * max(ratio)))
        else:  # whole image
            target_width = width
            target_height = height
        ymin = (height - target_height) // 2
        xmin = (width - target_width) // 2
        ymax = ymin + target_height - 1
        xmax = xmin + target_width - 1
        return ymin, xmin, ymax, xmax

    # https://github.com/kakaobrain/fast-autoaugment/blob/master/FastAutoAugment/data.py # noqa
    @staticmethod
    def get_params_efficientnet_style(img,
                                      size,
                                      scale,
                                      ratio,
                                      max_attempts=10,
                                      min_covered=0.1,
                                      crop_padding=32):
        """Get parameters for ``crop`` for a random sized crop in efficientnet
        style.

        Args:
            img (ndarray): Image to be cropped.
            size (sequence): Desired output size of the crop.
            scale (tuple): Range of the random size of the cropped image
                compared to the original image size.
            ratio (tuple): Range of the random aspect ratio of the cropped
                image compared to the original image aspect ratio.
            max_attempts (int): Maximum number of attempts before falling back
                to central crop. Defaults to 10.
            min_covered (Number): Minimum ratio of the cropped area to the
                original area. Defaults to 0.1.
            crop_padding (int): The crop padding parameter in efficientnet
                style center crop. Defaults to 32.

        Returns:
            tuple: Integer params (ymin, xmin, ymax, xmax) to be passed to
            `crop` for a random sized crop. The max coordinates are inclusive.
        """
        height, width = img.shape[:2]
        area = height * width
        min_target_area = scale[0] * area
        max_target_area = scale[1] * area

        for _ in range(max_attempts):
            aspect_ratio = random.uniform(*ratio)
            min_target_height = int(
                round(math.sqrt(min_target_area / aspect_ratio)))
            max_target_height = int(
                round(math.sqrt(max_target_area / aspect_ratio)))

            # Shrink the height bound so the implied width fits the image.
            if max_target_height * aspect_ratio > width:
                max_target_height = int((width + 0.5 - 1e-7) / aspect_ratio)
                if max_target_height * aspect_ratio > width:
                    max_target_height -= 1

            max_target_height = min(max_target_height, height)
            min_target_height = min(max_target_height, min_target_height)

            # slightly differs from tf implementation
            target_height = int(
                round(random.uniform(min_target_height, max_target_height)))
            target_width = int(round(target_height * aspect_ratio))
            target_area = target_height * target_width

            # slightly differs from tf. In tf, if target_area > max_target_area,
            # area will be recalculated
            if (target_area < min_target_area or target_area > max_target_area
                    or target_width > width or target_height > height
                    or target_area < min_covered * area):
                continue

            ymin = random.randint(0, height - target_height)
            xmin = random.randint(0, width - target_width)
            ymax = ymin + target_height - 1
            xmax = xmin + target_width - 1
            return ymin, xmin, ymax, xmax

        # Fallback to central crop
        img_short = min(height, width)
        crop_size = size[0] / (size[0] + crop_padding) * img_short

        ymin = max(0, int(round((height - crop_size) / 2.)))
        xmin = max(0, int(round((width - crop_size) / 2.)))
        # crop_size is a float; truncate so the returned coordinates are
        # integer pixel indices like in the non-fallback path.
        ymax = int(min(height, ymin + crop_size) - 1)
        xmax = int(min(width, xmin + crop_size) - 1)
        return ymin, xmin, ymax, xmax

    def __call__(self, results):
        """Randomly crop and resize every image field.

        Args:
            results (dict): Pipeline dict. The keys listed under
                ``'img_fields'`` (default ``['img']``) are processed.

        Returns:
            dict: The same dict with cropped-and-resized images.
        """
        for key in results.get('img_fields', ['img']):
            img = results[key]
            if self.efficientnet_style:
                get_params_func = self.get_params_efficientnet_style
                get_params_args = dict(
                    img=img,
                    size=self.size,
                    scale=self.scale,
                    ratio=self.ratio,
                    max_attempts=self.max_attempts,
                    min_covered=self.min_covered,
                    crop_padding=self.crop_padding)
            else:
                get_params_func = self.get_params
                get_params_args = dict(
                    img=img,
                    scale=self.scale,
                    ratio=self.ratio,
                    max_attempts=self.max_attempts)
            ymin, xmin, ymax, xmax = get_params_func(**get_params_args)
            img = mmcv.imcrop(img, bboxes=np.array([xmin, ymin, xmax, ymax]))
            # self.size is (h, w); imresize takes (w, h).
            results[key] = mmcv.imresize(
                img,
                tuple(self.size[::-1]),
                interpolation=self.interpolation,
                backend=self.backend)
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__ + f'(size={self.size}'
        repr_str += f', scale={tuple(round(s, 4) for s in self.scale)}'
        repr_str += f', ratio={tuple(round(r, 4) for r in self.ratio)}'
        repr_str += f', max_attempts={self.max_attempts}'
        repr_str += f', efficientnet_style={self.efficientnet_style}'
        repr_str += f', min_covered={self.min_covered}'
        repr_str += f', crop_padding={self.crop_padding}'
        repr_str += f', interpolation={self.interpolation}'
        repr_str += f', backend={self.backend})'
        return repr_str
@PIPELINES.register_module()
class RandomGrayscale(object):
    """Randomly convert image to grayscale with a probability of gray_prob.

    Args:
        gray_prob (float): Probability that image should be converted to
            grayscale. Default: 0.1.

    Returns:
        ndarray: Grayscale version of the input image with probability
        gray_prob and unchanged with probability (1-gray_prob).

        - If input image is 1 channel (or 2-D): it is left unchanged.
        - If input image is 3 channel: grayscale version is 3 channel
          with r == g == b.
    """

    def __init__(self, gray_prob=0.1):
        self.gray_prob = gray_prob

    def __call__(self, results):
        """Randomly grayscale every image field.

        One random draw is made per image field, so the fields are converted
        independently of each other.

        Args:
            results (dict): Pipeline dict. The keys listed under
                ``'img_fields'`` (default ``['img']``) are processed.

        Returns:
            dict: The same dict with randomly grayscaled images.
        """
        for key in results.get('img_fields', ['img']):
            img = results[key]
            # 2-D images have no channel axis; treat them as single-channel.
            num_output_channels = img.shape[2] if img.ndim > 2 else 1
            # The random draw happens first so it is made for every field.
            if random.random() < self.gray_prob and num_output_channels > 1:
                gray = mmcv.rgb2gray(img)[:, :, None]
                # Replicate the gray plane to keep the channel count.
                img = np.dstack([gray] * num_output_channels)
            # No early return here: every remaining field must be visited.
            results[key] = img
        return results

    def __repr__(self):
        return self.__class__.__name__ + f'(gray_prob={self.gray_prob})'
@PIPELINES.register_module()
class RandomFlip(object):
    """Flip the image randomly.

    The decision is drawn once per call and applied to all image fields.

    Args:
        flip_prob (float): Probability of the image being flipped.
            Default: 0.5
        direction (str): The flipping direction. Options are
            'horizontal' and 'vertical'. Default: 'horizontal'.
    """

    def __init__(self, flip_prob=0.5, direction='horizontal'):
        assert 0 <= flip_prob <= 1
        assert direction in ['horizontal', 'vertical']
        self.flip_prob, self.direction = flip_prob, direction

    def __call__(self, results):
        """Call function to flip image.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Results with 'flip' and 'flip_direction' keys added; the
            image fields are flipped when 'flip' is True.
        """
        do_flip = bool(np.random.rand() < self.flip_prob)
        results['flip'] = do_flip
        results['flip_direction'] = self.direction
        if not do_flip:
            return results
        for key in results.get('img_fields', ['img']):
            results[key] = mmcv.imflip(results[key], direction=self.direction)
        return results

    def __repr__(self):
        return f'{self.__class__.__name__}(flip_prob={self.flip_prob})'
@PIPELINES.register_module()
class RandomErasing(object):
    """Randomly selects a rectangle region in an image and erase pixels.

    Args:
        erase_prob (float): Probability that image will be randomly erased.
            Default: 0.5
        min_area_ratio (float): Minimum erased area / input image area
            Default: 0.02
        max_area_ratio (float): Maximum erased area / input image area
            Default: 0.4
        aspect_range (sequence | float): Aspect ratio range of erased area.
            if float, it will be converted to (aspect_ratio, 1/aspect_ratio)
            Default: (3/10, 10/3)
        mode (str): Fill method in erased area, can be:

            - 'const' (default): All pixels are assign with the same value.
            - 'rand': each pixel is assigned with a random value in [0, 255]

        fill_color (sequence | Number): Base color filled in erased area.
            Default: (128, 128, 128)
        fill_std (sequence | Number, optional): If set and mode='rand', fill
            erased area with random color from normal distribution
            (mean=fill_color, std=fill_std); If not set, fill erased area with
            random color from uniform distribution (0~255)
            Default: None

    Note:
        See https://arxiv.org/pdf/1708.04896.pdf

        This paper provided 4 modes: RE-R, RE-M, RE-0, RE-255, and use RE-M as
        default.

        - RE-R: RandomErasing(mode='rand')
        - RE-M: RandomErasing(mode='const', fill_color=(123.67, 116.3, 103.5))
        - RE-0: RandomErasing(mode='const', fill_color=0)
        - RE-255: RandomErasing(mode='const', fill_color=255)

        The fill patch is always built with 3 channels and ``uint8`` dtype,
        and the image is modified in place.
    """

    def __init__(self,
                 erase_prob=0.5,
                 min_area_ratio=0.02,
                 max_area_ratio=0.4,
                 aspect_range=(3 / 10, 10 / 3),
                 mode='const',
                 fill_color=(128, 128, 128),
                 fill_std=None):
        assert isinstance(erase_prob, float) and 0. <= erase_prob <= 1.
        assert isinstance(min_area_ratio, float) and 0. <= min_area_ratio <= 1.
        assert isinstance(max_area_ratio, float) and 0. <= max_area_ratio <= 1.
        assert min_area_ratio <= max_area_ratio, \
            'min_area_ratio should be smaller than max_area_ratio'
        if isinstance(aspect_range, float):
            # Normalise a single ratio r to the symmetric range (r, 1/r).
            aspect_range = min(aspect_range, 1 / aspect_range)
            aspect_range = (aspect_range, 1 / aspect_range)
        assert isinstance(aspect_range, Sequence) and len(aspect_range) == 2 \
            and all(isinstance(x, float) for x in aspect_range), \
            'aspect_range should be a float or Sequence with two float.'
        assert all(x > 0 for x in aspect_range), \
            'aspect_range should be positive.'
        assert aspect_range[0] <= aspect_range[1], \
            'In aspect_range (min, max), min should be smaller than max.'
        assert mode in ['const', 'rand']
        if isinstance(fill_color, Number):
            fill_color = [fill_color] * 3
        assert isinstance(fill_color, Sequence) and len(fill_color) == 3 \
            and all(isinstance(x, Number) for x in fill_color), \
            'fill_color should be a float or Sequence with three int.'
        if fill_std is not None:
            if isinstance(fill_std, Number):
                fill_std = [fill_std] * 3
            assert isinstance(fill_std, Sequence) and len(fill_std) == 3 \
                and all(isinstance(x, Number) for x in fill_std), \
                'fill_std should be a float or Sequence with three int.'

        self.erase_prob = erase_prob
        self.min_area_ratio = min_area_ratio
        self.max_area_ratio = max_area_ratio
        self.aspect_range = aspect_range
        self.mode = mode
        self.fill_color = fill_color
        self.fill_std = fill_std

    def _fill_pixels(self, img, top, left, h, w):
        """Overwrite ``img[top:top+h, left:left+w]`` in place and return it."""
        if self.mode == 'const':
            patch = np.empty((h, w, 3), dtype=np.uint8)
            patch[:, :] = np.array(self.fill_color, dtype=np.uint8)
        elif self.fill_std is None:
            # Uniform distribution
            patch = np.random.uniform(0, 256, (h, w, 3)).astype(np.uint8)
        else:
            # Normal distribution
            patch = np.random.normal(self.fill_color, self.fill_std, (h, w, 3))
            patch = np.clip(patch.astype(np.int32), 0, 255).astype(np.uint8)

        img[top:top + h, left:left + w] = patch
        return img

    def __call__(self, results):
        """
        Args:
            results (dict): Results dict from pipeline

        Returns:
            dict: Results after the transformation. Each image field is
            erased independently with probability ``erase_prob``.
        """
        for key in results.get('img_fields', ['img']):
            if np.random.rand() > self.erase_prob:
                continue
            img = results[key]
            img_h, img_w = img.shape[:2]

            # convert to log aspect to ensure equal probability of aspect ratio
            log_aspect_range = np.log(
                np.array(self.aspect_range, dtype=np.float32))
            aspect_ratio = np.exp(np.random.uniform(*log_aspect_range))
            area = img_h * img_w
            area *= np.random.uniform(self.min_area_ratio, self.max_area_ratio)

            h = min(int(round(np.sqrt(area * aspect_ratio))), img_h)
            w = min(int(round(np.sqrt(area / aspect_ratio))), img_w)
            # np.random.randint excludes the upper bound, so add 1 to allow
            # patches that touch the bottom / right border of the image.
            top = np.random.randint(0, img_h - h + 1) if img_h > h else 0
            left = np.random.randint(0, img_w - w + 1) if img_w > w else 0
            img = self._fill_pixels(img, top, left, h, w)

            results[key] = img
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f'(erase_prob={self.erase_prob}, '
        repr_str += f'min_area_ratio={self.min_area_ratio}, '
        repr_str += f'max_area_ratio={self.max_area_ratio}, '
        repr_str += f'aspect_range={self.aspect_range}, '
        repr_str += f'mode={self.mode}, '
        repr_str += f'fill_color={self.fill_color}, '
        repr_str += f'fill_std={self.fill_std})'
        return repr_str
@PIPELINES.register_module()
class Resize(object):
    """Resize images.

    Args:
        size (int | tuple): Images scales for resizing (h, w).
            When size is int, the default behavior is to resize an image
            to (size, size). When size is tuple and the second value is -1,
            the short edge of an image is resized to its first value.
            For example, when size is 224, the image is resized to 224x224.
            When size is (224, -1), the short side is resized to 224 and the
            other side is computed based on the short side, maintaining the
            aspect ratio.
        interpolation (str): Interpolation method, accepted values are
            "nearest", "bilinear", "bicubic", "area", "lanczos".
            More details can be found in `mmcv.image.geometric`.
        backend (str): The image resize backend type, accepted values are
            `cv2` and `pillow`. Default: `cv2`.
    """

    def __init__(self, size, interpolation='bilinear', backend='cv2'):
        assert isinstance(size, int) or (isinstance(size, tuple)
                                         and len(size) == 2)
        self.resize_w_short_side = False
        if isinstance(size, int):
            assert size > 0
            size = (size, size)
        else:
            first, second = size
            assert first > 0 and (second > 0 or second == -1)
            # (N, -1) means: scale the short side to N, keep aspect ratio.
            if second == -1:
                self.resize_w_short_side = True
        assert interpolation in ('nearest', 'bilinear', 'bicubic', 'area',
                                 'lanczos')
        if backend not in ['cv2', 'pillow']:
            raise ValueError(f'backend: {backend} is not supported for resize.'
                             'Supported backends are "cv2", "pillow"')

        self.size = size
        self.interpolation = interpolation
        self.backend = backend

    def _resize_img(self, results):
        """Resize every image field of ``results`` in place."""
        for key in results.get('img_fields', ['img']):
            img = results[key]
            if self.resize_w_short_side:
                h, w = img.shape[:2]
                short_side = self.size[0]
                if min(h, w) == short_side:
                    # Short side already matches: leave this image untouched.
                    continue
                if w < h:
                    width, height = short_side, int(short_side * h / w)
                else:
                    height, width = short_side, int(short_side * w / h)
            else:
                height, width = self.size
            resized = mmcv.imresize(
                img,
                size=(width, height),
                interpolation=self.interpolation,
                return_scale=False,
                backend=self.backend)
            results[key] = resized
            results['img_shape'] = resized.shape

    def __call__(self, results):
        self._resize_img(results)
        return results

    def __repr__(self):
        return (f'{self.__class__.__name__}(size={self.size}, '
                f'interpolation={self.interpolation})')
@PIPELINES.register_module()
class CenterCrop(object):
    r"""Center crop the image.

    Args:
        crop_size (int | tuple): Expected size after cropping with the format
            of (h, w).
        efficientnet_style (bool): Whether to use efficientnet style center
            crop. Defaults to False.
        crop_padding (int): The crop padding parameter in efficientnet style
            center crop. Only valid if efficientnet style is True. Defaults to
            32.
        interpolation (str): Interpolation method, accepted values are
            'nearest', 'bilinear', 'bicubic', 'area', 'lanczos'. Only valid if
            efficientnet style is True. Defaults to 'bilinear'.
        backend (str): The image resize backend type, accepted values are
            `cv2` and `pillow`. Only valid if efficientnet style is True.
            Defaults to `cv2`.

    Notes:
        If the image is smaller than the crop size, return the original image.
        If efficientnet_style is set to False, the pipeline would be a simple
        center crop using the crop_size.
        If efficientnet_style is set to True, the pipeline will be to first to
        perform the center crop with the crop_size_ as:

        .. math::
            crop\_size\_ = crop\_size / (crop\_size + crop\_padding) * short\_edge

        And then the pipeline resizes the img to the input crop size.
    """

    def __init__(self,
                 crop_size,
                 efficientnet_style=False,
                 crop_padding=32,
                 interpolation='bilinear',
                 backend='cv2'):
        if efficientnet_style:
            assert isinstance(crop_size, int)
            assert crop_padding >= 0
            assert interpolation in ('nearest', 'bilinear', 'bicubic', 'area',
                                     'lanczos')
            if backend not in ['cv2', 'pillow']:
                raise ValueError(
                    f'backend: {backend} is not supported for '
                    'resize. Supported backends are "cv2", "pillow"')
        else:
            assert isinstance(crop_size, int) or (isinstance(crop_size, tuple)
                                                  and len(crop_size) == 2)
        if isinstance(crop_size, int):
            crop_size = (crop_size, crop_size)
        assert crop_size[0] > 0 and crop_size[1] > 0
        self.crop_size = crop_size
        self.efficientnet_style = efficientnet_style
        self.crop_padding = crop_padding
        self.interpolation = interpolation
        self.backend = backend

    def _get_crop_bbox(self, img_height, img_width):
        """Return the inclusive integer box (x1, y1, x2, y2) for one image."""
        # Always start from the configured size so that the EfficientNet
        # rescaling is never applied cumulatively across image fields.
        crop_height, crop_width = self.crop_size[0], self.crop_size[1]
        # https://github.com/tensorflow/tpu/blob/master/models/official/efficientnet/preprocessing.py#L118 # noqa
        if self.efficientnet_style:
            img_short = min(img_height, img_width)
            crop_height = crop_height / (crop_height +
                                         self.crop_padding) * img_short
            crop_width = crop_width / (crop_width +
                                       self.crop_padding) * img_short

        y1 = max(0, int(round((img_height - crop_height) / 2.)))
        x1 = max(0, int(round((img_width - crop_width) / 2.)))
        # The scaled crop size may be fractional; truncate to pixel indices.
        y2 = int(min(img_height, y1 + crop_height) - 1)
        x2 = int(min(img_width, x1 + crop_width) - 1)
        return x1, y1, x2, y2

    def __call__(self, results):
        """Center crop every image field.

        Args:
            results (dict): Pipeline dict. The keys listed under
                ``'img_fields'`` (default ``['img']``) are cropped.

        Returns:
            dict: The same dict; ``'img_shape'`` holds the shape of the last
            processed image.
        """
        for key in results.get('img_fields', ['img']):
            img = results[key]
            # img.shape has length 2 for grayscale, length 3 for color
            img_height, img_width = img.shape[:2]
            x1, y1, x2, y2 = self._get_crop_bbox(img_height, img_width)

            # crop the image
            img = mmcv.imcrop(img, bboxes=np.array([x1, y1, x2, y2]))

            if self.efficientnet_style:
                # crop_size is (h, w); imresize takes (w, h).
                img = mmcv.imresize(
                    img,
                    tuple(self.crop_size[::-1]),
                    interpolation=self.interpolation,
                    backend=self.backend)
            results[key] = img
            # Set inside the loop so an empty field list cannot hit an
            # unbound local; the final value is still the last image's shape.
            results['img_shape'] = img.shape
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__ + f'(crop_size={self.crop_size}'
        repr_str += f', efficientnet_style={self.efficientnet_style}'
        repr_str += f', crop_padding={self.crop_padding}'
        repr_str += f', interpolation={self.interpolation}'
        repr_str += f', backend={self.backend})'
        return repr_str
@PIPELINES.register_module()
class Normalize(object):
    """Normalize the image.

    Args:
        mean (sequence): Per-channel mean values, stored as a float32 array.
        std (sequence): Per-channel std values, stored as a float32 array.
        to_rgb (bool): Whether to convert the image from BGR to RGB,
            default is true.
    """

    def __init__(self, mean, std, to_rgb=True):
        self.mean, self.std = (
            np.array(values, dtype=np.float32) for values in (mean, std))
        self.to_rgb = to_rgb

    def __call__(self, results):
        """Normalize every image field and record the config used."""
        for key in results.get('img_fields', ['img']):
            source = results[key]
            results[key] = mmcv.imnormalize(source, self.mean, self.std,
                                            self.to_rgb)
        results['img_norm_cfg'] = {
            'mean': self.mean,
            'std': self.std,
            'to_rgb': self.to_rgb,
        }
        return results

    def __repr__(self):
        return (f'{self.__class__.__name__}(mean={list(self.mean)}, '
                f'std={list(self.std)}, to_rgb={self.to_rgb})')
@PIPELINES.register_module()
class ColorJitter(object):
    """Randomly change the brightness, contrast and saturation of an image.

    For each property a magnitude is drawn uniformly from ``[0, limit]`` and
    handed to the matching transform with ``random_negative_prob=0.5``. The
    three transforms are applied in a random order.

    Args:
        brightness (float): How much to jitter brightness. Upper bound of
            the magnitude passed to the ``Brightness`` transform.
        contrast (float): How much to jitter contrast. Upper bound of the
            magnitude passed to the ``Contrast`` transform.
        saturation (float): How much to jitter saturation. Upper bound of
            the magnitude passed to the ``ColorTransform`` transform.
    """

    def __init__(self, brightness, contrast, saturation):
        self.brightness = brightness
        self.contrast = contrast
        self.saturation = saturation

    def __call__(self, results):
        """Apply the three jitter transforms, in random order.

        Args:
            results (dict): Pipeline dict forwarded to the composed
                transforms.

        Returns:
            The output of the composed transform.
        """
        # Draw order (brightness, contrast, saturation) is kept so that a
        # seeded ``random`` produces the same sequence of magnitudes.
        limits = (
            ('Brightness', self.brightness),
            ('Contrast', self.contrast),
            ('ColorTransform', self.saturation),
        )
        jitter_cfgs = []
        for transform_type, limit in limits:
            drawn = random.uniform(0, limit)
            jitter_cfgs.append(
                dict(
                    type=transform_type,
                    magnitude=drawn,
                    prob=1.,
                    random_negative_prob=0.5))
        random.shuffle(jitter_cfgs)
        pipeline = Compose(jitter_cfgs)
        return pipeline(results)

    def __repr__(self):
        pieces = [
            self.__class__.__name__,
            f'(brightness={self.brightness}, ',
            f'contrast={self.contrast}, ',
            f'saturation={self.saturation})',
        ]
        return ''.join(pieces)
@PIPELINES.register_module()
class Lighting(object):
    """Adjust images lighting using AlexNet-style PCA jitter.

    Args:
        eigval (list): The eigenvalues of the covariance matrix of pixel
            values.
        eigvec (list[list]): The eigenvectors of the covariance matrix of
            pixel values. Every inner list must have the same length.
        alphastd (float): The standard deviation for distribution of alpha.
            Defaults to 0.1.
        to_rgb (bool): Whether to convert img to rgb. Defaults to True.
    """

    def __init__(self, eigval, eigvec, alphastd=0.1, to_rgb=True):
        assert isinstance(eigval, list), \
            f'eigval must be of type list, got {type(eigval)} instead.'
        assert isinstance(eigvec, list), \
            f'eigvec must be of type list, got {type(eigvec)} instead.'
        # Every row must be a list as long as the first row.
        assert all(
            isinstance(row, list) and len(row) == len(eigvec[0])
            for row in eigvec), \
            'eigvec must contains lists with equal length.'
        self.eigval = np.array(eigval)
        self.eigvec = np.array(eigvec)
        self.alphastd = alphastd
        self.to_rgb = to_rgb

    def __call__(self, results):
        """Run ``mmcv.adjust_lighting`` on every image field.

        Args:
            results (dict): Pipeline dict. The keys listed under
                ``'img_fields'`` (``['img']`` when absent) are replaced.

        Returns:
            dict: The same dict with adjusted images.
        """
        image_keys = results.get('img_fields', ['img'])
        for name in image_keys:
            results[name] = mmcv.adjust_lighting(
                results[name],
                self.eigval,
                self.eigvec,
                alphastd=self.alphastd,
                to_rgb=self.to_rgb)
        return results

    def __repr__(self):
        pieces = [
            self.__class__.__name__,
            f'(eigval={self.eigval.tolist()}, ',
            f'eigvec={self.eigvec.tolist()}, ',
            f'alphastd={self.alphastd}, ',
            f'to_rgb={self.to_rgb})',
        ]
        return ''.join(pieces)
@PIPELINES.register_module()
class Albu(object):
    """Albumentation augmentation.

    Adds custom transformations from Albumentations library.
    Please, visit `https://albumentations.readthedocs.io`
    to get more information.

    An example of ``transforms`` is as followed:

    .. code-block::

        [
            dict(
                type='ShiftScaleRotate',
                shift_limit=0.0625,
                scale_limit=0.0,
                rotate_limit=0,
                interpolation=1,
                p=0.5),
            dict(
                type='RandomBrightnessContrast',
                brightness_limit=[0.1, 0.3],
                contrast_limit=[0.1, 0.3],
                p=0.2),
            dict(type='ChannelShuffle', p=0.1),
            dict(
                type='OneOf',
                transforms=[
                    dict(type='Blur', blur_limit=3, p=1.0),
                    dict(type='MedianBlur', blur_limit=3, p=1.0)
                ],
                p=0.1),
        ]

    Args:
        transforms (list[dict]): A list of albu transformations.
        keymap (dict, optional): Contains
            {'input key': 'albumentation-style key'}. When empty or None,
            ``{'img': 'image'}`` is used.
        update_pad_shape (bool): If True, ``results['pad_shape']`` is set to
            the shape of ``results['img']`` after augmentation.
            Defaults to False.
    """

    def __init__(self, transforms, keymap=None, update_pad_shape=False):
        if albumentations is None:
            raise RuntimeError('albumentations is not installed')
        from albumentations import Compose

        self.transforms = transforms
        self.filter_lost_elements = False
        self.update_pad_shape = update_pad_shape

        built = [self.albu_builder(cfg) for cfg in self.transforms]
        self.aug = Compose(built)

        self.keymap_to_albu = keymap if keymap else {'img': 'image'}
        # Inverse mapping, used to restore the original key names.
        self.keymap_back = {
            albu_key: src_key
            for src_key, albu_key in self.keymap_to_albu.items()
        }

    def albu_builder(self, cfg):
        """Import a module from albumentations.

        It inherits some of :func:`build_from_cfg` logic.

        Args:
            cfg (dict): Config dict. It should at least contain the key
                "type", which is either the name of an attribute of the
                ``albumentations`` module or a class.

        Returns:
            obj: The constructed object.
        """
        assert isinstance(cfg, dict) and 'type' in cfg
        kwargs = cfg.copy()
        type_spec = kwargs.pop('type')

        if mmcv.is_str(type_spec):
            if albumentations is None:
                raise RuntimeError('albumentations is not installed')
            transform_cls = getattr(albumentations, type_spec)
        elif inspect.isclass(type_spec):
            transform_cls = type_spec
        else:
            raise TypeError(
                f'type must be a str or valid type, but got {type(type_spec)}')

        # Nested transform configs (e.g. inside ``OneOf``) are built
        # recursively.
        if 'transforms' in kwargs:
            kwargs['transforms'] = list(
                map(self.albu_builder, kwargs['transforms']))

        return transform_cls(**kwargs)

    @staticmethod
    def mapper(d, keymap):
        """Dictionary mapper.

        Renames keys according to keymap provided; keys missing from
        ``keymap`` are kept unchanged.

        Args:
            d (dict): old dict
            keymap (dict): {'old_key':'new_key'}

        Returns:
            dict: new dict.
        """
        return {keymap.get(key, key): value for key, value in d.items()}

    def __call__(self, results):
        """Run the composed albumentations pipeline on ``results``.

        Args:
            results (dict): Pipeline dict; its entries are passed to the
                composed augmentation as keyword arguments after renaming.

        Returns:
            dict: Augmented results with the original key names restored.
        """
        # dict to albumentations format
        albu_inputs = self.mapper(results, self.keymap_to_albu)
        augmented = self.aug(**albu_inputs)

        if 'gt_labels' in augmented:
            labels = augmented['gt_labels']
            if isinstance(labels, list):
                labels = np.array(labels)
            augmented['gt_labels'] = labels.astype(np.int64)

        # back to the original format
        restored = self.mapper(augmented, self.keymap_back)

        # update final shape
        if self.update_pad_shape:
            restored['pad_shape'] = restored['img'].shape
        return restored

    def __repr__(self):
        return self.__class__.__name__ + f'(transforms={self.transforms})'
from .distributed_sampler import DistributedSampler
__all__ = ['DistributedSampler']
import torch
from torch.utils.data import DistributedSampler as _DistributedSampler
class DistributedSampler(_DistributedSampler):
    """Distributed sampler with optional shuffling and round-up padding.

    Args:
        dataset: Dataset to sample from; only ``len(dataset)`` is used here.
        num_replicas (int, optional): Forwarded to the parent sampler.
        rank (int, optional): Forwarded to the parent sampler.
        shuffle (bool): Whether to shuffle the indices with a generator
            seeded by the current epoch. Defaults to True.
        round_up (bool): Whether to repeat indices so that every replica
            receives ``num_samples`` of them. Defaults to True.
    """

    def __init__(self,
                 dataset,
                 num_replicas=None,
                 rank=None,
                 shuffle=True,
                 round_up=True):
        super().__init__(dataset, num_replicas=num_replicas, rank=rank)
        self.shuffle = shuffle
        self.round_up = round_up
        self.total_size = (
            self.num_samples * self.num_replicas
            if self.round_up else len(self.dataset))

    def __iter__(self):
        """Yield the indices assigned to this rank."""
        dataset_len = len(self.dataset)

        # deterministically shuffle based on epoch
        if self.shuffle:
            rng = torch.Generator()
            rng.manual_seed(self.epoch)
            order = torch.randperm(dataset_len, generator=rng).tolist()
        else:
            order = torch.arange(dataset_len).tolist()

        # add extra samples to make it evenly divisible
        if self.round_up:
            repeats = int(self.total_size / len(order) + 1)
            order = (order * repeats)[:self.total_size]
        assert len(order) == self.total_size

        # subsample: every ``num_replicas``-th index starting at ``rank``
        shard = order[self.rank:self.total_size:self.num_replicas]
        if self.round_up:
            assert len(shard) == self.num_samples

        return iter(shard)
import gzip
import hashlib
import os
import os.path
import shutil
import tarfile
import urllib.error
import urllib.request
import zipfile
__all__ = ['rm_suffix', 'check_integrity', 'download_and_extract_archive']
def rm_suffix(s, suffix=None):
    """Cut ``s`` at the last occurrence of ``suffix``.

    Args:
        s (str): The string to shorten.
        suffix (str | None): The marker to search for from the right. If
            None, the last ``'.'`` is used.

    Returns:
        str: ``s`` up to (not including) the last occurrence of the marker,
        or ``s`` unchanged when the marker does not occur.
    """
    marker = '.' if suffix is None else suffix
    cut = s.rfind(marker)
    # ``rfind`` returns -1 when the marker is absent; slicing with -1 would
    # silently drop the last character, so return the input untouched.
    if cut == -1:
        return s
    return s[:cut]
def calculate_md5(fpath, chunk_size=1024 * 1024):
    """Compute the MD5 hex digest of a file, reading it in chunks.

    Args:
        fpath (str): Path of the file to hash.
        chunk_size (int): Number of bytes read per iteration.
            Defaults to 1 MiB.

    Returns:
        str: Hexadecimal MD5 digest of the file content.
    """
    digest = hashlib.md5()
    with open(fpath, 'rb') as stream:
        while True:
            block = stream.read(chunk_size)
            if block == b'':
                break
            digest.update(block)
    return digest.hexdigest()
def check_md5(fpath, md5, **kwargs):
    """Tell whether the MD5 digest of a file equals ``md5``.

    Args:
        fpath (str): Path of the file to hash.
        md5 (str): Expected hexadecimal digest.
        **kwargs: Forwarded to :func:`calculate_md5`.

    Returns:
        bool: True if the computed digest equals ``md5``.
    """
    actual = calculate_md5(fpath, **kwargs)
    return md5 == actual
def check_integrity(fpath, md5=None):
    """Check that a file exists and, optionally, matches an MD5 digest.

    Args:
        fpath (str): Path of the file to check.
        md5 (str | None): Expected MD5 digest. If None, only existence is
            checked.

    Returns:
        bool: False if ``fpath`` is not a regular file; True if it is and
        ``md5`` is None; otherwise the result of the MD5 comparison.
    """
    if os.path.isfile(fpath):
        return True if md5 is None else check_md5(fpath, md5)
    return False
def download_url_to_file(url, fpath):
    """Copy the response body of ``url`` into the file at ``fpath``.

    Args:
        url (str): URL to open with ``urllib.request.urlopen``.
        fpath (str): Destination path, opened for binary writing.
    """
    with urllib.request.urlopen(url) as response:
        with open(fpath, 'wb') as sink:
            shutil.copyfileobj(response, sink)
def download_url(url, root, filename=None, md5=None):
    """Download a file from a url and place it in root.

    If the target file already exists and passes the integrity check, no
    download happens. If an ``https`` download fails, it is retried once
    over plain ``http``.

    Args:
        url (str): URL to download file from.
        root (str): Directory to place downloaded file in.
        filename (str | None): Name to save the file under.
            If filename is None, use the basename of the URL.
        md5 (str | None): MD5 checksum of the download.
            If md5 is None, download without md5 check.

    Raises:
        RuntimeError: If the file is missing or fails the MD5 check after
            downloading.
    """
    root = os.path.expanduser(root)
    if not filename:
        filename = os.path.basename(url)
    fpath = os.path.join(root, filename)

    os.makedirs(root, exist_ok=True)

    if check_integrity(fpath, md5):
        print(f'Using downloaded and verified file: {fpath}')
        return

    try:
        print(f'Downloading {url} to {fpath}')
        download_url_to_file(url, fpath)
    except (urllib.error.URLError, IOError):
        if not url.startswith('https:'):
            # Bare ``raise`` keeps the original traceback intact.
            raise
        # NOTE(security): this downgrades the transport to plain http.
        # Only the scheme is rewritten (count=1), so an ``https:``
        # appearing later in the URL (e.g. in a query string) is untouched.
        url = url.replace('https:', 'http:', 1)
        print('Failed download. Trying https -> http instead.'
              f' Downloading {url} to {fpath}')
        download_url_to_file(url, fpath)

    # check integrity of downloaded file
    if not check_integrity(fpath, md5):
        raise RuntimeError('File not found or corrupted.')
def _is_tarxz(filename):
    """Return True if ``filename`` ends with ``.tar.xz``."""
    wanted_suffix = '.tar.xz'
    return filename.endswith(wanted_suffix)
def _is_tar(filename):
    """Return True if ``filename`` ends with ``.tar``."""
    wanted_suffix = '.tar'
    return filename.endswith(wanted_suffix)
def _is_targz(filename):
    """Return True if ``filename`` ends with ``.tar.gz``."""
    wanted_suffix = '.tar.gz'
    return filename.endswith(wanted_suffix)
def _is_tgz(filename):
    """Return True if ``filename`` ends with ``.tgz``."""
    wanted_suffix = '.tgz'
    return filename.endswith(wanted_suffix)
def _is_gzip(filename):
    """Return True for a ``.gz`` file that is not a ``.tar.gz`` archive."""
    if filename.endswith('.tar.gz'):
        return False
    return filename.endswith('.gz')
def _is_zip(filename):
    """Return True if ``filename`` ends with ``.zip``."""
    wanted_suffix = '.zip'
    return filename.endswith(wanted_suffix)
def extract_archive(from_path, to_path=None, remove_finished=False):
    """Extract an archive, choosing the method from the file suffix.

    Supported suffixes are ``.tar``, ``.tar.gz``, ``.tgz``, ``.tar.xz``,
    ``.gz`` and ``.zip``.

    Args:
        from_path (str): Path of the archive.
        to_path (str | None): Directory to extract into. If None, the
            directory containing ``from_path`` is used. For a plain ``.gz``
            file the output is written to ``to_path`` joined with the
            archive's basename minus its ``.gz`` extension.
        remove_finished (bool): If True, delete ``from_path`` after a
            successful extraction. Defaults to False.

    Raises:
        ValueError: If the suffix of ``from_path`` is not supported.
    """
    if to_path is None:
        to_path = os.path.dirname(from_path)

    if _is_tar(from_path):
        tar_mode = 'r'
    elif _is_targz(from_path) or _is_tgz(from_path):
        tar_mode = 'r:gz'
    elif _is_tarxz(from_path):
        tar_mode = 'r:xz'
    else:
        tar_mode = None

    if tar_mode is not None:
        # NOTE(security): ``extractall`` trusts member paths stored in the
        # archive; only use this on archives from trusted sources.
        with tarfile.open(from_path, tar_mode) as tar:
            tar.extractall(path=to_path)
    elif _is_gzip(from_path):
        to_path = os.path.join(
            to_path,
            os.path.splitext(os.path.basename(from_path))[0])
        # Stream the decompressed data instead of holding the whole payload
        # in memory. The input is opened first so that a missing archive
        # does not leave an empty output file behind.
        with gzip.GzipFile(from_path) as zip_f, open(to_path, 'wb') as out_f:
            shutil.copyfileobj(zip_f, out_f)
    elif _is_zip(from_path):
        # NOTE(security): same trust assumption as for tar archives.
        with zipfile.ZipFile(from_path, 'r') as z:
            z.extractall(to_path)
    else:
        raise ValueError(f'Extraction of {from_path} not supported')

    if remove_finished:
        os.remove(from_path)
def download_and_extract_archive(url,
                                 download_root,
                                 extract_root=None,
                                 filename=None,
                                 md5=None,
                                 remove_finished=False):
    """Download an archive and extract it.

    Args:
        url (str): URL to download the archive from.
        download_root (str): Directory to place the downloaded archive in.
        extract_root (str | None): Directory to extract into. If None,
            ``download_root`` (after ``~`` expansion) is used.
        filename (str | None): Name to save the archive under. If empty or
            None, the basename of ``url`` is used.
        md5 (str | None): Expected MD5 checksum, forwarded to
            :func:`download_url`.
        remove_finished (bool): Forwarded to :func:`extract_archive`.
            Defaults to False.
    """
    download_root = os.path.expanduser(download_root)
    extract_root = download_root if extract_root is None else extract_root
    filename = filename or os.path.basename(url)

    download_url(url, download_root, filename, md5)

    archive = os.path.join(download_root, filename)
    print(f'Extracting {archive} to {extract_root}')
    extract_archive(archive, extract_root, remove_finished)
import os.path as osp
import xml.etree.ElementTree as ET
import mmcv
import numpy as np
from .builder import DATASETS
from .multi_label import MultiLabelDataset
@DATASETS.register_module()
class VOC(MultiLabelDataset):
    """`Pascal VOC <http://host.robots.ox.ac.uk/pascal/VOC/>`_ Dataset."""

    CLASSES = ('aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car',
               'cat', 'chair', 'cow', 'diningtable', 'dog', 'horse',
               'motorbike', 'person', 'pottedplant', 'sheep', 'sofa', 'train',
               'tvmonitor')

    def __init__(self, **kwargs):
        super(VOC, self).__init__(**kwargs)
        # Only VOC2007 is recognised; the year is inferred from the prefix.
        if 'VOC2007' not in self.data_prefix:
            raise ValueError('Cannot infer dataset year from img_prefix.')
        self.year = 2007

    def load_annotations(self):
        """Load annotations.

        For every image id listed in ``self.ann_file`` the matching XML file
        under ``<data_prefix>/Annotations`` is parsed. Classes of
        non-difficult objects are marked 1, classes that only appear as
        difficult objects are marked -1, all others 0.

        Returns:
            list[dict]: Annotation info from XML file.
        """
        num_classes = len(self.CLASSES)
        data_infos = []
        for img_id in mmcv.list_from_file(self.ann_file):
            filename = f'JPEGImages/{img_id}.jpg'
            xml_path = osp.join(self.data_prefix, 'Annotations',
                                f'{img_id}.xml')
            root = ET.parse(xml_path).getroot()

            positives = []
            difficults = []
            for obj in root.findall('object'):
                label_name = obj.find('name').text
                # in case customized dataset has wrong labels
                # or CLASSES has been override.
                if label_name not in self.CLASSES:
                    continue
                class_idx = self.class_to_idx[label_name]
                is_difficult = int(obj.find('difficult').text)
                target = difficults if is_difficult else positives
                target.append(class_idx)

            gt_label = np.zeros(num_classes)
            # The order cannot be swapped for the case where multiple objects
            # of the same kind exist and some are difficult.
            gt_label[difficults] = -1
            gt_label[positives] = 1

            data_infos.append(
                dict(
                    img_prefix=self.data_prefix,
                    img_info=dict(filename=filename),
                    gt_label=gt_label.astype(np.int8)))

        return data_infos
from .backbones import * # noqa: F401,F403
from .builder import (BACKBONES, CLASSIFIERS, HEADS, LOSSES, NECKS,
build_backbone, build_classifier, build_head, build_loss,
build_neck)
from .classifiers import * # noqa: F401,F403
from .heads import * # noqa: F401,F403
from .losses import * # noqa: F401,F403
from .necks import * # noqa: F401,F403
__all__ = [
'BACKBONES', 'HEADS', 'NECKS', 'LOSSES', 'CLASSIFIERS', 'build_backbone',
'build_head', 'build_neck', 'build_loss', 'build_classifier'
]
from .alexnet import AlexNet
from .lenet import LeNet5
from .mobilenet_v2 import MobileNetV2
from .mobilenet_v3 import MobileNetv3
from .regnet import RegNet
from .resnest import ResNeSt
from .resnet import ResNet, ResNetV1d
from .resnet_cifar import ResNet_CIFAR
from .resnext import ResNeXt
from .seresnet import SEResNet
from .seresnext import SEResNeXt
from .shufflenet_v1 import ShuffleNetV1
from .shufflenet_v2 import ShuffleNetV2
from .vgg import VGG
from .vision_transformer import VisionTransformer
__all__ = [
'LeNet5', 'AlexNet', 'VGG', 'RegNet', 'ResNet', 'ResNeXt', 'ResNetV1d',
'ResNeSt', 'ResNet_CIFAR', 'SEResNet', 'SEResNeXt', 'ShuffleNetV1',
'ShuffleNetV2', 'MobileNetV2', 'MobileNetv3', 'VisionTransformer'
]
import torch.nn as nn
from ..builder import BACKBONES
from .base_backbone import BaseBackbone
@BACKBONES.register_module()
class AlexNet(BaseBackbone):
    """`AlexNet <https://en.wikipedia.org/wiki/AlexNet>`_ backbone.

    The input for AlexNet is a 224x224 RGB image.

    Args:
        num_classes (int): number of classes for classification.
            The default value is -1, which uses the backbone as
            a feature extractor without the top classifier.
    """

    def __init__(self, num_classes=-1):
        super().__init__()
        self.num_classes = num_classes

        # Layer order is fixed: it determines the parameter names.
        feature_layers = [
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]
        self.features = nn.Sequential(*feature_layers)

        if self.num_classes > 0:
            classifier_layers = [
                nn.Dropout(),
                nn.Linear(256 * 6 * 6, 4096),
                nn.ReLU(inplace=True),
                nn.Dropout(),
                nn.Linear(4096, 4096),
                nn.ReLU(inplace=True),
                nn.Linear(4096, num_classes),
            ]
            self.classifier = nn.Sequential(*classifier_layers)

    def forward(self, x):
        """Run the feature extractor and, if present, the classifier.

        Args:
            x (Tensor): Input batch.

        Returns:
            Tensor: Feature maps, or classification scores when
            ``num_classes > 0``.
        """
        feats = self.features(x)
        if not self.num_classes > 0:
            return feats
        flat = feats.view(feats.size(0), 256 * 6 * 6)
        return self.classifier(flat)
from abc import ABCMeta, abstractmethod
from mmcv.runner import BaseModule
class BaseBackbone(BaseModule, metaclass=ABCMeta):
    """Base backbone.

    This class defines the basic functions of a backbone. Any backbone that
    inherits this class should at least define its own `forward` function.

    Args:
        init_cfg: Initialization config forwarded to ``BaseModule``.
            Defaults to None.
    """

    def __init__(self, init_cfg=None):
        super().__init__(init_cfg)

    @abstractmethod
    def forward(self, x):
        """Forward computation.

        Args:
            x (tensor | tuple[tensor]): x could be a Torch.tensor or a tuple of
                Torch.tensor, containing input data for forward computation.
        """

    def train(self, mode=True):
        """Set module status before forward computation.

        Args:
            mode (bool): Whether it is train_mode or test_mode
        """
        super().train(mode)
import torch.nn as nn
from ..builder import BACKBONES
from .base_backbone import BaseBackbone
@BACKBONES.register_module()
class LeNet5(BaseBackbone):
    """`LeNet5 <https://en.wikipedia.org/wiki/LeNet>`_ backbone.

    The input for LeNet-5 is a 32×32 grayscale image.

    Args:
        num_classes (int): number of classes for classification.
            The default value is -1, which uses the backbone as
            a feature extractor without the top classifier.
    """

    def __init__(self, num_classes=-1):
        super(LeNet5, self).__init__()
        self.num_classes = num_classes
        self.features = nn.Sequential(
            nn.Conv2d(1, 6, kernel_size=5, stride=1), nn.Tanh(),
            nn.AvgPool2d(kernel_size=2),
            nn.Conv2d(6, 16, kernel_size=5, stride=1), nn.Tanh(),
            nn.AvgPool2d(kernel_size=2),
            nn.Conv2d(16, 120, kernel_size=5, stride=1), nn.Tanh())
        if self.num_classes > 0:
            self.classifier = nn.Sequential(
                nn.Linear(120, 84),
                nn.Tanh(),
                nn.Linear(84, num_classes),
            )

    def forward(self, x):
        """Run the feature extractor and, if present, the classifier.

        Args:
            x (Tensor): Input batch.

        Returns:
            Tensor: Feature maps, or classification scores when
            ``num_classes > 0``.
        """
        x = self.features(x)
        if self.num_classes > 0:
            # ``flatten(1)`` always keeps the batch dimension. A bare
            # ``squeeze()`` would also drop it for a batch of one, giving
            # scores of shape (num_classes,) instead of (1, num_classes).
            x = self.classifier(x.flatten(1))

        return x
import logging
import torch.nn as nn
import torch.utils.checkpoint as cp
from mmcv.cnn import ConvModule, constant_init, kaiming_init
from mmcv.runner import load_checkpoint
from torch.nn.modules.batchnorm import _BatchNorm
from mmcls.models.utils import make_divisible
from ..builder import BACKBONES
from .base_backbone import BaseBackbone
class InvertedResidual(nn.Module):
    """InvertedResidual block for MobileNetV2.

    Args:
        in_channels (int): The input channels of the InvertedResidual block.
        out_channels (int): The output channels of the InvertedResidual block.
        stride (int): Stride of the middle (first) 3x3 convolution.
        expand_ratio (int): adjusts number of channels of the hidden layer
            in InvertedResidual by this amount.
        conv_cfg (dict, optional): Config dict for convolution layer.
            Default: None, which means using conv2d.
        norm_cfg (dict): Config dict for normalization layer.
            Default: dict(type='BN').
        act_cfg (dict): Config dict for activation layer.
            Default: dict(type='ReLU6').
        with_cp (bool): Use checkpoint or not. Using checkpoint will save some
            memory while slowing down the training speed. Default: False.

    Returns:
        Tensor: The output tensor
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 stride,
                 expand_ratio,
                 conv_cfg=None,
                 norm_cfg=dict(type='BN'),
                 act_cfg=dict(type='ReLU6'),
                 with_cp=False):
        super().__init__()
        self.stride = stride
        assert stride in [1, 2], f'stride must in [1, 2]. ' \
            f'But received {stride}.'
        self.with_cp = with_cp
        # The skip connection is only used when the block preserves shape.
        self.use_res_connect = self.stride == 1 and in_channels == out_channels
        hidden_dim = int(round(in_channels * expand_ratio))

        # Settings shared by every conv in the block.
        shared_cfg = dict(conv_cfg=conv_cfg, norm_cfg=norm_cfg)

        stages = []
        # 1x1 expansion conv, skipped when there is nothing to expand.
        if expand_ratio != 1:
            expand_conv = ConvModule(
                in_channels=in_channels,
                out_channels=hidden_dim,
                kernel_size=1,
                act_cfg=act_cfg,
                **shared_cfg)
            stages.append(expand_conv)

        # 3x3 depthwise conv (one group per channel).
        depthwise_conv = ConvModule(
            in_channels=hidden_dim,
            out_channels=hidden_dim,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=hidden_dim,
            act_cfg=act_cfg,
            **shared_cfg)
        stages.append(depthwise_conv)

        # 1x1 projection conv without activation.
        project_conv = ConvModule(
            in_channels=hidden_dim,
            out_channels=out_channels,
            kernel_size=1,
            act_cfg=None,
            **shared_cfg)
        stages.append(project_conv)

        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply the block, optionally through gradient checkpointing.

        Args:
            x (Tensor): Input tensor.

        Returns:
            Tensor: ``x + conv(x)`` when the residual connection is used,
            otherwise ``conv(x)``.
        """

        def _apply(inp):
            transformed = self.conv(inp)
            return inp + transformed if self.use_res_connect else transformed

        if self.with_cp and x.requires_grad:
            return cp.checkpoint(_apply, x)
        return _apply(x)
@BACKBONES.register_module()
class MobileNetV2(BaseBackbone):
    """MobileNetV2 backbone.

    Args:
        widen_factor (float): Width multiplier, multiply number of
            channels in each layer by this amount. Default: 1.0.
        out_indices (None or Sequence[int]): Output from which stages.
            Default: (7, ).
        frozen_stages (int): Stages to be frozen (all param fixed).
            Default: -1, which means not freezing any parameters.
        conv_cfg (dict, optional): Config dict for convolution layer.
            Default: None, which means using conv2d.
        norm_cfg (dict): Config dict for normalization layer.
            Default: dict(type='BN').
        act_cfg (dict): Config dict for activation layer.
            Default: dict(type='ReLU6').
        norm_eval (bool): Whether to set norm layers to eval mode, namely,
            freeze running stats (mean and var). Note: Effect on Batch Norm
            and its variants only. Default: False.
        with_cp (bool): Use checkpoint or not. Using checkpoint will save some
            memory while slowing down the training speed. Default: False.
        init_cfg (list[dict]): Initialization config forwarded to the base
            class.
    """

    # Parameters to build layers. 4 parameters are needed to construct a
    # layer, from left to right: expand_ratio, channel, num_blocks, stride.
    arch_settings = [[1, 16, 1, 1], [6, 24, 2, 2], [6, 32, 3, 2],
                     [6, 64, 4, 2], [6, 96, 3, 1], [6, 160, 3, 2],
                     [6, 320, 1, 1]]

    def __init__(self,
                 widen_factor=1.,
                 out_indices=(7, ),
                 frozen_stages=-1,
                 conv_cfg=None,
                 norm_cfg=dict(type='BN'),
                 act_cfg=dict(type='ReLU6'),
                 norm_eval=False,
                 with_cp=False,
                 init_cfg=[
                     dict(type='Kaiming', layer=['Conv2d']),
                     dict(
                         type='Constant',
                         val=1,
                         layer=['_BatchNorm', 'GroupNorm'])
                 ]):
        super().__init__(init_cfg)
        self.widen_factor = widen_factor
        self.out_indices = out_indices

        # Validate the stage selectors before building anything.
        allowed_out = range(0, 8)
        for index in out_indices:
            if index not in allowed_out:
                raise ValueError('the item in out_indices must in '
                                 f'range(0, 8). But received {index}')
        if frozen_stages not in range(-1, 8):
            raise ValueError('frozen_stages must be in range(-1, 8). '
                             f'But received {frozen_stages}')

        self.frozen_stages = frozen_stages
        self.conv_cfg = conv_cfg
        self.norm_cfg = norm_cfg
        self.act_cfg = act_cfg
        self.norm_eval = norm_eval
        self.with_cp = with_cp

        # Stem convolution; ``self.in_channels`` tracks the running width.
        self.in_channels = make_divisible(32 * widen_factor, 8)
        self.conv1 = ConvModule(
            in_channels=3,
            out_channels=self.in_channels,
            kernel_size=3,
            stride=2,
            padding=1,
            conv_cfg=self.conv_cfg,
            norm_cfg=self.norm_cfg,
            act_cfg=self.act_cfg)

        # Registered stage names, in forward order.
        self.layers = []
        for i, (expand_ratio, channel, num_blocks,
                stride) in enumerate(self.arch_settings):
            stage = self.make_layer(
                out_channels=make_divisible(channel * widen_factor, 8),
                num_blocks=num_blocks,
                stride=stride,
                expand_ratio=expand_ratio)
            layer_name = f'layer{i + 1}'
            self.add_module(layer_name, stage)
            self.layers.append(layer_name)

        # The final 1x1 conv is only widened for multipliers above 1.
        self.out_channel = (
            int(1280 * widen_factor) if widen_factor > 1.0 else 1280)

        final_conv = ConvModule(
            in_channels=self.in_channels,
            out_channels=self.out_channel,
            kernel_size=1,
            stride=1,
            padding=0,
            conv_cfg=self.conv_cfg,
            norm_cfg=self.norm_cfg,
            act_cfg=self.act_cfg)
        self.add_module('conv2', final_conv)
        self.layers.append('conv2')

    def make_layer(self, out_channels, num_blocks, stride, expand_ratio):
        """Stack InvertedResidual blocks to build a layer for MobileNetV2.

        ``self.in_channels`` is updated to ``out_channels`` as blocks are
        added.

        Args:
            out_channels (int): out_channels of block.
            num_blocks (int): number of blocks.
            stride (int): stride of the first block; later blocks use 1.
            expand_ratio (int): Expand the number of channels of the
                hidden layer in InvertedResidual by this ratio.

        Returns:
            nn.Sequential: The stacked blocks.
        """
        blocks = []
        for block_idx in range(num_blocks):
            block_stride = stride if block_idx < 1 else 1
            blocks.append(
                InvertedResidual(
                    self.in_channels,
                    out_channels,
                    block_stride,
                    expand_ratio=expand_ratio,
                    conv_cfg=self.conv_cfg,
                    norm_cfg=self.norm_cfg,
                    act_cfg=self.act_cfg,
                    with_cp=self.with_cp))
            self.in_channels = out_channels

        return nn.Sequential(*blocks)

    def init_weights(self, pretrained=None):
        """Load a checkpoint or apply the default initialization.

        Args:
            pretrained (str | None): Checkpoint path. If None, Conv2d layers
                get Kaiming init and norm layers are set to constant 1.

        Raises:
            TypeError: If ``pretrained`` is neither a str nor None.
        """
        if isinstance(pretrained, str):
            load_checkpoint(
                self, pretrained, strict=False, logger=logging.getLogger())
            return
        if pretrained is not None:
            raise TypeError('pretrained must be a str or None')

        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                kaiming_init(module)
            elif isinstance(module, (_BatchNorm, nn.GroupNorm)):
                constant_init(module, 1)

    def forward(self, x):
        """Run the backbone and collect the selected stage outputs.

        Args:
            x (Tensor): Input batch.

        Returns:
            Tensor | tuple[Tensor]: The single selected output, or a tuple
            when the number of collected outputs is not exactly one.
        """
        x = self.conv1(x)

        collected = []
        for idx, name in enumerate(self.layers):
            x = getattr(self, name)(x)
            if idx in self.out_indices:
                collected.append(x)

        return collected[0] if len(collected) == 1 else tuple(collected)

    def _freeze_stages(self):
        """Disable gradients for the stem and the first frozen stages."""
        if self.frozen_stages < 0:
            return
        for param in self.conv1.parameters():
            param.requires_grad = False
        for i in range(1, self.frozen_stages + 1):
            stage = getattr(self, f'layer{i}')
            stage.eval()
            for param in stage.parameters():
                param.requires_grad = False

    def train(self, mode=True):
        """Switch mode, then re-apply stage freezing and ``norm_eval``.

        Args:
            mode (bool): Whether to set training mode. Defaults to True.
        """
        super().train(mode)
        self._freeze_stages()
        if not (mode and self.norm_eval):
            return
        for module in self.modules():
            if isinstance(module, _BatchNorm):
                module.eval()
from mmcv.cnn import ConvModule
from torch.nn.modules.batchnorm import _BatchNorm
from ..builder import BACKBONES
from ..utils import InvertedResidual
from .base_backbone import BaseBackbone
@BACKBONES.register_module()
class MobileNetv3(BaseBackbone):
    """MobileNetv3 backbone.

    Args:
        arch (str): Architecture of mobilenetv3, from {small, big}.
            Default: small.
        conv_cfg (dict, optional): Config dict for convolution layer.
            Default: None, which means using conv2d.
        norm_cfg (dict): Config dict for normalization layer.
            Default: dict(type='BN').
        out_indices (None or Sequence[int]): Output from which stages.
            Default: (10, ), which means output tensors from final stage.
        frozen_stages (int): Stages to be frozen (all param fixed).
            Default: -1, which means not freezing any parameters.
        norm_eval (bool): Whether to set norm layers to eval mode, namely,
            freeze running stats (mean and var). Note: Effect on Batch Norm
            and its variants only. Default: False.
        with_cp (bool): Use checkpoint or not. Using checkpoint will save
            some memory while slowing down the training speed.
            Default: False.
        init_cfg (list[dict]): Initialization config forwarded to the base
            class.
    """

    # Parameters to build each block:
    #     [kernel size, mid channels, out channels, with_se, act type, stride]
    arch_settings = {
        'small': [[3, 16, 16, True, 'ReLU', 2],
                  [3, 72, 24, False, 'ReLU', 2],
                  [3, 88, 24, False, 'ReLU', 1],
                  [5, 96, 40, True, 'HSwish', 2],
                  [5, 240, 40, True, 'HSwish', 1],
                  [5, 240, 40, True, 'HSwish', 1],
                  [5, 120, 48, True, 'HSwish', 1],
                  [5, 144, 48, True, 'HSwish', 1],
                  [5, 288, 96, True, 'HSwish', 2],
                  [5, 576, 96, True, 'HSwish', 1],
                  [5, 576, 96, True, 'HSwish', 1]],
        'big': [[3, 16, 16, False, 'ReLU', 1],
                [3, 64, 24, False, 'ReLU', 2],
                [3, 72, 24, False, 'ReLU', 1],
                [5, 72, 40, True, 'ReLU', 2],
                [5, 120, 40, True, 'ReLU', 1],
                [5, 120, 40, True, 'ReLU', 1],
                [3, 240, 80, False, 'HSwish', 2],
                [3, 200, 80, False, 'HSwish', 1],
                [3, 184, 80, False, 'HSwish', 1],
                [3, 184, 80, False, 'HSwish', 1],
                [3, 480, 112, True, 'HSwish', 1],
                [3, 672, 112, True, 'HSwish', 1],
                [5, 672, 160, True, 'HSwish', 1],
                [5, 672, 160, True, 'HSwish', 2],
                [5, 960, 160, True, 'HSwish', 1]]
    }  # yapf: disable

    def __init__(self,
                 arch='small',
                 conv_cfg=None,
                 norm_cfg=dict(type='BN'),
                 out_indices=(10, ),
                 frozen_stages=-1,
                 norm_eval=False,
                 with_cp=False,
                 init_cfg=[
                     dict(type='Kaiming', layer=['Conv2d']),
                     dict(type='Constant', val=1, layer=['BatchNorm2d'])
                 ]):
        super().__init__(init_cfg)
        assert arch in self.arch_settings

        # Validate the stage selectors against the chosen architecture.
        for index in out_indices:
            if index not in range(0, len(self.arch_settings[arch])):
                raise ValueError('the item in out_indices must in '
                                 f'range(0, {len(self.arch_settings[arch])}). '
                                 f'But received {index}')
        if frozen_stages not in range(-1, len(self.arch_settings[arch])):
            raise ValueError('frozen_stages must be in range(-1, '
                             f'{len(self.arch_settings[arch])}). '
                             f'But received {frozen_stages}')

        self.arch = arch
        self.conv_cfg = conv_cfg
        self.norm_cfg = norm_cfg
        self.out_indices = out_indices
        self.frozen_stages = frozen_stages
        self.norm_eval = norm_eval
        self.with_cp = with_cp

        # Stem convolution; ``self.in_channels`` tracks the running width.
        self.in_channels = 16
        self.conv1 = ConvModule(
            in_channels=3,
            out_channels=self.in_channels,
            kernel_size=3,
            stride=2,
            padding=1,
            conv_cfg=conv_cfg,
            norm_cfg=norm_cfg,
            act_cfg=dict(type='HSwish'))

        self.layers = self._make_layer()
        # Output channels of the last block.
        self.feat_dim = self.arch_settings[arch][-1][2]

    def _make_layer(self):
        """Build and register one InvertedResidual block per setting row.

        Returns:
            list[str]: Names of the registered blocks, in forward order.
        """
        registered = []
        for i, row in enumerate(self.arch_settings[self.arch]):
            kernel_size, mid_channels, out_channels, with_se, act, stride = row

            se_cfg = None
            if with_se:
                se_cfg = dict(
                    channels=mid_channels,
                    ratio=4,
                    act_cfg=(dict(type='ReLU'), dict(type='HSigmoid')))

            block = InvertedResidual(
                in_channels=self.in_channels,
                out_channels=out_channels,
                mid_channels=mid_channels,
                kernel_size=kernel_size,
                stride=stride,
                se_cfg=se_cfg,
                with_expand_conv=True,
                conv_cfg=self.conv_cfg,
                norm_cfg=self.norm_cfg,
                act_cfg=dict(type=act),
                with_cp=self.with_cp)
            self.in_channels = out_channels

            layer_name = 'layer{}'.format(i + 1)
            self.add_module(layer_name, block)
            registered.append(layer_name)
        return registered

    def forward(self, x):
        """Run the backbone and collect the selected stage outputs.

        Args:
            x (Tensor): Input batch.

        Returns:
            Tensor | tuple[Tensor]: The single selected output, or a tuple
            when the number of collected outputs is not exactly one.
        """
        x = self.conv1(x)

        collected = []
        for idx, name in enumerate(self.layers):
            x = getattr(self, name)(x)
            if idx in self.out_indices:
                collected.append(x)

        return collected[0] if len(collected) == 1 else tuple(collected)

    def _freeze_stages(self):
        """Disable gradients for the stem and the first frozen stages."""
        if self.frozen_stages < 0:
            return
        for param in self.conv1.parameters():
            param.requires_grad = False
        for i in range(1, self.frozen_stages + 1):
            stage = getattr(self, f'layer{i}')
            stage.eval()
            for param in stage.parameters():
                param.requires_grad = False

    def train(self, mode=True):
        """Switch mode, then re-apply stage freezing and ``norm_eval``.

        Args:
            mode (bool): Whether to set training mode. Defaults to True.
        """
        super().train(mode)
        self._freeze_stages()
        if not (mode and self.norm_eval):
            return
        for module in self.modules():
            if isinstance(module, _BatchNorm):
                module.eval()
import numpy as np
import torch.nn as nn
from mmcv.cnn import build_conv_layer, build_norm_layer
from ..builder import BACKBONES
from .resnet import ResNet
from .resnext import Bottleneck
@BACKBONES.register_module()
class RegNet(ResNet):
"""RegNet backbone.
More details can be found in `paper <https://arxiv.org/abs/2003.13678>`_ .
Args:
arch (dict): The parameter of RegNets.
- w0 (int): initial width
- wa (float): slope of width
- wm (float): quantization parameter to quantize the width
- depth (int): depth of the backbone
- group_w (int): width of group
- bot_mul (float): bottleneck ratio, i.e. expansion of bottlneck.
strides (Sequence[int]): Strides of the first block of each stage.
base_channels (int): Base channels after stem layer.
in_channels (int): Number of input image channels. Default: 3.
dilations (Sequence[int]): Dilation of each stage.
out_indices (Sequence[int]): Output from which stages.
style (str): `pytorch` or `caffe`. If set to "pytorch", the stride-two
layer is the 3x3 conv layer, otherwise the stride-two layer is
the first 1x1 conv layer. Default: "pytorch".
frozen_stages (int): Stages to be frozen (all param fixed). -1 means
not freezing any parameters. Default: -1.
norm_cfg (dict): dictionary to construct and config norm layer.
Default: dict(type='BN', requires_grad=True).
norm_eval (bool): Whether to set norm layers to eval mode, namely,
freeze running stats (mean and var). Note: Effect on Batch Norm
and its variants only. Default: False.
with_cp (bool): Use checkpoint or not. Using checkpoint will save some
memory while slowing down the training speed. Default: False.
zero_init_residual (bool): whether to use zero init for last norm layer
in resblocks to let them behave as identity. Default: True.
Example:
>>> from mmdet.models import RegNet
>>> import torch
>>> self = RegNet(
arch=dict(
w0=88,
wa=26.31,
wm=2.25,
group_w=48,
depth=25,
bot_mul=1.0))
>>> self.eval()
>>> inputs = torch.rand(1, 3, 32, 32)
>>> level_outputs = self.forward(inputs)
>>> for level_out in level_outputs:
... print(tuple(level_out.shape))
(1, 96, 8, 8)
(1, 192, 4, 4)
(1, 432, 2, 2)
(1, 1008, 1, 1)
"""
arch_settings = {
'regnetx_400mf':
dict(w0=24, wa=24.48, wm=2.54, group_w=16, depth=22, bot_mul=1.0),
'regnetx_800mf':
dict(w0=56, wa=35.73, wm=2.28, group_w=16, depth=16, bot_mul=1.0),
'regnetx_1.6gf':
dict(w0=80, wa=34.01, wm=2.25, group_w=24, depth=18, bot_mul=1.0),
'regnetx_3.2gf':
dict(w0=88, wa=26.31, wm=2.25, group_w=48, depth=25, bot_mul=1.0),
'regnetx_4.0gf':
dict(w0=96, wa=38.65, wm=2.43, group_w=40, depth=23, bot_mul=1.0),
'regnetx_6.4gf':
dict(w0=184, wa=60.83, wm=2.07, group_w=56, depth=17, bot_mul=1.0),
'regnetx_8.0gf':
dict(w0=80, wa=49.56, wm=2.88, group_w=120, depth=23, bot_mul=1.0),
'regnetx_12gf':
dict(w0=168, wa=73.36, wm=2.37, group_w=112, depth=19, bot_mul=1.0),
}
def __init__(self,
             arch,
             in_channels=3,
             stem_channels=32,
             base_channels=32,
             strides=(2, 2, 2, 2),
             dilations=(1, 1, 1, 1),
             out_indices=(3, ),
             style='pytorch',
             deep_stem=False,
             avg_down=False,
             frozen_stages=-1,
             conv_cfg=None,
             norm_cfg=dict(type='BN', requires_grad=True),
             norm_eval=False,
             with_cp=False,
             zero_init_residual=True,
             init_cfg=None):
    """Derive per-stage widths from ``arch`` and build stem and stages.

    ``arch`` is either a key of ``arch_settings`` or a dict providing
    ``w0``, ``wa``, ``wm``, ``depth``, ``group_w`` and ``bot_mul``.

    Raises:
        AssertionError: If ``arch`` is a string that is not a key of
            ``arch_settings``, if the derived number of stages is not in
            [1, 4], if ``strides``/``dilations`` do not have one entry
            per stage, or if ``out_indices`` refers to a missing stage.
        TypeError: If ``arch`` is neither a string nor a dict.
        NotImplementedError: If ``deep_stem`` is true.
    """
    # The lookup starts *after* ResNet in the MRO, so ResNet.__init__ is
    # not executed. NOTE(review): presumably because every layer is built
    # below from RegNet parameters -- confirm against ResNet.
    super(ResNet, self).__init__(init_cfg)

    # Generate RegNet parameters first
    if isinstance(arch, str):
        assert arch in self.arch_settings, \
            f'"arch": "{arch}" is not one of the' \
            ' arch_settings'
        arch = self.arch_settings[arch]
    elif not isinstance(arch, dict):
        raise TypeError('Expect "arch" to be either a string '
                        f'or a dict, got {type(arch)}')

    # One width per block, plus the number of distinct widths (stages).
    widths, num_stages = self.generate_regnet(
        arch['w0'],
        arch['wa'],
        arch['wm'],
        arch['depth'],
    )
    # Convert to per stage format
    stage_widths, stage_blocks = self.get_stages_from_blocks(widths)
    # Generate group widths and bot muls
    # (the same group width / bottleneck ratio is repeated for every stage)
    group_widths = [arch['group_w'] for _ in range(num_stages)]
    self.bottleneck_ratio = [arch['bot_mul'] for _ in range(num_stages)]
    # Adjust the compatibility of stage_widths and group_widths
    stage_widths, group_widths = self.adjust_width_group(
        stage_widths, self.bottleneck_ratio, group_widths)

    # Group params by stage
    self.stage_widths = stage_widths
    self.group_widths = group_widths
    # Total number of blocks over all stages.
    self.depth = sum(stage_blocks)
    self.stem_channels = stem_channels
    # Stored only; the stages below pass the stage width as base_channels.
    self.base_channels = base_channels
    self.num_stages = num_stages
    assert num_stages >= 1 and num_stages <= 4
    self.strides = strides
    self.dilations = dilations
    assert len(strides) == len(dilations) == num_stages
    self.out_indices = out_indices
    assert max(out_indices) < num_stages
    self.style = style
    self.deep_stem = deep_stem
    if self.deep_stem:
        raise NotImplementedError(
            'deep_stem has not been implemented for RegNet')
    self.avg_down = avg_down
    self.frozen_stages = frozen_stages
    self.conv_cfg = conv_cfg
    self.norm_cfg = norm_cfg
    self.with_cp = with_cp
    self.norm_eval = norm_eval
    self.zero_init_residual = zero_init_residual
    self.stage_blocks = stage_blocks[:num_stages]

    self._make_stem_layer(in_channels, stem_channels)

    # Input channels of the next stage; starts at the stem output.
    _in_channels = stem_channels
    # Attribute names of the registered stages, in forward order.
    self.res_layers = []
    for i, num_blocks in enumerate(self.stage_blocks):
        stride = self.strides[i]
        dilation = self.dilations[i]
        group_width = self.group_widths[i]
        # Bottleneck width of the stage and the group count that follows
        # from it for the given width per group.
        width = int(round(self.stage_widths[i] * self.bottleneck_ratio[i]))
        stage_groups = width // group_width

        # ``make_res_layer`` is not defined in this part of the file;
        # presumably inherited from the parent class.
        res_layer = self.make_res_layer(
            block=Bottleneck,
            num_blocks=num_blocks,
            in_channels=_in_channels,
            out_channels=self.stage_widths[i],
            expansion=1,
            stride=stride,
            dilation=dilation,
            style=self.style,
            avg_down=self.avg_down,
            with_cp=self.with_cp,
            conv_cfg=self.conv_cfg,
            norm_cfg=self.norm_cfg,
            base_channels=self.stage_widths[i],
            groups=stage_groups,
            width_per_group=group_width)
        _in_channels = self.stage_widths[i]
        # Stages are registered as ``layer1`` ... ``layerN``.
        layer_name = f'layer{i + 1}'
        self.add_module(layer_name, res_layer)
        self.res_layers.append(layer_name)

    self._freeze_stages()

    # Width of the last stage.
    self.feat_dim = stage_widths[-1]
def _make_stem_layer(self, in_channels, base_channels):
    """Create the stem: a 3x3 stride-2 conv, a norm layer and a ReLU.

    Args:
        in_channels (int): Channels of the input passed to the conv.
        base_channels (int): Channels produced by the conv and consumed by
            the norm layer.
    """
    stem_conv_kwargs = dict(kernel_size=3, stride=2, padding=1, bias=False)
    self.conv1 = build_conv_layer(self.conv_cfg, in_channels, base_channels,
                                  **stem_conv_kwargs)

    # The norm layer is registered under the name chosen by
    # ``build_norm_layer``; that name is kept in ``norm1_name``.
    norm_name, norm_layer = build_norm_layer(
        self.norm_cfg, base_channels, postfix=1)
    self.norm1_name = norm_name
    self.add_module(norm_name, norm_layer)

    self.relu = nn.ReLU(inplace=True)
def generate_regnet(self,
                    initial_width,
                    width_slope,
                    width_parameter,
                    depth,
                    divisor=8):
    """Generate the width of every block from the RegNet parameters.

    Block ``j`` starts from the linear width
    ``initial_width + j * width_slope``. That width is snapped to the
    nearest ``initial_width * width_parameter ** k`` (integer ``k``) and
    then rounded to a multiple of ``divisor``.

    Args:
        initial_width (int): Width of the first block. Must be positive
            and divisible by ``divisor``.
        width_slope (float): Slope of the linear width function. Must be
            non-negative.
        width_parameter (float): Ratio between consecutive quantized
            widths. Must be greater than 1.
        depth (int): Number of blocks of the backbone.
        divisor (int): The divisor of channels. Defaults to 8.

    Returns:
        tuple[list[int], int]: The width of each block (``depth``
        entries) and the number of distinct widths, which is used as the
        number of stages.
    """
    assert width_slope >= 0
    assert initial_width > 0
    assert width_parameter > 1
    assert initial_width % divisor == 0

    # Continuous (un-quantized) width of every block.
    widths_cont = np.arange(depth) * width_slope + initial_width
    # Exponent of the closest power of ``width_parameter`` for each block.
    ks = np.round(
        np.log(widths_cont / initial_width) / np.log(width_parameter))
    widths = initial_width * np.power(width_parameter, ks)
    widths = np.round(np.divide(widths, divisor)) * divisor
    num_stages = len(np.unique(widths))
    return widths.astype(int).tolist(), num_stages
@staticmethod
def quantize_float(number, divisor):
"""Converts a float to closest non-zero int divisible by divior.
Args:
number (int): Original number to be quantized.
divisor (int): Divisor used to quantize the number.
Returns:
int: quantized number that is divisible by devisor.
"""
return int(round(number / divisor) * divisor)
def adjust_width_group(self, widths, bottleneck_ratio, groups):
    """Make stage widths and group widths compatible with each other.

    For every stage the bottleneck width (``width * ratio``) is computed,
    the group value is capped at that width, the bottleneck width is
    rounded to a multiple of the capped group value, and the stage width
    is recovered by dividing by the ratio again.

    Args:
        widths (list[int]): Width of each stage.
        bottleneck_ratio (list[float]): Bottleneck ratio of each stage.
        groups (list[int]): Group value of each stage.

    Returns:
        tuple(list): The adjusted widths and groups of each stage.
    """
    adjusted_widths = []
    adjusted_groups = []
    for width, ratio, group in zip(widths, bottleneck_ratio, groups):
        bot_width = int(width * ratio)
        # A group can never be wider than the bottleneck itself.
        group = min(group, bot_width)
        bot_width = self.quantize_float(bot_width, group)
        adjusted_widths.append(int(bot_width / ratio))
        adjusted_groups.append(group)
    return adjusted_widths, adjusted_groups
def get_stages_from_blocks(self, widths):
    """Group consecutive blocks of equal width into stages.

    Args:
        widths (list[int]): Width of every block, in network order.

    Returns:
        tuple(list): The width of each stage and the number of blocks in
        each stage.
    """
    # Compare every block with its predecessor. The 0 padding on both
    # sides makes the first block and the end of the list count as width
    # changes (for non-zero widths).
    current_widths = widths + [0]
    previous_widths = [0] + widths
    boundaries = [
        index for index, (current, previous) in enumerate(
            zip(current_widths, previous_widths)) if current != previous
    ]

    num_blocks = len(widths)
    stage_widths = [
        widths[index] for index in boundaries if index < num_blocks
    ]
    # Distance between consecutive boundaries = blocks per stage.
    stage_blocks = np.diff(boundaries).tolist()
    return stage_widths, stage_blocks
def forward(self, x):
    """Run the stem and every stage, collecting the requested outputs.

    Args:
        x: Input passed to the stem conv.

    Returns:
        The output of the single selected stage when exactly one stage
        index is in ``out_indices``, otherwise a tuple with the outputs
        of all selected stages in stage order.
    """
    feat = self.relu(self.norm1(self.conv1(x)))

    selected = []
    for stage_idx, stage_name in enumerate(self.res_layers):
        feat = getattr(self, stage_name)(feat)
        if stage_idx in self.out_indices:
            selected.append(feat)

    return selected[0] if len(selected) == 1 else tuple(selected)
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.checkpoint as cp
from mmcv.cnn import build_conv_layer, build_norm_layer
from ..builder import BACKBONES
from .resnet import Bottleneck as _Bottleneck
from .resnet import ResLayer, ResNetV1d
class RSoftmax(nn.Module):
    """Radix Softmax module in ``SplitAttentionConv2d``.

    Args:
        radix (int): Radix of input.
        groups (int): Groups of input.
    """

    def __init__(self, radix, groups):
        super().__init__()
        self.groups = groups
        self.radix = radix

    def forward(self, x):
        """Normalize ``x`` across the radix dimension.

        With ``radix > 1`` the input is viewed as
        ``(batch, groups, radix, -1)``, softmax is taken over the radix
        axis and the result is flattened to ``(batch, -1)``. Otherwise an
        element-wise sigmoid is applied.
        """
        num_samples = x.size(0)
        if not self.radix > 1:
            return torch.sigmoid(x)

        grouped = x.view(num_samples, self.groups, self.radix, -1)
        # Move radix to dim 1 so that the softmax runs across the splits.
        weights = F.softmax(grouped.transpose(1, 2), dim=1)
        return weights.reshape(num_samples, -1)
class SplitAttentionConv2d(nn.Module):
    """Split-Attention Conv2d.

    Args:
        in_channels (int): Same as nn.Conv2d.
        channels (int): Output channels of this module. The internal
            convolution produces ``channels * radix`` channels, which the
            attention-weighted sum over the radix splits reduces back to
            ``channels``.
        kernel_size (int | tuple[int]): Same as nn.Conv2d.
        stride (int | tuple[int]): Same as nn.Conv2d.
        padding (int | tuple[int]): Same as nn.Conv2d.
        dilation (int | tuple[int]): Same as nn.Conv2d.
        groups (int): Same as nn.Conv2d.
        radix (int): Radix of SpltAtConv2d. Default: 2
        reduction_factor (int): Reduction factor of SplitAttentionConv2d.
            Default: 4.
        conv_cfg (dict, optional): Config dict for convolution layer.
            Default: None, which means using conv2d.
        norm_cfg (dict, optional): Config dict for normalization layer.
            Default: dict(type='BN').
    """

    def __init__(self,
                 in_channels,
                 channels,
                 kernel_size,
                 stride=1,
                 padding=0,
                 dilation=1,
                 groups=1,
                 radix=2,
                 reduction_factor=4,
                 conv_cfg=None,
                 norm_cfg=dict(type='BN')):
        super(SplitAttentionConv2d, self).__init__()
        # Width of the attention bottleneck, never narrower than 32.
        inter_channels = max(in_channels * radix // reduction_factor, 32)
        self.radix = radix
        self.groups = groups
        self.channels = channels
        # One grouped conv computes all ``radix`` splits at once.
        self.conv = build_conv_layer(
            conv_cfg,
            in_channels,
            channels * radix,
            kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=groups * radix,
            bias=False)
        self.norm0_name, norm0 = build_norm_layer(
            norm_cfg, channels * radix, postfix=0)
        self.add_module(self.norm0_name, norm0)
        self.relu = nn.ReLU(inplace=True)
        # fc1/fc2 are 1x1 convs that map the pooled descriptor to one
        # attention value per output channel and split.
        self.fc1 = build_conv_layer(
            None, channels, inter_channels, 1, groups=self.groups)
        self.norm1_name, norm1 = build_norm_layer(
            norm_cfg, inter_channels, postfix=1)
        self.add_module(self.norm1_name, norm1)
        self.fc2 = build_conv_layer(
            None, inter_channels, channels * radix, 1, groups=self.groups)
        self.rsoftmax = RSoftmax(radix, groups)

    @property
    def norm0(self):
        """nn.Module: the norm layer applied right after ``self.conv``."""
        return getattr(self, self.norm0_name)

    @property
    def norm1(self):
        """nn.Module: the norm layer applied right after ``self.fc1``."""
        return getattr(self, self.norm1_name)

    def forward(self, x):
        """Apply the split convolution and fuse the splits by attention."""
        x = self.conv(x)
        x = self.norm0(x)
        x = self.relu(x)

        batch = x.size(0)
        if self.radix > 1:
            # (batch, radix, channels, H, W); the descriptor is the sum of
            # all splits.
            splits = x.view(batch, self.radix, -1, *x.shape[2:])
            gap = splits.sum(dim=1)
        else:
            gap = x
        gap = F.adaptive_avg_pool2d(gap, 1)
        gap = self.fc1(gap)
        gap = self.norm1(gap)
        gap = self.relu(gap)

        atten = self.fc2(gap)
        atten = self.rsoftmax(atten).view(batch, -1, 1, 1)

        if self.radix > 1:
            # Weight every split by its attention and sum over the splits.
            attens = atten.view(batch, self.radix, -1, *atten.shape[2:])
            out = torch.sum(attens * splits, dim=1)
        else:
            out = atten * x
        return out.contiguous()
class Bottleneck(_Bottleneck):
    """Bottleneck block for ResNeSt.

    The block is built by the parent ``_Bottleneck`` constructor first;
    ``conv1``, ``conv2``, ``conv3``, ``norm1`` and ``norm3`` are then
    rebuilt here and ``conv2`` becomes a ``SplitAttentionConv2d``.

    Args:
        in_channels (int): Input channels of this block.
        out_channels (int): Output channels of this block.
        groups (int): Groups of conv2.
        width_per_group (int): Width per group of conv2. 64x4d indicates
            ``groups=64, width_per_group=4`` and 32x8d indicates
            ``groups=32, width_per_group=8``.
        base_channels (int): Reference width used to rescale the middle
            channels when ``groups != 1``; the middle channels must be
            divisible by it. Default: 64.
        radix (int): Radix of SpltAtConv2d. Default: 2
        reduction_factor (int): Reduction factor of SplitAttentionConv2d.
            Default: 4.
        avg_down_stride (bool): Whether to use average pool for stride in
            Bottleneck. Default: True.
        stride (int): stride of the block. Default: 1
        dilation (int): dilation of convolution. Default: 1
        downsample (nn.Module, optional): downsample operation on identity
            branch. Default: None
        style (str): `pytorch` or `caffe`. If set to "pytorch", the stride-two
            layer is the 3x3 conv layer, otherwise the stride-two layer is
            the first 1x1 conv layer.
        conv_cfg (dict, optional): dictionary to construct and config conv
            layer. Default: None
        norm_cfg (dict): dictionary to construct and config norm layer.
            Default: dict(type='BN')
        with_cp (bool): Use checkpoint or not. Using checkpoint will save some
            memory while slowing down the training speed.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 groups=1,
                 width_per_group=4,
                 base_channels=64,
                 radix=2,
                 reduction_factor=4,
                 avg_down_stride=True,
                 **kwargs):
        super(Bottleneck, self).__init__(in_channels, out_channels, **kwargs)

        self.groups = groups
        self.width_per_group = width_per_group

        # For ResNet bottleneck, middle channels are determined by expansion
        # and out_channels, but for ResNeXt bottleneck, it is determined by
        # groups and width_per_group and the stage it is located in.
        if groups != 1:
            assert self.mid_channels % base_channels == 0
            self.mid_channels = (
                groups * width_per_group * self.mid_channels // base_channels)

        # Average-pool striding is only enabled when conv2 would stride.
        self.avg_down_stride = avg_down_stride and self.conv2_stride > 1

        self.norm1_name, norm1 = build_norm_layer(
            self.norm_cfg, self.mid_channels, postfix=1)
        self.norm3_name, norm3 = build_norm_layer(
            self.norm_cfg, self.out_channels, postfix=3)

        self.conv1 = build_conv_layer(
            self.conv_cfg,
            self.in_channels,
            self.mid_channels,
            kernel_size=1,
            stride=self.conv1_stride,
            bias=False)
        self.add_module(self.norm1_name, norm1)
        # conv2 keeps stride 1 when the average pool below does the
        # down-sampling instead.
        self.conv2 = SplitAttentionConv2d(
            self.mid_channels,
            self.mid_channels,
            kernel_size=3,
            stride=1 if self.avg_down_stride else self.conv2_stride,
            padding=self.dilation,
            dilation=self.dilation,
            groups=groups,
            radix=radix,
            reduction_factor=reduction_factor,
            conv_cfg=self.conv_cfg,
            norm_cfg=self.norm_cfg)
        # Drop the attribute named by ``norm2_name``; ``forward`` below does
        # not use a norm2 (presumably it was created by the parent
        # constructor -- confirm against ``_Bottleneck``).
        delattr(self, self.norm2_name)

        if self.avg_down_stride:
            self.avd_layer = nn.AvgPool2d(3, self.conv2_stride, padding=1)

        self.conv3 = build_conv_layer(
            self.conv_cfg,
            self.mid_channels,
            self.out_channels,
            kernel_size=1,
            bias=False)
        self.add_module(self.norm3_name, norm3)

    def forward(self, x):
        """Run the block; the residual sum is followed by a ReLU."""

        def _inner_forward(x):
            # Everything up to and including the residual addition, kept in
            # one function so it can be passed to ``cp.checkpoint``.
            identity = x

            out = self.conv1(x)
            out = self.norm1(out)
            out = self.relu(out)

            out = self.conv2(out)

            if self.avg_down_stride:
                out = self.avd_layer(out)

            out = self.conv3(out)
            out = self.norm3(out)

            if self.downsample is not None:
                identity = self.downsample(x)

            out += identity

            return out

        # Checkpointing is only used when enabled and the input requires
        # gradients.
        if self.with_cp and x.requires_grad:
            out = cp.checkpoint(_inner_forward, x)
        else:
            out = _inner_forward(x)

        out = self.relu(out)

        return out
@BACKBONES.register_module()
class ResNeSt(ResNetV1d):
    """ResNeSt backbone.

    Please refer to the `paper <https://arxiv.org/pdf/2004.08955.pdf>`_ for
    details.

    Args:
        depth (int): Network depth, from {50, 101, 152, 200, 269}.
        groups (int): Groups of conv2 in Bottleneck. Default: 1.
        width_per_group (int): Width per group of conv2 in Bottleneck.
            Default: 4.
        radix (int): Radix of SpltAtConv2d. Default: 2
        reduction_factor (int): Reduction factor of SplitAttentionConv2d.
            Default: 4.
        avg_down_stride (bool): Whether to use average pool for stride in
            Bottleneck. Default: True.
        in_channels (int): Number of input image channels. Default: 3.
        stem_channels (int): Output channels of the stem layer. Default: 64.
        num_stages (int): Stages of the network. Default: 4.
        strides (Sequence[int]): Strides of the first block of each stage.
            Default: ``(1, 2, 2, 2)``.
        dilations (Sequence[int]): Dilation of each stage.
            Default: ``(1, 1, 1, 1)``.
        out_indices (Sequence[int]): Output from which stages. If only one
            stage is specified, a single tensor (feature map) is returned,
            otherwise multiple stages are specified, a tuple of tensors will
            be returned. Default: ``(3, )``.
        style (str): `pytorch` or `caffe`. If set to "pytorch", the stride-two
            layer is the 3x3 conv layer, otherwise the stride-two layer is
            the first 1x1 conv layer.
        deep_stem (bool): Replace 7x7 conv in input stem with 3 3x3 conv.
            Default: False.
        avg_down (bool): Use AvgPool instead of stride conv when
            downsampling in the bottleneck. Default: False.
        frozen_stages (int): Stages to be frozen (stop grad and set eval mode).
            -1 means not freezing any parameters. Default: -1.
        conv_cfg (dict | None): The config dict for conv layers. Default: None.
        norm_cfg (dict): The config dict for norm layers.
        norm_eval (bool): Whether to set norm layers to eval mode, namely,
            freeze running stats (mean and var). Note: Effect on Batch Norm
            and its variants only. Default: False.
        with_cp (bool): Use checkpoint or not. Using checkpoint will save some
            memory while slowing down the training speed. Default: False.
        zero_init_residual (bool): Whether to use zero init for last norm layer
            in resblocks to let them behave as identity. Default: True.
    """

    # depth -> (block class, number of blocks in each of the four stages)
    arch_settings = {
        50: (Bottleneck, (3, 4, 6, 3)),
        101: (Bottleneck, (3, 4, 23, 3)),
        152: (Bottleneck, (3, 8, 36, 3)),
        200: (Bottleneck, (3, 24, 36, 3)),
        269: (Bottleneck, (3, 30, 48, 8)),
    }

    def __init__(self,
                 depth,
                 groups=1,
                 width_per_group=4,
                 radix=2,
                 reduction_factor=4,
                 avg_down_stride=True,
                 **kwargs):
        # The block options are stored before the parent constructor runs;
        # ``make_res_layer`` reads them (presumably it is invoked from the
        # parent constructor -- confirm against ``ResNetV1d``).
        self.avg_down_stride = avg_down_stride
        self.reduction_factor = reduction_factor
        self.radix = radix
        self.width_per_group = width_per_group
        self.groups = groups
        super().__init__(depth=depth, **kwargs)

    def make_res_layer(self, **kwargs):
        """Build one stage, adding the ResNeSt block options to ``kwargs``."""
        stage = ResLayer(
            groups=self.groups,
            width_per_group=self.width_per_group,
            base_channels=self.base_channels,
            radix=self.radix,
            reduction_factor=self.reduction_factor,
            avg_down_stride=self.avg_down_stride,
            **kwargs)
        return stage
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment