Commit 322546ff authored by sunxx1's avatar sunxx1
Browse files

Merge branch 'add_Recommendation' into 'main'

添加openmmlab测试用例

See merge request dcutoolkit/deeplearing/dlexamples_new!32
parents 1f4ba993 8c867a92
import warnings
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
import cv2
import mmcv
import torch
import torch.distributed as dist
from mmcv import color_val
from mmcv.runner import BaseModule
# TODO import `auto_fp16` from mmcv and delete them from mmcls
try:
from mmcv.runner import auto_fp16
except ImportError:
warnings.warn('auto_fp16 from mmcls will be deprecated.'
'Please install mmcv>=1.1.4.')
from mmcls.core import auto_fp16
class BaseClassifier(BaseModule, metaclass=ABCMeta):
"""Base class for classifiers."""
def __init__(self, init_cfg=None):
super(BaseClassifier, self).__init__(init_cfg)
self.fp16_enabled = False
@property
def with_neck(self):
return hasattr(self, 'neck') and self.neck is not None
@property
def with_head(self):
return hasattr(self, 'head') and self.head is not None
@abstractmethod
def extract_feat(self, imgs):
pass
def extract_feats(self, imgs):
assert isinstance(imgs, list)
for img in imgs:
yield self.extract_feat(img)
@abstractmethod
def forward_train(self, imgs, **kwargs):
"""
Args:
img (list[Tensor]): List of tensors of shape (1, C, H, W).
Typically these should be mean centered and std scaled.
kwargs (keyword arguments): Specific to concrete implementation.
"""
pass
@abstractmethod
def simple_test(self, img, **kwargs):
pass
def forward_test(self, imgs, **kwargs):
"""
Args:
imgs (List[Tensor]): the outer list indicates test-time
augmentations and inner Tensor should have a shape NxCxHxW,
which contains all images in the batch.
"""
if isinstance(imgs, torch.Tensor):
imgs = [imgs]
for var, name in [(imgs, 'imgs')]:
if not isinstance(var, list):
raise TypeError(f'{name} must be a list, but got {type(var)}')
if len(imgs) == 1:
return self.simple_test(imgs[0], **kwargs)
else:
raise NotImplementedError('aug_test has not been implemented')
@auto_fp16(apply_to=('img', ))
def forward(self, img, return_loss=True, **kwargs):
"""Calls either forward_train or forward_test depending on whether
return_loss=True.
Note this setting will change the expected inputs. When
`return_loss=True`, img and img_meta are single-nested (i.e. Tensor and
List[dict]), and when `resturn_loss=False`, img and img_meta should be
double nested (i.e. List[Tensor], List[List[dict]]), with the outer
list indicating test time augmentations.
"""
if return_loss:
return self.forward_train(img, **kwargs)
else:
return self.forward_test(img, **kwargs)
def _parse_losses(self, losses):
log_vars = OrderedDict()
for loss_name, loss_value in losses.items():
if isinstance(loss_value, torch.Tensor):
log_vars[loss_name] = loss_value.mean()
elif isinstance(loss_value, list):
log_vars[loss_name] = sum(_loss.mean() for _loss in loss_value)
elif isinstance(loss_value, dict):
for name, value in loss_value.items():
log_vars[name] = value
else:
raise TypeError(
f'{loss_name} is not a tensor or list of tensors')
loss = sum(_value for _key, _value in log_vars.items()
if 'loss' in _key)
log_vars['loss'] = loss
for loss_name, loss_value in log_vars.items():
# reduce loss when distributed training
if dist.is_available() and dist.is_initialized():
loss_value = loss_value.data.clone()
dist.all_reduce(loss_value.div_(dist.get_world_size()))
log_vars[loss_name] = loss_value.item()
return loss, log_vars
def train_step(self, data, optimizer):
"""The iteration step during training.
This method defines an iteration step during training, except for the
back propagation and optimizer updating, which are done in an optimizer
hook. Note that in some complicated cases or models, the whole process
including back propagation and optimizer updating are also defined in
this method, such as GAN.
Args:
data (dict): The output of dataloader.
optimizer (:obj:`torch.optim.Optimizer` | dict): The optimizer of
runner is passed to ``train_step()``. This argument is unused
and reserved.
Returns:
dict: It should contain at least 3 keys: ``loss``, ``log_vars``,
``num_samples``.
``loss`` is a tensor for back propagation, which can be a
weighted sum of multiple losses.
``log_vars`` contains all the variables to be sent to the
logger.
``num_samples`` indicates the batch size (when the model is
DDP, it means the batch size on each GPU), which is used for
averaging the logs.
"""
losses = self(**data)
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img'].data))
return outputs
def val_step(self, data, optimizer):
"""The iteration step during validation.
This method shares the same signature as :func:`train_step`, but used
during val epochs. Note that the evaluation after training epochs is
not implemented with this method, but an evaluation hook.
"""
losses = self(**data)
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img'].data))
return outputs
def show_result(self,
img,
result,
text_color='green',
font_scale=0.5,
row_width=20,
show=False,
win_name='',
wait_time=0,
out_file=None):
"""Draw `result` over `img`.
Args:
img (str or Tensor): The image to be displayed.
result (Tensor): The classification results to draw over `img`.
text_color (str or tuple or :obj:`Color`): Color of texts.
font_scale (float): Font scales of texts.
row_width (int): width between each row of results on the image.
show (bool): Whether to show the image.
Default: False.
win_name (str): The window name.
wait_time (int): Value of waitKey param.
Default: 0.
out_file (str or None): The filename to write the image.
Default: None.
Returns:
img (Tensor): Only if not `show` or `out_file`
"""
img = mmcv.imread(img)
img = img.copy()
# write results on left-top of the image
x, y = 0, row_width
text_color = color_val(text_color)
for k, v in result.items():
if isinstance(v, float):
v = f'{v:.2f}'
label_text = f'{k}: {v}'
cv2.putText(img, label_text, (x, y), cv2.FONT_HERSHEY_COMPLEX,
font_scale, text_color)
y += row_width
# if out_file specified, do not show image in window
if out_file is not None:
show = False
if show:
mmcv.imshow(img, win_name, wait_time)
if out_file is not None:
mmcv.imwrite(img, out_file)
if not (show or out_file):
warnings.warn('show==False and out_file is not specified, only '
'result image will be returned')
return img
import copy
import warnings
from ..builder import CLASSIFIERS, build_backbone, build_head, build_neck
from ..utils.augment import Augments
from .base import BaseClassifier
@CLASSIFIERS.register_module()
class ImageClassifier(BaseClassifier):
def __init__(self,
backbone,
neck=None,
head=None,
pretrained=None,
train_cfg=None,
init_cfg=None):
super(ImageClassifier, self).__init__(init_cfg)
if pretrained is not None:
warnings.warn('DeprecationWarning: pretrained is a deprecated \
key, please consider using init_cfg')
self.init_cfg = dict(type='Pretrained', checkpoint=pretrained)
self.backbone = build_backbone(backbone)
if neck is not None:
self.neck = build_neck(neck)
if head is not None:
self.head = build_head(head)
self.augments = None
if train_cfg is not None:
augments_cfg = train_cfg.get('augments', None)
if augments_cfg is not None:
self.augments = Augments(augments_cfg)
else:
# Considering BC-breaking
mixup_cfg = train_cfg.get('mixup', None)
cutmix_cfg = train_cfg.get('cutmix', None)
assert mixup_cfg is None or cutmix_cfg is None, \
'If mixup and cutmix are set simultaneously,' \
'use augments instead.'
if mixup_cfg is not None:
warnings.warn('The mixup attribute will be deprecated. '
'Please use augments instead.')
cfg = copy.deepcopy(mixup_cfg)
cfg['type'] = 'BatchMixup'
# In the previous version, mixup_prob is always 1.0.
cfg['prob'] = 1.0
self.augments = Augments(cfg)
if cutmix_cfg is not None:
warnings.warn('The cutmix attribute will be deprecated. '
'Please use augments instead.')
cfg = copy.deepcopy(cutmix_cfg)
cutmix_prob = cfg.pop('cutmix_prob')
cfg['type'] = 'BatchCutMix'
cfg['prob'] = cutmix_prob
self.augments = Augments(cfg)
def extract_feat(self, img):
"""Directly extract features from the backbone + neck."""
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
return x
def forward_train(self, img, gt_label, **kwargs):
"""Forward computation during training.
Args:
img (Tensor): of shape (N, C, H, W) encoding input images.
Typically these should be mean centered and std scaled.
gt_label (Tensor): It should be of shape (N, 1) encoding the
ground-truth label of input images for single label task. It
shoulf be of shape (N, C) encoding the ground-truth label
of input images for multi-labels task.
Returns:
dict[str, Tensor]: a dictionary of loss components
"""
if self.augments is not None:
img, gt_label = self.augments(img, gt_label)
x = self.extract_feat(img)
losses = dict()
loss = self.head.forward_train(x, gt_label)
losses.update(loss)
return losses
def simple_test(self, img, img_metas):
"""Test without augmentation."""
x = self.extract_feat(img)
return self.head.simple_test(x)
from .cls_head import ClsHead
from .linear_head import LinearClsHead
from .multi_label_head import MultiLabelClsHead
from .multi_label_linear_head import MultiLabelLinearClsHead
from .vision_transformer_head import VisionTransformerClsHead
__all__ = [
'ClsHead', 'LinearClsHead', 'MultiLabelClsHead', 'MultiLabelLinearClsHead',
'VisionTransformerClsHead'
]
from abc import ABCMeta, abstractmethod
from mmcv.runner import BaseModule
class BaseHead(BaseModule, metaclass=ABCMeta):
"""Base head."""
def __init__(self, init_cfg=None):
super(BaseHead, self).__init__(init_cfg)
@abstractmethod
def forward_train(self, x, gt_label, **kwargs):
pass
import torch
import torch.nn.functional as F
from mmcls.models.losses import Accuracy
from ..builder import HEADS, build_loss
from .base_head import BaseHead
@HEADS.register_module()
class ClsHead(BaseHead):
"""classification head.
Args:
loss (dict): Config of classification loss.
topk (int | tuple): Top-k accuracy.
cal_acc (bool): Whether to calculate accuracy during training.
If you use Mixup/CutMix or something like that during training,
it is not reasonable to calculate accuracy. Defaults to False.
"""
def __init__(self,
loss=dict(type='CrossEntropyLoss', loss_weight=1.0),
topk=(1, ),
cal_acc=False,
init_cfg=None):
super(ClsHead, self).__init__(init_cfg=init_cfg)
assert isinstance(loss, dict)
assert isinstance(topk, (int, tuple))
if isinstance(topk, int):
topk = (topk, )
for _topk in topk:
assert _topk > 0, 'Top-k should be larger than 0'
self.topk = topk
self.compute_loss = build_loss(loss)
self.compute_accuracy = Accuracy(topk=self.topk)
self.cal_acc = cal_acc
def loss(self, cls_score, gt_label):
num_samples = len(cls_score)
losses = dict()
# compute loss
loss = self.compute_loss(cls_score, gt_label, avg_factor=num_samples)
if self.cal_acc:
# compute accuracy
acc = self.compute_accuracy(cls_score, gt_label)
assert len(acc) == len(self.topk)
losses['accuracy'] = {
f'top-{k}': a
for k, a in zip(self.topk, acc)
}
losses['loss'] = loss
return losses
def forward_train(self, cls_score, gt_label):
losses = self.loss(cls_score, gt_label)
return losses
def simple_test(self, cls_score):
"""Test without augmentation."""
if isinstance(cls_score, list):
cls_score = sum(cls_score) / float(len(cls_score))
pred = F.softmax(cls_score, dim=1) if cls_score is not None else None
on_trace = hasattr(torch.jit, 'is_tracing') and torch.jit.is_tracing()
if torch.onnx.is_in_onnx_export() or on_trace:
return pred
pred = list(pred.detach().cpu().numpy())
return pred
import torch
import torch.nn as nn
import torch.nn.functional as F
from ..builder import HEADS
from .cls_head import ClsHead
@HEADS.register_module()
class LinearClsHead(ClsHead):
"""Linear classifier head.
Args:
num_classes (int): Number of categories excluding the background
category.
in_channels (int): Number of channels in the input feature map.
"""
def __init__(self,
num_classes,
in_channels,
init_cfg=None,
*args,
**kwargs):
init_cfg = init_cfg or dict(
type='Normal',
mean=0.,
std=0.01,
bias=0.,
override=dict(name='fc'))
super(LinearClsHead, self).__init__(init_cfg=init_cfg, *args, **kwargs)
self.in_channels = in_channels
self.num_classes = num_classes
if self.num_classes <= 0:
raise ValueError(
f'num_classes={num_classes} must be a positive integer')
self._init_layers()
def _init_layers(self):
self.fc = nn.Linear(self.in_channels, self.num_classes)
def simple_test(self, img):
"""Test without augmentation."""
cls_score = self.fc(img)
if isinstance(cls_score, list):
cls_score = sum(cls_score) / float(len(cls_score))
pred = F.softmax(cls_score, dim=1) if cls_score is not None else None
on_trace = hasattr(torch.jit, 'is_tracing') and torch.jit.is_tracing()
if torch.onnx.is_in_onnx_export() or on_trace:
return pred
pred = list(pred.detach().cpu().numpy())
return pred
def forward_train(self, x, gt_label):
cls_score = self.fc(x)
losses = self.loss(cls_score, gt_label)
return losses
import torch
import torch.nn.functional as F
from ..builder import HEADS, build_loss
from .base_head import BaseHead
@HEADS.register_module()
class MultiLabelClsHead(BaseHead):
"""Classification head for multilabel task.
Args:
loss (dict): Config of classification loss.
"""
def __init__(self,
loss=dict(
type='CrossEntropyLoss',
use_sigmoid=True,
reduction='mean',
loss_weight=1.0),
init_cfg=None):
super(MultiLabelClsHead, self).__init__(init_cfg=init_cfg)
assert isinstance(loss, dict)
self.compute_loss = build_loss(loss)
def loss(self, cls_score, gt_label):
gt_label = gt_label.type_as(cls_score)
num_samples = len(cls_score)
losses = dict()
# map difficult examples to positive ones
_gt_label = torch.abs(gt_label)
# compute loss
loss = self.compute_loss(cls_score, _gt_label, avg_factor=num_samples)
losses['loss'] = loss
return losses
def forward_train(self, cls_score, gt_label):
gt_label = gt_label.type_as(cls_score)
losses = self.loss(cls_score, gt_label)
return losses
def simple_test(self, cls_score):
if isinstance(cls_score, list):
cls_score = sum(cls_score) / float(len(cls_score))
pred = F.sigmoid(cls_score) if cls_score is not None else None
on_trace = hasattr(torch.jit, 'is_tracing') and torch.jit.is_tracing()
if torch.onnx.is_in_onnx_export() or on_trace:
return pred
pred = list(pred.detach().cpu().numpy())
return pred
import torch
import torch.nn as nn
import torch.nn.functional as F
from ..builder import HEADS
from .multi_label_head import MultiLabelClsHead
@HEADS.register_module()
class MultiLabelLinearClsHead(MultiLabelClsHead):
"""Linear classification head for multilabel task.
Args:
num_classes (int): Number of categories.
in_channels (int): Number of channels in the input feature map.
loss (dict): Config of classification loss.
"""
def __init__(self,
num_classes,
in_channels,
loss=dict(
type='CrossEntropyLoss',
use_sigmoid=True,
reduction='mean',
loss_weight=1.0),
init_cfg=dict(
type='Normal',
mean=0.,
std=0.01,
bias=0.,
override=dict(name='fc'))):
super(MultiLabelLinearClsHead, self).__init__(
loss=loss, init_cfg=init_cfg)
if num_classes <= 0:
raise ValueError(
f'num_classes={num_classes} must be a positive integer')
self.in_channels = in_channels
self.num_classes = num_classes
self._init_layers()
def _init_layers(self):
self.fc = nn.Linear(self.in_channels, self.num_classes)
def forward_train(self, x, gt_label):
gt_label = gt_label.type_as(x)
cls_score = self.fc(x)
losses = self.loss(cls_score, gt_label)
return losses
def simple_test(self, img):
"""Test without augmentation."""
cls_score = self.fc(img)
if isinstance(cls_score, list):
cls_score = sum(cls_score) / float(len(cls_score))
pred = F.sigmoid(cls_score) if cls_score is not None else None
on_trace = hasattr(torch.jit, 'is_tracing') and torch.jit.is_tracing()
if torch.onnx.is_in_onnx_export() or on_trace:
return pred
pred = list(pred.detach().cpu().numpy())
return pred
from collections import OrderedDict
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmcv.cnn import build_activation_layer, constant_init, kaiming_init
from ..builder import HEADS
from .cls_head import ClsHead
@HEADS.register_module()
class VisionTransformerClsHead(ClsHead):
"""Vision Transformer classifier head.
Args:
num_classes (int): Number of categories excluding the background
category.
in_channels (int): Number of channels in the input feature map.
hidden_dim (int): Number of the dimensions for hidden layer. Only
available during pre-training. Default None.
act_cfg (dict): The activation config. Only available during
pre-training. Defalut Tanh.
"""
def __init__(self,
num_classes,
in_channels,
hidden_dim=None,
act_cfg=dict(type='Tanh'),
*args,
**kwargs):
super(VisionTransformerClsHead, self).__init__(*args, **kwargs)
self.in_channels = in_channels
self.num_classes = num_classes
self.hidden_dim = hidden_dim
self.act_cfg = act_cfg
if self.num_classes <= 0:
raise ValueError(
f'num_classes={num_classes} must be a positive integer')
self._init_layers()
def _init_layers(self):
if self.hidden_dim is None:
layers = [('head', nn.Linear(self.in_channels, self.num_classes))]
else:
layers = [
('pre_logits', nn.Linear(self.in_channels, self.hidden_dim)),
('act', build_activation_layer(self.act_cfg)),
('head', nn.Linear(self.hidden_dim, self.num_classes)),
]
self.layers = nn.Sequential(OrderedDict(layers))
def init_weights(self):
super(VisionTransformerClsHead, self).init_weights()
# Modified from ClassyVision
if hasattr(self.layers, 'pre_logits'):
# Lecun norm
kaiming_init(
self.layers.pre_logits, mode='fan_in', nonlinearity='linear')
constant_init(self.layers.head, 0)
def simple_test(self, img):
"""Test without augmentation."""
cls_score = self.layers(img)
if isinstance(cls_score, list):
cls_score = sum(cls_score) / float(len(cls_score))
pred = F.softmax(cls_score, dim=1) if cls_score is not None else None
on_trace = hasattr(torch.jit, 'is_tracing') and torch.jit.is_tracing()
if torch.onnx.is_in_onnx_export() or on_trace:
return pred
pred = list(pred.detach().cpu().numpy())
return pred
def forward_train(self, x, gt_label):
cls_score = self.layers(x)
losses = self.loss(cls_score, gt_label)
return losses
from .accuracy import Accuracy, accuracy
from .asymmetric_loss import AsymmetricLoss, asymmetric_loss
from .cross_entropy_loss import (CrossEntropyLoss, binary_cross_entropy,
cross_entropy)
from .focal_loss import FocalLoss, sigmoid_focal_loss
from .label_smooth_loss import LabelSmoothLoss
from .utils import (convert_to_one_hot, reduce_loss, weight_reduce_loss,
weighted_loss)
__all__ = [
'accuracy', 'Accuracy', 'asymmetric_loss', 'AsymmetricLoss',
'cross_entropy', 'binary_cross_entropy', 'CrossEntropyLoss', 'reduce_loss',
'weight_reduce_loss', 'LabelSmoothLoss', 'weighted_loss', 'FocalLoss',
'sigmoid_focal_loss', 'convert_to_one_hot'
]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment