Commit b6c19984 authored by dengjb's avatar dengjb
Browse files

update

parents
# encoding: utf-8
# based on:
# https://github.com/zhunzhong07/person-re-ranking
__all__ = ['re_ranking']
import numpy as np
def re_ranking(q_g_dist, q_q_dist, g_g_dist, k1: int = 20, k2: int = 6, lambda_value: float = 0.3):
original_dist = np.concatenate(
[np.concatenate([q_q_dist, q_g_dist], axis=1),
np.concatenate([q_g_dist.T, g_g_dist], axis=1)],
axis=0)
original_dist = np.power(original_dist, 2).astype(np.float32)
original_dist = np.transpose(1. * original_dist / np.max(original_dist, axis=0))
V = np.zeros_like(original_dist).astype(np.float32)
initial_rank = np.argsort(original_dist).astype(np.int32)
query_num = q_g_dist.shape[0]
gallery_num = q_g_dist.shape[0] + q_g_dist.shape[1]
all_num = gallery_num
for i in range(all_num):
# k-reciprocal neighbors
forward_k_neigh_index = initial_rank[i, :k1 + 1]
backward_k_neigh_index = initial_rank[forward_k_neigh_index, :k1 + 1]
fi = np.where(backward_k_neigh_index == i)[0]
k_reciprocal_index = forward_k_neigh_index[fi]
k_reciprocal_expansion_index = k_reciprocal_index
for j in range(len(k_reciprocal_index)):
candidate = k_reciprocal_index[j]
candidate_forward_k_neigh_index = initial_rank[candidate,
:int(np.around(k1 / 2.)) + 1]
candidate_backward_k_neigh_index = initial_rank[candidate_forward_k_neigh_index,
:int(np.around(k1 / 2.)) + 1]
fi_candidate = np.where(candidate_backward_k_neigh_index == candidate)[0]
candidate_k_reciprocal_index = candidate_forward_k_neigh_index[fi_candidate]
if len(np.intersect1d(candidate_k_reciprocal_index, k_reciprocal_index)) > 2. / 3 * len(
candidate_k_reciprocal_index):
k_reciprocal_expansion_index = np.append(k_reciprocal_expansion_index, candidate_k_reciprocal_index)
k_reciprocal_expansion_index = np.unique(k_reciprocal_expansion_index)
weight = np.exp(-original_dist[i, k_reciprocal_expansion_index])
V[i, k_reciprocal_expansion_index] = 1. * weight / np.sum(weight)
original_dist = original_dist[:query_num, ]
if k2 != 1:
V_qe = np.zeros_like(V, dtype=np.float32)
for i in range(all_num):
V_qe[i, :] = np.mean(V[initial_rank[i, :k2], :], axis=0)
V = V_qe
del V_qe
del initial_rank
invIndex = []
for i in range(gallery_num):
invIndex.append(np.where(V[:, i] != 0)[0])
jaccard_dist = np.zeros_like(original_dist, dtype=np.float32)
for i in range(query_num):
temp_min = np.zeros(shape=[1, gallery_num], dtype=np.float32)
indNonZero = np.where(V[i, :] != 0)[0]
indImages = [invIndex[ind] for ind in indNonZero]
for j in range(len(indNonZero)):
temp_min[0, indImages[j]] = temp_min[0, indImages[j]] + np.minimum(V[i, indNonZero[j]],
V[indImages[j], indNonZero[j]])
jaccard_dist[i] = 1 - temp_min / (2. - temp_min)
final_dist = jaccard_dist * (1 - lambda_value) + original_dist * lambda_value
del original_dist, V, jaccard_dist
final_dist = final_dist[:query_num, query_num:]
return final_dist
# encoding: utf-8
"""
@author: l1aoxingyu
@contact: sherlockliao01@gmail.com
"""
import warnings
import faiss
import numpy as np
try:
from .rank_cylib.roc_cy import evaluate_roc_cy
IS_CYTHON_AVAI = True
except ImportError:
IS_CYTHON_AVAI = False
warnings.warn(
'Cython roc evaluation (very fast so highly recommended) is '
'unavailable, now use python evaluation.'
)
def evaluate_roc_py(distmat, q_pids, g_pids, q_camids, g_camids):
r"""Evaluation with ROC curve.
Key: for each query identity, its gallery images from the same camera view are discarded.
Args:
distmat (np.ndarray): cosine distance matrix
"""
num_q, num_g = distmat.shape
indices = np.argsort(distmat, axis=1)
matches = (g_pids[indices] == q_pids[:, np.newaxis]).astype(np.int32)
pos = []
neg = []
for q_idx in range(num_q):
# get query pid and camid
q_pid = q_pids[q_idx]
q_camid = q_camids[q_idx]
# Remove gallery samples that have the same pid and camid with query
order = indices[q_idx]
remove = (g_pids[order] == q_pid) & (g_camids[order] == q_camid)
keep = np.invert(remove)
raw_cmc = matches[q_idx][keep]
sort_idx = order[keep]
q_dist = distmat[q_idx]
ind_pos = np.where(raw_cmc == 1)[0]
pos.extend(q_dist[sort_idx[ind_pos]])
ind_neg = np.where(raw_cmc == 0)[0]
neg.extend(q_dist[sort_idx[ind_neg]])
scores = np.hstack((pos, neg))
labels = np.hstack((np.zeros(len(pos)), np.ones(len(neg))))
return scores, labels
def evaluate_roc(
distmat,
q_pids,
g_pids,
q_camids,
g_camids,
use_cython=True
):
"""Evaluates CMC rank.
Args:
distmat (numpy.ndarray): distance matrix of shape (num_query, num_gallery).
q_pids (numpy.ndarray): 1-D array containing person identities
of each query instance.
g_pids (numpy.ndarray): 1-D array containing person identities
of each gallery instance.
q_camids (numpy.ndarray): 1-D array containing camera views under
which each query instance is captured.
g_camids (numpy.ndarray): 1-D array containing camera views under
which each gallery instance is captured.
use_cython (bool, optional): use cython code for evaluation. Default is True.
This is highly recommended as the cython code can speed up the cmc computation
by more than 10x. This requires Cython to be installed.
"""
if use_cython and IS_CYTHON_AVAI:
return evaluate_roc_cy(distmat, q_pids, g_pids, q_camids, g_camids)
else:
return evaluate_roc_py(distmat, q_pids, g_pids, q_camids, g_camids)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
import pprint
import sys
from collections import Mapping, OrderedDict
import numpy as np
from tabulate import tabulate
from termcolor import colored
def print_csv_format(results):
"""
Print main metrics in a format similar to Detectron2,
so that they are easy to copypaste into a spreadsheet.
Args:
results (OrderedDict): {metric -> score}
"""
# unordered results cannot be properly printed
assert isinstance(results, OrderedDict) or not len(results), results
logger = logging.getLogger(__name__)
dataset_name = results.pop('dataset')
metrics = ["Dataset"] + [k for k in results]
csv_results = [(dataset_name, *list(results.values()))]
# tabulate it
table = tabulate(
csv_results,
tablefmt="pipe",
floatfmt=".2f",
headers=metrics,
numalign="left",
)
logger.info("Evaluation results in csv format: \n" + colored(table, "cyan"))
def verify_results(cfg, results):
"""
Args:
results (OrderedDict[dict]): task_name -> {metric -> score}
Returns:
bool: whether the verification succeeds or not
"""
expected_results = cfg.TEST.EXPECTED_RESULTS
if not len(expected_results):
return True
ok = True
for task, metric, expected, tolerance in expected_results:
actual = results[task][metric]
if not np.isfinite(actual):
ok = False
diff = abs(actual - expected)
if diff > tolerance:
ok = False
logger = logging.getLogger(__name__)
if not ok:
logger.error("Result verification failed!")
logger.error("Expected Results: " + str(expected_results))
logger.error("Actual Results: " + pprint.pformat(results))
sys.exit(1)
else:
logger.info("Results verification passed.")
return ok
def flatten_results_dict(results):
"""
Expand a hierarchical dict of scalars into a flat dict of scalars.
If results[k1][k2][k3] = v, the returned dict will have the entry
{"k1/k2/k3": v}.
Args:
results (dict):
"""
r = {}
for k, v in results.items():
if isinstance(v, Mapping):
v = flatten_results_dict(v)
for kk, vv in v.items():
r[k + "/" + kk] = vv
else:
r[k] = v
return r
# encoding: utf-8
"""
@author: liaoxingyu
@contact: sherlockliao01@gmail.com
"""
from .activation import *
from .batch_norm import *
from .context_block import ContextBlock
from .drop import DropPath, DropBlock2d, drop_block_2d, drop_path
from .frn import FRN, TLU
from .gather_layer import GatherLayer
from .helpers import to_ntuple, to_2tuple, to_3tuple, to_4tuple, make_divisible
from .non_local import Non_local
from .se_layer import SELayer
from .splat import SplAtConv2d, DropBlock2D
from .weight_init import (
trunc_normal_, variance_scaling_, lecun_normal_, weights_init_kaiming, weights_init_classifier
)
# encoding: utf-8
"""
@author: xingyu liao
@contact: sherlockliao01@gmail.com
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
__all__ = [
'Mish',
'Swish',
'MemoryEfficientSwish',
'GELU']
class Mish(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
# inlining this saves 1 second per epoch (V100 GPU) vs having a temp x and then returning x(!)
return x * (torch.tanh(F.softplus(x)))
class Swish(nn.Module):
def forward(self, x):
return x * torch.sigmoid(x)
class SwishImplementation(torch.autograd.Function):
@staticmethod
def forward(ctx, i):
result = i * torch.sigmoid(i)
ctx.save_for_backward(i)
return result
@staticmethod
def backward(ctx, grad_output):
i = ctx.saved_variables[0]
sigmoid_i = torch.sigmoid(i)
return grad_output * (sigmoid_i * (1 + i * (1 - sigmoid_i)))
class MemoryEfficientSwish(nn.Module):
def forward(self, x):
return SwishImplementation.apply(x)
class GELU(nn.Module):
"""
Paper Section 3.4, last paragraph notice that BERT used the GELU instead of RELU
"""
def forward(self, x):
return 0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))
# encoding: utf-8
"""
@author: liaoxingyu
@contact: sherlockliao01@gmail.com
"""
import torch
import torch.nn as nn
__all__ = [
"Linear",
"ArcSoftmax",
"CosSoftmax",
"CircleSoftmax"
]
class Linear(nn.Module):
def __init__(self, num_classes, scale, margin):
super().__init__()
self.num_classes = num_classes
self.s = scale
self.m = margin
def forward(self, logits, targets):
return logits.mul_(self.s)
def extra_repr(self):
return f"num_classes={self.num_classes}, scale={self.s}, margin={self.m}"
class CosSoftmax(Linear):
r"""Implement of large margin cosine distance:
"""
def forward(self, logits, targets):
index = torch.where(targets != -1)[0]
m_hot = torch.zeros(index.size()[0], logits.size()[1], device=logits.device, dtype=logits.dtype)
m_hot.scatter_(1, targets[index, None], self.m)
logits[index] -= m_hot
logits.mul_(self.s)
return logits
class ArcSoftmax(Linear):
def forward(self, logits, targets):
index = torch.where(targets != -1)[0]
m_hot = torch.zeros(index.size()[0], logits.size()[1], device=logits.device, dtype=logits.dtype)
m_hot.scatter_(1, targets[index, None], self.m)
logits.acos_()
logits[index] += m_hot
logits.cos_().mul_(self.s)
return logits
class CircleSoftmax(Linear):
def forward(self, logits, targets):
alpha_p = torch.clamp_min(-logits.detach() + 1 + self.m, min=0.)
alpha_n = torch.clamp_min(logits.detach() + self.m, min=0.)
delta_p = 1 - self.m
delta_n = self.m
# When use model parallel, there are some targets not in class centers of local rank
index = torch.where(targets != -1)[0]
m_hot = torch.zeros(index.size()[0], logits.size()[1], device=logits.device, dtype=logits.dtype)
m_hot.scatter_(1, targets[index, None], 1)
logits_p = alpha_p * (logits - delta_p)
logits_n = alpha_n * (logits - delta_n)
logits[index] = logits_p[index] * m_hot + logits_n[index] * (1 - m_hot)
neg_index = torch.where(targets == -1)[0]
logits[neg_index] = logits_n[neg_index]
logits.mul_(self.s)
return logits
# encoding: utf-8
"""
@author: liaoxingyu
@contact: sherlockliao01@gmail.com
"""
import logging
import torch
import torch.nn.functional as F
from torch import nn
__all__ = ["IBN", "get_norm"]
class BatchNorm(nn.BatchNorm2d):
def __init__(self, num_features, eps=1e-05, momentum=0.1, weight_freeze=False, bias_freeze=False, weight_init=1.0,
bias_init=0.0, **kwargs):
super().__init__(num_features, eps=eps, momentum=momentum)
if weight_init is not None: nn.init.constant_(self.weight, weight_init)
if bias_init is not None: nn.init.constant_(self.bias, bias_init)
self.weight.requires_grad_(not weight_freeze)
self.bias.requires_grad_(not bias_freeze)
class SyncBatchNorm(nn.SyncBatchNorm):
def __init__(self, num_features, eps=1e-05, momentum=0.1, weight_freeze=False, bias_freeze=False, weight_init=1.0,
bias_init=0.0):
super().__init__(num_features, eps=eps, momentum=momentum)
if weight_init is not None: nn.init.constant_(self.weight, weight_init)
if bias_init is not None: nn.init.constant_(self.bias, bias_init)
self.weight.requires_grad_(not weight_freeze)
self.bias.requires_grad_(not bias_freeze)
class IBN(nn.Module):
def __init__(self, planes, bn_norm, **kwargs):
super(IBN, self).__init__()
half1 = int(planes / 2)
self.half = half1
half2 = planes - half1
self.IN = nn.InstanceNorm2d(half1, affine=True)
self.BN = get_norm(bn_norm, half2, **kwargs)
def forward(self, x):
split = torch.split(x, self.half, 1)
out1 = self.IN(split[0].contiguous())
out2 = self.BN(split[1].contiguous())
out = torch.cat((out1, out2), 1)
return out
class GhostBatchNorm(BatchNorm):
def __init__(self, num_features, num_splits=1, **kwargs):
super().__init__(num_features, **kwargs)
self.num_splits = num_splits
self.register_buffer('running_mean', torch.zeros(num_features))
self.register_buffer('running_var', torch.ones(num_features))
def forward(self, input):
N, C, H, W = input.shape
if self.training or not self.track_running_stats:
self.running_mean = self.running_mean.repeat(self.num_splits)
self.running_var = self.running_var.repeat(self.num_splits)
outputs = F.batch_norm(
input.view(-1, C * self.num_splits, H, W), self.running_mean, self.running_var,
self.weight.repeat(self.num_splits), self.bias.repeat(self.num_splits),
True, self.momentum, self.eps).view(N, C, H, W)
self.running_mean = torch.mean(self.running_mean.view(self.num_splits, self.num_features), dim=0)
self.running_var = torch.mean(self.running_var.view(self.num_splits, self.num_features), dim=0)
return outputs
else:
return F.batch_norm(
input, self.running_mean, self.running_var,
self.weight, self.bias, False, self.momentum, self.eps)
class FrozenBatchNorm(nn.Module):
"""
BatchNorm2d where the batch statistics and the affine parameters are fixed.
It contains non-trainable buffers called
"weight" and "bias", "running_mean", "running_var",
initialized to perform identity transformation.
The pre-trained backbone models from Caffe2 only contain "weight" and "bias",
which are computed from the original four parameters of BN.
The affine transform `x * weight + bias` will perform the equivalent
computation of `(x - running_mean) / sqrt(running_var) * weight + bias`.
When loading a backbone model from Caffe2, "running_mean" and "running_var"
will be left unchanged as identity transformation.
Other pre-trained backbone models may contain all 4 parameters.
The forward is implemented by `F.batch_norm(..., training=False)`.
"""
_version = 3
def __init__(self, num_features, eps=1e-5, **kwargs):
super().__init__()
self.num_features = num_features
self.eps = eps
self.register_buffer("weight", torch.ones(num_features))
self.register_buffer("bias", torch.zeros(num_features))
self.register_buffer("running_mean", torch.zeros(num_features))
self.register_buffer("running_var", torch.ones(num_features) - eps)
def forward(self, x):
if x.requires_grad:
# When gradients are needed, F.batch_norm will use extra memory
# because its backward op computes gradients for weight/bias as well.
scale = self.weight * (self.running_var + self.eps).rsqrt()
bias = self.bias - self.running_mean * scale
scale = scale.reshape(1, -1, 1, 1)
bias = bias.reshape(1, -1, 1, 1)
return x * scale + bias
else:
# When gradients are not needed, F.batch_norm is a single fused op
# and provide more optimization opportunities.
return F.batch_norm(
x,
self.running_mean,
self.running_var,
self.weight,
self.bias,
training=False,
eps=self.eps,
)
def _load_from_state_dict(
self, state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs
):
version = local_metadata.get("version", None)
if version is None or version < 2:
# No running_mean/var in early versions
# This will silent the warnings
if prefix + "running_mean" not in state_dict:
state_dict[prefix + "running_mean"] = torch.zeros_like(self.running_mean)
if prefix + "running_var" not in state_dict:
state_dict[prefix + "running_var"] = torch.ones_like(self.running_var)
if version is not None and version < 3:
logger = logging.getLogger(__name__)
logger.info("FrozenBatchNorm {} is upgraded to version 3.".format(prefix.rstrip(".")))
# In version < 3, running_var are used without +eps.
state_dict[prefix + "running_var"] -= self.eps
super()._load_from_state_dict(
state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs
)
def __repr__(self):
return "FrozenBatchNorm2d(num_features={}, eps={})".format(self.num_features, self.eps)
@classmethod
def convert_frozen_batchnorm(cls, module):
"""
Convert BatchNorm/SyncBatchNorm in module into FrozenBatchNorm.
Args:
module (torch.nn.Module):
Returns:
If module is BatchNorm/SyncBatchNorm, returns a new module.
Otherwise, in-place convert module and return it.
Similar to convert_sync_batchnorm in
https://github.com/pytorch/pytorch/blob/master/torch/nn/modules/batchnorm.py
"""
bn_module = nn.modules.batchnorm
bn_module = (bn_module.BatchNorm2d, bn_module.SyncBatchNorm)
res = module
if isinstance(module, bn_module):
res = cls(module.num_features)
if module.affine:
res.weight.data = module.weight.data.clone().detach()
res.bias.data = module.bias.data.clone().detach()
res.running_mean.data = module.running_mean.data
res.running_var.data = module.running_var.data
res.eps = module.eps
else:
for name, child in module.named_children():
new_child = cls.convert_frozen_batchnorm(child)
if new_child is not child:
res.add_module(name, new_child)
return res
def get_norm(norm, out_channels, **kwargs):
"""
Args:
norm (str or callable): either one of BN, GhostBN, FrozenBN, GN or SyncBN;
or a callable that takes a channel number and returns
the normalization layer as a nn.Module
out_channels: number of channels for normalization layer
Returns:
nn.Module or None: the normalization layer
"""
if isinstance(norm, str):
if len(norm) == 0:
return None
norm = {
"BN": BatchNorm,
"syncBN": SyncBatchNorm,
"GhostBN": GhostBatchNorm,
"FrozenBN": FrozenBatchNorm,
"GN": lambda channels, **args: nn.GroupNorm(32, channels),
}[norm]
return norm(out_channels, **kwargs)
# copy from https://github.com/xvjiarui/GCNet/blob/master/mmdet/ops/gcb/context_block.py
import torch
from torch import nn
__all__ = ['ContextBlock']
def last_zero_init(m):
if isinstance(m, nn.Sequential):
nn.init.constant_(m[-1].weight, val=0)
if hasattr(m[-1], 'bias') and m[-1].bias is not None:
nn.init.constant_(m[-1].bias, 0)
else:
nn.init.constant_(m.weight, val=0)
if hasattr(m, 'bias') and m.bias is not None:
nn.init.constant_(m.bias, 0)
class ContextBlock(nn.Module):
def __init__(self,
inplanes,
ratio,
pooling_type='att',
fusion_types=('channel_add',)):
super(ContextBlock, self).__init__()
assert pooling_type in ['avg', 'att']
assert isinstance(fusion_types, (list, tuple))
valid_fusion_types = ['channel_add', 'channel_mul']
assert all([f in valid_fusion_types for f in fusion_types])
assert len(fusion_types) > 0, 'at least one fusion should be used'
self.inplanes = inplanes
self.ratio = ratio
self.planes = int(inplanes * ratio)
self.pooling_type = pooling_type
self.fusion_types = fusion_types
if pooling_type == 'att':
self.conv_mask = nn.Conv2d(inplanes, 1, kernel_size=1)
self.softmax = nn.Softmax(dim=2)
else:
self.avg_pool = nn.AdaptiveAvgPool2d(1)
if 'channel_add' in fusion_types:
self.channel_add_conv = nn.Sequential(
nn.Conv2d(self.inplanes, self.planes, kernel_size=1),
nn.LayerNorm([self.planes, 1, 1]),
nn.ReLU(inplace=True), # yapf: disable
nn.Conv2d(self.planes, self.inplanes, kernel_size=1))
else:
self.channel_add_conv = None
if 'channel_mul' in fusion_types:
self.channel_mul_conv = nn.Sequential(
nn.Conv2d(self.inplanes, self.planes, kernel_size=1),
nn.LayerNorm([self.planes, 1, 1]),
nn.ReLU(inplace=True), # yapf: disable
nn.Conv2d(self.planes, self.inplanes, kernel_size=1))
else:
self.channel_mul_conv = None
self.reset_parameters()
def reset_parameters(self):
if self.pooling_type == 'att':
nn.init.kaiming_normal_(self.conv_mask.weight, a=0, mode='fan_in', nonlinearity='relu')
if hasattr(self.conv_mask, 'bias') and self.conv_mask.bias is not None:
nn.init.constant_(self.conv_mask.bias, 0)
self.conv_mask.inited = True
if self.channel_add_conv is not None:
last_zero_init(self.channel_add_conv)
if self.channel_mul_conv is not None:
last_zero_init(self.channel_mul_conv)
def spatial_pool(self, x):
batch, channel, height, width = x.size()
if self.pooling_type == 'att':
input_x = x
# [N, C, H * W]
input_x = input_x.view(batch, channel, height * width)
# [N, 1, C, H * W]
input_x = input_x.unsqueeze(1)
# [N, 1, H, W]
context_mask = self.conv_mask(x)
# [N, 1, H * W]
context_mask = context_mask.view(batch, 1, height * width)
# [N, 1, H * W]
context_mask = self.softmax(context_mask)
# [N, 1, H * W, 1]
context_mask = context_mask.unsqueeze(-1)
# [N, 1, C, 1]
context = torch.matmul(input_x, context_mask)
# [N, C, 1, 1]
context = context.view(batch, channel, 1, 1)
else:
# [N, C, 1, 1]
context = self.avg_pool(x)
return context
def forward(self, x):
# [N, C, 1, 1]
context = self.spatial_pool(x)
out = x
if self.channel_mul_conv is not None:
# [N, C, 1, 1]
channel_mul_term = torch.sigmoid(self.channel_mul_conv(context))
out = out * channel_mul_term
if self.channel_add_conv is not None:
# [N, C, 1, 1]
channel_add_term = self.channel_add_conv(context)
out = out + channel_add_term
return out
""" DropBlock, DropPath
PyTorch implementations of DropBlock and DropPath (Stochastic Depth) regularization layers.
Papers:
DropBlock: A regularization method for convolutional networks (https://arxiv.org/abs/1810.12890)
Deep Networks with Stochastic Depth (https://arxiv.org/abs/1603.09382)
Code:
DropBlock impl inspired by two Tensorflow impl that I liked:
- https://github.com/tensorflow/tpu/blob/master/models/official/resnet/resnet_model.py#L74
- https://github.com/clovaai/assembled-cnn/blob/master/nets/blocks.py
Hacked together by / Copyright 2020 Ross Wightman
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
def drop_block_2d(
x, drop_prob: float = 0.1, block_size: int = 7, gamma_scale: float = 1.0,
with_noise: bool = False, inplace: bool = False, batchwise: bool = False):
""" DropBlock. See https://arxiv.org/pdf/1810.12890.pdf
DropBlock with an experimental gaussian noise option. This layer has been tested on a few training
runs with success, but needs further validation and possibly optimization for lower runtime impact.
"""
B, C, H, W = x.shape
total_size = W * H
clipped_block_size = min(block_size, min(W, H))
# seed_drop_rate, the gamma parameter
gamma = gamma_scale * drop_prob * total_size / clipped_block_size ** 2 / (
(W - block_size + 1) * (H - block_size + 1))
# Forces the block to be inside the feature map.
w_i, h_i = torch.meshgrid(torch.arange(W).to(x.device), torch.arange(H).to(x.device))
valid_block = ((w_i >= clipped_block_size // 2) & (w_i < W - (clipped_block_size - 1) // 2)) & \
((h_i >= clipped_block_size // 2) & (h_i < H - (clipped_block_size - 1) // 2))
valid_block = torch.reshape(valid_block, (1, 1, H, W)).to(dtype=x.dtype)
if batchwise:
# one mask for whole batch, quite a bit faster
uniform_noise = torch.rand((1, C, H, W), dtype=x.dtype, device=x.device)
else:
uniform_noise = torch.rand_like(x)
block_mask = ((2 - gamma - valid_block + uniform_noise) >= 1).to(dtype=x.dtype)
block_mask = -F.max_pool2d(
-block_mask,
kernel_size=clipped_block_size, # block_size,
stride=1,
padding=clipped_block_size // 2)
if with_noise:
normal_noise = torch.randn((1, C, H, W), dtype=x.dtype, device=x.device) if batchwise else torch.randn_like(x)
if inplace:
x.mul_(block_mask).add_(normal_noise * (1 - block_mask))
else:
x = x * block_mask + normal_noise * (1 - block_mask)
else:
normalize_scale = (block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-7)).to(x.dtype)
if inplace:
x.mul_(block_mask * normalize_scale)
else:
x = x * block_mask * normalize_scale
return x
def drop_block_fast_2d(
x: torch.Tensor, drop_prob: float = 0.1, block_size: int = 7,
gamma_scale: float = 1.0, with_noise: bool = False, inplace: bool = False, batchwise: bool = False):
""" DropBlock. See https://arxiv.org/pdf/1810.12890.pdf
DropBlock with an experimental gaussian noise option. Simplied from above without concern for valid
block mask at edges.
"""
B, C, H, W = x.shape
total_size = W * H
clipped_block_size = min(block_size, min(W, H))
gamma = gamma_scale * drop_prob * total_size / clipped_block_size ** 2 / (
(W - block_size + 1) * (H - block_size + 1))
if batchwise:
# one mask for whole batch, quite a bit faster
block_mask = torch.rand((1, C, H, W), dtype=x.dtype, device=x.device) < gamma
else:
# mask per batch element
block_mask = torch.rand_like(x) < gamma
block_mask = F.max_pool2d(
block_mask.to(x.dtype), kernel_size=clipped_block_size, stride=1, padding=clipped_block_size // 2)
if with_noise:
normal_noise = torch.randn((1, C, H, W), dtype=x.dtype, device=x.device) if batchwise else torch.randn_like(x)
if inplace:
x.mul_(1. - block_mask).add_(normal_noise * block_mask)
else:
x = x * (1. - block_mask) + normal_noise * block_mask
else:
block_mask = 1 - block_mask
normalize_scale = (block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-7)).to(dtype=x.dtype)
if inplace:
x.mul_(block_mask * normalize_scale)
else:
x = x * block_mask * normalize_scale
return x
class DropBlock2d(nn.Module):
""" DropBlock. See https://arxiv.org/pdf/1810.12890.pdf
"""
def __init__(self,
drop_prob=0.1,
block_size=7,
gamma_scale=1.0,
with_noise=False,
inplace=False,
batchwise=False,
fast=True):
super(DropBlock2d, self).__init__()
self.drop_prob = drop_prob
self.gamma_scale = gamma_scale
self.block_size = block_size
self.with_noise = with_noise
self.inplace = inplace
self.batchwise = batchwise
self.fast = fast # FIXME finish comparisons of fast vs not
def forward(self, x):
if not self.training or not self.drop_prob:
return x
if self.fast:
return drop_block_fast_2d(
x, self.drop_prob, self.block_size, self.gamma_scale, self.with_noise, self.inplace, self.batchwise)
else:
return drop_block_2d(
x, self.drop_prob, self.block_size, self.gamma_scale, self.with_noise, self.inplace, self.batchwise)
def drop_path(x, drop_prob: float = 0., training: bool = False):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
This is the same as the DropConnect impl I created for EfficientNet, etc networks, however,
the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
'survival rate' as the argument.
"""
if drop_prob == 0. or not training:
return x
keep_prob = 1 - drop_prob
shape = (x.shape[0],) + (1,) * (x.ndim - 1) # work with diff dim tensors, not just 2D ConvNets
random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
random_tensor.floor_() # binarize
output = x.div(keep_prob) * random_tensor
return output
class DropPath(nn.Module):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
"""
def __init__(self, drop_prob=None):
super(DropPath, self).__init__()
self.drop_prob = drop_prob
def forward(self, x):
return drop_path(x, self.drop_prob, self.training)
# encoding: utf-8
"""
@author: liaoxingyu
@contact: sherlockliao01@gmail.com
"""
import torch
from torch import nn
from torch.nn.modules.batchnorm import BatchNorm2d
from torch.nn import ReLU, LeakyReLU
from torch.nn.parameter import Parameter
class TLU(nn.Module):
def __init__(self, num_features):
"""max(y, tau) = max(y - tau, 0) + tau = ReLU(y - tau) + tau"""
super(TLU, self).__init__()
self.num_features = num_features
self.tau = Parameter(torch.Tensor(num_features))
self.reset_parameters()
def reset_parameters(self):
nn.init.zeros_(self.tau)
def extra_repr(self):
return 'num_features={num_features}'.format(**self.__dict__)
def forward(self, x):
return torch.max(x, self.tau.view(1, self.num_features, 1, 1))
class FRN(nn.Module):
def __init__(self, num_features, eps=1e-6, is_eps_leanable=False):
"""
weight = gamma, bias = beta
beta, gamma:
Variables of shape [1, 1, 1, C]. if TensorFlow
Variables of shape [1, C, 1, 1]. if PyTorch
eps: A scalar constant or learnable variable.
"""
super(FRN, self).__init__()
self.num_features = num_features
self.init_eps = eps
self.is_eps_leanable = is_eps_leanable
self.weight = Parameter(torch.Tensor(num_features))
self.bias = Parameter(torch.Tensor(num_features))
if is_eps_leanable:
self.eps = Parameter(torch.Tensor(1))
else:
self.register_buffer('eps', torch.Tensor([eps]))
self.reset_parameters()
def reset_parameters(self):
nn.init.ones_(self.weight)
nn.init.zeros_(self.bias)
if self.is_eps_leanable:
nn.init.constant_(self.eps, self.init_eps)
def extra_repr(self):
return 'num_features={num_features}, eps={init_eps}'.format(**self.__dict__)
def forward(self, x):
"""
0, 1, 2, 3 -> (B, H, W, C) in TensorFlow
0, 1, 2, 3 -> (B, C, H, W) in PyTorch
TensorFlow code
nu2 = tf.reduce_mean(tf.square(x), axis=[1, 2], keepdims=True)
x = x * tf.rsqrt(nu2 + tf.abs(eps))
# This Code include TLU function max(y, tau)
return tf.maximum(gamma * x + beta, tau)
"""
# Compute the mean norm of activations per channel.
nu2 = x.pow(2).mean(dim=[2, 3], keepdim=True)
# Perform FRN.
x = x * torch.rsqrt(nu2 + self.eps.abs())
# Scale and Bias
x = self.weight.view(1, self.num_features, 1, 1) * x + self.bias.view(1, self.num_features, 1, 1)
# x = self.weight * x + self.bias
return x
def bnrelu_to_frn(module):
"""
Convert 'BatchNorm2d + ReLU' to 'FRN + TLU'
"""
mod = module
before_name = None
before_child = None
is_before_bn = False
for name, child in module.named_children():
if is_before_bn and isinstance(child, (ReLU, LeakyReLU)):
# Convert BN to FRN
if isinstance(before_child, BatchNorm2d):
mod.add_module(
before_name, FRN(num_features=before_child.num_features))
else:
raise NotImplementedError()
# Convert ReLU to TLU
mod.add_module(name, TLU(num_features=before_child.num_features))
else:
mod.add_module(name, bnrelu_to_frn(child))
before_name = name
before_child = child
is_before_bn = isinstance(child, BatchNorm2d)
return mod
def convert(module, flag_name):
mod = module
before_ch = None
for name, child in module.named_children():
if hasattr(child, flag_name) and getattr(child, flag_name):
if isinstance(child, BatchNorm2d):
before_ch = child.num_features
mod.add_module(name, FRN(num_features=child.num_features))
# TODO bn is no good...
if isinstance(child, (ReLU, LeakyReLU)):
mod.add_module(name, TLU(num_features=before_ch))
else:
mod.add_module(name, convert(child, flag_name))
return mod
def remove_flags(module, flag_name):
mod = module
for name, child in module.named_children():
if hasattr(child, 'is_convert_frn'):
delattr(child, flag_name)
mod.add_module(name, remove_flags(child, flag_name))
else:
mod.add_module(name, remove_flags(child, flag_name))
return mod
def bnrelu_to_frn2(model, input_size=(3, 128, 128), batch_size=2, flag_name='is_convert_frn'):
forard_hooks = list()
backward_hooks = list()
is_before_bn = [False]
def register_forward_hook(module):
def hook(self, input, output):
if isinstance(module, (nn.Sequential, nn.ModuleList)) or (module == model):
is_before_bn.append(False)
return
# input and output is required in hook def
is_converted = is_before_bn[-1] and isinstance(self, (ReLU, LeakyReLU))
if is_converted:
setattr(self, flag_name, True)
is_before_bn.append(isinstance(self, BatchNorm2d))
forard_hooks.append(module.register_forward_hook(hook))
is_before_relu = [False]
def register_backward_hook(module):
def hook(self, input, output):
if isinstance(module, (nn.Sequential, nn.ModuleList)) or (module == model):
is_before_relu.append(False)
return
is_converted = is_before_relu[-1] and isinstance(self, BatchNorm2d)
if is_converted:
setattr(self, flag_name, True)
is_before_relu.append(isinstance(self, (ReLU, LeakyReLU)))
backward_hooks.append(module.register_backward_hook(hook))
# multiple inputs to the network
if isinstance(input_size, tuple):
input_size = [input_size]
# batch_size of 2 for batchnorm
x = [torch.rand(batch_size, *in_size) for in_size in input_size]
# register hook
model.apply(register_forward_hook)
model.apply(register_backward_hook)
# make a forward pass
output = model(*x)
output.sum().backward() # Raw output is not enabled to use backward()
# remove these hooks
for h in forard_hooks:
h.remove()
for h in backward_hooks:
h.remove()
model = convert(model, flag_name=flag_name)
model = remove_flags(model, flag_name=flag_name)
return model
# encoding: utf-8
"""
@author: xingyu liao
@contact: sherlockliao01@gmail.com
"""
# based on: https://github.com/open-mmlab/OpenSelfSup/blob/master/openselfsup/models/utils/gather_layer.py
import torch
import torch.distributed as dist
class GatherLayer(torch.autograd.Function):
"""Gather tensors from all process, supporting backward propagation.
"""
@staticmethod
def forward(ctx, input):
ctx.save_for_backward(input)
output = [torch.zeros_like(input) \
for _ in range(dist.get_world_size())]
dist.all_gather(output, input)
return tuple(output)
@staticmethod
def backward(ctx, *grads):
input, = ctx.saved_tensors
grad_out = torch.zeros_like(input)
grad_out[:] = grads[dist.get_rank()]
return grad_out
""" Layer/Module Helpers
Hacked together by / Copyright 2020 Ross Wightman
"""
import collections.abc
from itertools import repeat
# From PyTorch internals
def _ntuple(n):
def parse(x):
if isinstance(x, collections.abc.Iterable):
return x
return tuple(repeat(x, n))
return parse
to_1tuple = _ntuple(1)
to_2tuple = _ntuple(2)
to_3tuple = _ntuple(3)
to_4tuple = _ntuple(4)
to_ntuple = _ntuple
def make_divisible(v, divisor=8, min_value=None):
min_value = min_value or divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
# encoding: utf-8
import torch
from torch import nn
from .batch_norm import get_norm
class Non_local(nn.Module):
def __init__(self, in_channels, bn_norm, reduc_ratio=2):
super(Non_local, self).__init__()
self.in_channels = in_channels
self.inter_channels = reduc_ratio // reduc_ratio
self.g = nn.Conv2d(in_channels=self.in_channels, out_channels=self.inter_channels,
kernel_size=1, stride=1, padding=0)
self.W = nn.Sequential(
nn.Conv2d(in_channels=self.inter_channels, out_channels=self.in_channels,
kernel_size=1, stride=1, padding=0),
get_norm(bn_norm, self.in_channels),
)
nn.init.constant_(self.W[1].weight, 0.0)
nn.init.constant_(self.W[1].bias, 0.0)
self.theta = nn.Conv2d(in_channels=self.in_channels, out_channels=self.inter_channels,
kernel_size=1, stride=1, padding=0)
self.phi = nn.Conv2d(in_channels=self.in_channels, out_channels=self.inter_channels,
kernel_size=1, stride=1, padding=0)
def forward(self, x):
"""
:param x: (b, t, h, w)
:return x: (b, t, h, w)
"""
batch_size = x.size(0)
g_x = self.g(x).view(batch_size, self.inter_channels, -1)
g_x = g_x.permute(0, 2, 1)
theta_x = self.theta(x).view(batch_size, self.inter_channels, -1)
theta_x = theta_x.permute(0, 2, 1)
phi_x = self.phi(x).view(batch_size, self.inter_channels, -1)
f = torch.matmul(theta_x, phi_x)
N = f.size(-1)
f_div_C = f / N
y = torch.matmul(f_div_C, g_x)
y = y.permute(0, 2, 1).contiguous()
y = y.view(batch_size, self.inter_channels, *x.size()[2:])
W_y = self.W(y)
z = W_y + x
return z
# encoding: utf-8
"""
@author: l1aoxingyu
@contact: sherlockliao01@gmail.com
"""
import torch
import torch.nn.functional as F
from torch import nn
__all__ = [
'Identity',
'Flatten',
'GlobalAvgPool',
'GlobalMaxPool',
'GeneralizedMeanPooling',
'GeneralizedMeanPoolingP',
'FastGlobalAvgPool',
'AdaptiveAvgMaxPool',
'ClipGlobalAvgPool',
]
class Identity(nn.Module):
def __init__(self, *args, **kwargs):
super().__init__()
def forward(self, input):
return input
class Flatten(nn.Module):
def __init__(self, *args, **kwargs):
super().__init__()
def forward(self, input):
return input.view(input.size(0), -1, 1, 1)
class GlobalAvgPool(nn.AdaptiveAvgPool2d):
def __init__(self, output_size=1, *args, **kwargs):
super().__init__(output_size)
class GlobalMaxPool(nn.AdaptiveMaxPool2d):
def __init__(self, output_size=1, *args, **kwargs):
super().__init__(output_size)
class GeneralizedMeanPooling(nn.Module):
r"""Applies a 2D power-average adaptive pooling over an input signal composed of several input planes.
The function computed is: :math:`f(X) = pow(sum(pow(X, p)), 1/p)`
- At p = infinity, one gets Max Pooling
- At p = 1, one gets Average Pooling
The output is of size H x W, for any input size.
The number of output features is equal to the number of input planes.
Args:
output_size: the target output size of the image of the form H x W.
Can be a tuple (H, W) or a single H for a square image H x H
H and W can be either a ``int``, or ``None`` which means the size will
be the same as that of the input.
"""
def __init__(self, norm=3, output_size=(1, 1), eps=1e-6, *args, **kwargs):
super(GeneralizedMeanPooling, self).__init__()
assert norm > 0
self.p = float(norm)
self.output_size = output_size
self.eps = eps
def forward(self, x):
x = x.clamp(min=self.eps).pow(self.p)
return F.adaptive_avg_pool2d(x, self.output_size).pow(1. / self.p)
def __repr__(self):
return self.__class__.__name__ + '(' \
+ str(self.p) + ', ' \
+ 'output_size=' + str(self.output_size) + ')'
class GeneralizedMeanPoolingP(GeneralizedMeanPooling):
""" Same, but norm is trainable
"""
def __init__(self, norm=3, output_size=(1, 1), eps=1e-6, *args, **kwargs):
super(GeneralizedMeanPoolingP, self).__init__(norm, output_size, eps)
self.p = nn.Parameter(torch.ones(1) * norm)
class AdaptiveAvgMaxPool(nn.Module):
def __init__(self, output_size=1, *args, **kwargs):
super().__init__()
self.gap = FastGlobalAvgPool()
self.gmp = GlobalMaxPool(output_size)
def forward(self, x):
avg_feat = self.gap(x)
max_feat = self.gmp(x)
feat = avg_feat + max_feat
return feat
class FastGlobalAvgPool(nn.Module):
def __init__(self, flatten=False, *args, **kwargs):
super().__init__()
self.flatten = flatten
def forward(self, x):
if self.flatten:
in_size = x.size()
return x.view((in_size[0], in_size[1], -1)).mean(dim=2)
else:
return x.view(x.size(0), x.size(1), -1).mean(-1).view(x.size(0), x.size(1), 1, 1)
class ClipGlobalAvgPool(nn.Module):
def __init__(self, *args, **kwargs):
super().__init__()
self.avgpool = FastGlobalAvgPool()
def forward(self, x):
x = self.avgpool(x)
x = torch.clamp(x, min=0., max=1.)
return x
# encoding: utf-8
"""
@author: liaoxingyu
@contact: sherlockliao01@gmail.com
"""
from torch import nn
class SELayer(nn.Module):
def __init__(self, channel, reduction=16):
super(SELayer, self).__init__()
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.fc = nn.Sequential(
nn.Linear(channel, int(channel / reduction), bias=False),
nn.ReLU(inplace=True),
nn.Linear(int(channel / reduction), channel, bias=False),
nn.Sigmoid()
)
def forward(self, x):
b, c, _, _ = x.size()
y = self.avg_pool(x).view(b, c)
y = self.fc(y).view(b, c, 1, 1)
return x * y.expand_as(x)
# encoding: utf-8
"""
@author: xingyu liao
@contact: sherlockliao01@gmail.com
"""
import torch
import torch.nn.functional as F
from torch import nn
from torch.nn import Conv2d, ReLU
from torch.nn.modules.utils import _pair
from fastreid.layers import get_norm
class SplAtConv2d(nn.Module):
"""Split-Attention Conv2d
"""
def __init__(self, in_channels, channels, kernel_size, stride=(1, 1), padding=(0, 0),
dilation=(1, 1), groups=1, bias=True,
radix=2, reduction_factor=4,
rectify=False, rectify_avg=False, norm_layer=None,
dropblock_prob=0.0, **kwargs):
super(SplAtConv2d, self).__init__()
padding = _pair(padding)
self.rectify = rectify and (padding[0] > 0 or padding[1] > 0)
self.rectify_avg = rectify_avg
inter_channels = max(in_channels * radix // reduction_factor, 32)
self.radix = radix
self.cardinality = groups
self.channels = channels
self.dropblock_prob = dropblock_prob
if self.rectify:
from rfconv import RFConv2d
self.conv = RFConv2d(in_channels, channels * radix, kernel_size, stride, padding, dilation,
groups=groups * radix, bias=bias, average_mode=rectify_avg, **kwargs)
else:
self.conv = Conv2d(in_channels, channels * radix, kernel_size, stride, padding, dilation,
groups=groups * radix, bias=bias, **kwargs)
self.use_bn = norm_layer is not None
if self.use_bn:
self.bn0 = get_norm(norm_layer, channels * radix)
self.relu = ReLU(inplace=True)
self.fc1 = Conv2d(channels, inter_channels, 1, groups=self.cardinality)
if self.use_bn:
self.bn1 = get_norm(norm_layer, inter_channels)
self.fc2 = Conv2d(inter_channels, channels * radix, 1, groups=self.cardinality)
if dropblock_prob > 0.0:
self.dropblock = DropBlock2D(dropblock_prob, 3)
self.rsoftmax = rSoftMax(radix, groups)
def forward(self, x):
x = self.conv(x)
if self.use_bn:
x = self.bn0(x)
if self.dropblock_prob > 0.0:
x = self.dropblock(x)
x = self.relu(x)
batch, rchannel = x.shape[:2]
if self.radix > 1:
if torch.__version__ < '1.5':
splited = torch.split(x, int(rchannel // self.radix), dim=1)
else:
splited = torch.split(x, rchannel // self.radix, dim=1)
gap = sum(splited)
else:
gap = x
gap = F.adaptive_avg_pool2d(gap, 1)
gap = self.fc1(gap)
if self.use_bn:
gap = self.bn1(gap)
gap = self.relu(gap)
atten = self.fc2(gap)
atten = self.rsoftmax(atten).view(batch, -1, 1, 1)
if self.radix > 1:
if torch.__version__ < '1.5':
attens = torch.split(atten, int(rchannel // self.radix), dim=1)
else:
attens = torch.split(atten, rchannel // self.radix, dim=1)
out = sum([att * split for (att, split) in zip(attens, splited)])
else:
out = atten * x
return out.contiguous()
class rSoftMax(nn.Module):
def __init__(self, radix, cardinality):
super().__init__()
self.radix = radix
self.cardinality = cardinality
def forward(self, x):
batch = x.size(0)
if self.radix > 1:
x = x.view(batch, self.cardinality, self.radix, -1).transpose(1, 2)
x = F.softmax(x, dim=1)
x = x.reshape(batch, -1)
else:
x = torch.sigmoid(x)
return x
class DropBlock2D(object):
def __init__(self, *args, **kwargs):
raise NotImplementedError
# encoding: utf-8
"""
@author: xingyu liao
@contact: sherlockliao01@gmail.com
"""
import math
import warnings
import torch
from torch import nn, Tensor
def weights_init_kaiming(m):
classname = m.__class__.__name__
if classname.find('Linear') != -1:
nn.init.normal_(m.weight, 0, 0.01)
if m.bias is not None:
nn.init.constant_(m.bias, 0.0)
elif classname.find('Conv') != -1:
nn.init.kaiming_normal_(m.weight, mode='fan_out')
if m.bias is not None:
nn.init.constant_(m.bias, 0.0)
elif classname.find('BatchNorm') != -1:
if m.affine:
nn.init.constant_(m.weight, 1.0)
nn.init.constant_(m.bias, 0.0)
def weights_init_classifier(m):
classname = m.__class__.__name__
if classname.find('Linear') != -1:
nn.init.normal_(m.weight, std=0.001)
if m.bias is not None:
nn.init.constant_(m.bias, 0.0)
from torch.nn.init import _calculate_fan_in_and_fan_out
def _no_grad_trunc_normal_(tensor, mean, std, a, b):
# Cut & paste from PyTorch official master until it's in a few official releases - RW
# Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
def norm_cdf(x):
# Computes standard normal cumulative distribution function
return (1. + math.erf(x / math.sqrt(2.))) / 2.
if (mean < a - 2 * std) or (mean > b + 2 * std):
warnings.warn("mean is more than 2 std from [a, b] in nn.init.trunc_normal_. "
"The distribution of values may be incorrect.",
stacklevel=2)
with torch.no_grad():
# Values are generated by using a truncated uniform distribution and
# then using the inverse CDF for the normal distribution.
# Get upper and lower cdf values
l = norm_cdf((a - mean) / std)
u = norm_cdf((b - mean) / std)
# Uniformly fill tensor with values from [l, u], then translate to
# [2l-1, 2u-1].
tensor.uniform_(2 * l - 1, 2 * u - 1)
# Use inverse cdf transform for normal distribution to get truncated
# standard normal
tensor.erfinv_()
# Transform to proper mean, std
tensor.mul_(std * math.sqrt(2.))
tensor.add_(mean)
# Clamp to ensure it's in the proper range
tensor.clamp_(min=a, max=b)
return tensor
def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.):
# type: (Tensor, float, float, float, float) -> Tensor
r"""Fills the input Tensor with values drawn from a truncated
normal distribution. The values are effectively drawn from the
normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)`
with values outside :math:`[a, b]` redrawn until they are within
the bounds. The method used for generating the random values works
best when :math:`a \leq \text{mean} \leq b`.
Args:
tensor: an n-dimensional `torch.Tensor`
mean: the mean of the normal distribution
std: the standard deviation of the normal distribution
a: the minimum cutoff value
b: the maximum cutoff value
Examples:
>>> w = torch.empty(3, 5)
>>> nn.init.trunc_normal_(w)
"""
return _no_grad_trunc_normal_(tensor, mean, std, a, b)
def variance_scaling_(tensor, scale=1.0, mode='fan_in', distribution='normal'):
fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor)
if mode == 'fan_in':
denom = fan_in
elif mode == 'fan_out':
denom = fan_out
elif mode == 'fan_avg':
denom = (fan_in + fan_out) / 2
variance = scale / denom
if distribution == "truncated_normal":
# constant is stddev of standard normal truncated to (-2, 2)
trunc_normal_(tensor, std=math.sqrt(variance) / .87962566103423978)
elif distribution == "normal":
tensor.normal_(std=math.sqrt(variance))
elif distribution == "uniform":
bound = math.sqrt(3 * variance)
tensor.uniform_(-bound, bound)
else:
raise ValueError(f"invalid distribution {distribution}")
def lecun_normal_(tensor):
variance_scaling_(tensor, mode='fan_in', distribution='truncated_normal')
# encoding: utf-8
"""
@author: sherlock
@contact: sherlockliao01@gmail.com
"""
from . import losses
from .backbones import (
BACKBONE_REGISTRY,
build_resnet_backbone,
build_backbone,
)
from .heads import (
REID_HEADS_REGISTRY,
build_heads,
EmbeddingHead,
)
from .meta_arch import (
build_model,
META_ARCH_REGISTRY,
)
__all__ = [k for k in globals().keys() if not k.startswith("_")]
\ No newline at end of file
# encoding: utf-8
"""
@author: liaoxingyu
@contact: sherlockliao01@gmail.com
"""
from .build import build_backbone, BACKBONE_REGISTRY
from .resnet import build_resnet_backbone
from .osnet import build_osnet_backbone
from .resnest import build_resnest_backbone
from .resnext import build_resnext_backbone
from .regnet import build_regnet_backbone, build_effnet_backbone
from .shufflenet import build_shufflenetv2_backbone
from .mobilenet import build_mobilenetv2_backbone
from .mobilenetv3 import build_mobilenetv3_backbone
from .repvgg import build_repvgg_backbone
from .vision_transformer import build_vit_backbone
# encoding: utf-8
"""
@author: liaoxingyu
@contact: sherlockliao01@gmail.com
"""
from ...utils.registry import Registry
BACKBONE_REGISTRY = Registry("BACKBONE")
BACKBONE_REGISTRY.__doc__ = """
Registry for backbones, which extract feature maps from images
The registered object must be a callable that accepts two arguments:
1. A :class:`fastreid.config.CfgNode`
It must returns an instance of :class:`Backbone`.
"""
def build_backbone(cfg):
"""
Build a backbone from `cfg.MODEL.BACKBONE.NAME`.
Returns:
an instance of :class:`Backbone`
"""
backbone_name = cfg.MODEL.BACKBONE.NAME
backbone = BACKBONE_REGISTRY.get(backbone_name)(cfg)
return backbone
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment