"""ShuffleNetV2 backbone followed by a DCN + transposed-convolution upsampling head."""

import math
from collections import OrderedDict

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.nn import init

from .DCNv2.dcn_v2 import DCN

BN_MOMENTUM = 0.1


def conv_bn(inp, oup, stride):
    """Return a 3x3 conv (padding 1, no bias) -> BatchNorm -> ReLU stack.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
        stride: Stride of the 3x3 convolution.
    """
    return nn.Sequential(
        nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
        nn.BatchNorm2d(oup),
        nn.ReLU(inplace=True),
    )


def conv_1x1_bn(inp, oup):
    """Return a 1x1 conv (stride 1, no bias) -> BatchNorm -> ReLU stack.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
    """
    return nn.Sequential(
        nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
        nn.BatchNorm2d(oup),
        nn.ReLU(inplace=True),
    )


def channel_shuffle(x, groups):
    """Interleave the channels of a 4-D tensor across ``groups`` groups.

    The channel axis is viewed as ``(groups, channels_per_group)``, the two
    axes are swapped, and the result is flattened back to a single channel
    axis.

    Args:
        x: 4-D tensor unpacked as (batch, channels, height, width).
        groups: Number of channel groups; the view below only succeeds when
            the channel count is divisible by it.

    Returns:
        A tensor with the same shape as ``x`` and shuffled channels.
    """
    batchsize, num_channels, height, width = x.data.size()
    channels_per_group = num_channels // groups

    # reshape: (B, C, H, W) -> (B, groups, C // groups, H, W)
    x = x.view(batchsize, groups, channels_per_group, height, width)
    x = torch.transpose(x, 1, 2).contiguous()

    # flatten back to (B, C, H, W)
    x = x.view(batchsize, -1, height, width)
    return x


def fill_up_weights(up):
    """Write a bilinear-interpolation kernel into ``up.weight`` in place.

    The 2-D kernel is computed once into ``weight[0, 0]`` and then copied to
    ``weight[k, 0]`` for every other index ``k`` along the first weight
    axis. Entries at second-axis indices other than 0 are left untouched.

    Args:
        up: A module whose ``weight`` is a 4-D tensor (here an
            ``nn.ConvTranspose2d``).
    """
    w = up.weight.data
    f = math.ceil(w.size(2) / 2)
    center = (2 * f - 1 - f % 2) / (2. * f)
    for i in range(w.size(2)):
        for j in range(w.size(3)):
            w[0, 0, i, j] = \
                (1 - math.fabs(i / f - center)) * (1 - math.fabs(j / f - center))
    # Distinct loop name: the original reused ``c`` here and thereby clobbered
    # the kernel-centre constant computed above.
    for k in range(1, w.size(0)):
        w[k, 0, :, :] = w[0, 0, :, :]


class InvertedResidual(nn.Module):
    """ShuffleNetV2 building block.

    ``benchmodel == 1``: the input is split in half along the channel axis;
    the first half is passed through unchanged and the second half goes
    through ``banch2``. Because ``banch2`` expects ``oup // 2`` channels, the
    input must carry ``oup`` channels in this mode.

    ``benchmodel == 2``: the full input is fed to both ``banch1`` and
    ``banch2`` (each producing ``oup // 2`` channels).

    In both modes the two halves are concatenated and channel-shuffled with
    two groups.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
        stride: Stride of the depthwise convolutions; must be 1 or 2.
        benchmodel: Block variant, 1 or 2 (see above).

    Raises:
        ValueError: If ``benchmodel`` is neither 1 nor 2.
    """

    def __init__(self, inp, oup, stride, benchmodel):
        super(InvertedResidual, self).__init__()
        # Reject unknown variants up front; previously they were built like
        # variant 2 and then crashed in forward() with UnboundLocalError.
        if benchmodel not in (1, 2):
            raise ValueError(
                "benchmodel must be 1 or 2, got {!r}".format(benchmodel))
        self.benchmodel = benchmodel
        self.stride = stride
        assert stride in [1, 2]

        oup_inc = oup // 2

        # NOTE: the attribute names ``banch1`` / ``banch2`` (sic) are part of
        # the state_dict keys and are therefore kept as they are.
        if self.benchmodel == 1:
            self.banch2 = self._make_main_branch(oup_inc, oup_inc, stride)
        else:
            self.banch1 = nn.Sequential(
                # dw
                nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
                nn.BatchNorm2d(inp),
                # pw-linear
                nn.Conv2d(inp, oup_inc, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup_inc),
                nn.ReLU(inplace=True),
            )
            self.banch2 = self._make_main_branch(inp, oup_inc, stride)

    @staticmethod
    def _make_main_branch(in_channels, oup_inc, stride):
        """Build the pointwise -> depthwise -> pointwise branch."""
        return nn.Sequential(
            # pw
            nn.Conv2d(in_channels, oup_inc, 1, 1, 0, bias=False),
            nn.BatchNorm2d(oup_inc),
            nn.ReLU(inplace=True),
            # dw
            nn.Conv2d(oup_inc, oup_inc, 3, stride, 1, groups=oup_inc, bias=False),
            nn.BatchNorm2d(oup_inc),
            # pw-linear
            nn.Conv2d(oup_inc, oup_inc, 1, 1, 0, bias=False),
            nn.BatchNorm2d(oup_inc),
            nn.ReLU(inplace=True),
        )

    @staticmethod
    def _concat(x, out):
        """Concatenate two tensors along the channel axis (dim 1)."""
        return torch.cat((x, out), 1)

    def forward(self, x):
        """Run the block and return the channel-shuffled result."""
        if self.benchmodel == 1:
            half = x.shape[1] // 2
            x1 = x[:, :half, :, :]
            x2 = x[:, half:, :, :]
            out = self._concat(x1, self.banch2(x2))
        else:  # benchmodel == 2, guaranteed by __init__
            out = self._concat(self.banch1(x), self.banch2(x))

        return channel_shuffle(out, 2)


class ShuffleNetV2(nn.Module):
    """ShuffleNetV2 feature extractor with three DCN + deconvolution stages.

    The stem (stride-2 conv and stride-2 max-pool) and the three stages (each
    starting with a stride-2 block) are followed by three upsampling stages,
    each made of a DCN layer and a stride-2 ``nn.ConvTranspose2d``.

    Args:
        input_size: Expected input resolution; only checked to be a multiple
            of 32.
        width_mult: Channel-width multiplier; one of 0.5, 1.0, 1.5, 2.0.

    Raises:
        ValueError: If ``width_mult`` is not one of the supported values.
    """

    def __init__(self, input_size=512, width_mult=1.):
        super(ShuffleNetV2, self).__init__()
        self.inplanes = 24
        self.deconv_with_bias = False
        assert input_size % 32 == 0

        self.stage_repeats = [4, 8, 4]
        # index 0 is invalid and should never be called.
        # only used for indexing convenience.
        if width_mult == 0.5:
            self.stage_out_channels = [-1, 24, 48, 96, 192, 1024]
        elif width_mult == 1.0:
            self.stage_out_channels = [-1, 24, 116, 232, 464, 1024]
        elif width_mult == 1.5:
            self.stage_out_channels = [-1, 24, 176, 352, 704, 1024]
        elif width_mult == 2.0:
            # NOTE(review): torchvision's shufflenet_v2_x2_0 uses 244 for the
            # first stage; 224 is kept here so existing checkpoints still
            # load -- confirm which value is intended.
            self.stage_out_channels = [-1, 24, 224, 488, 976, 2048]
        else:
            # The original message formatted an undefined name (num_groups),
            # which raised NameError instead of this ValueError.
            raise ValueError(
                "width_mult {} is not supported; expected one of "
                "0.5, 1.0, 1.5, 2.0".format(width_mult))

        # building first layer
        input_channel = self.stage_out_channels[1]
        self.conv1 = conv_bn(3, input_channel, 2)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # building inverted residual blocks
        blocks = []
        for idxstage, numrepeat in enumerate(self.stage_repeats):
            output_channel = self.stage_out_channels[idxstage + 2]
            for i in range(numrepeat):
                if i == 0:
                    # first block of a stage: stride 2, variant 2
                    blocks.append(
                        InvertedResidual(input_channel, output_channel, 2, 2))
                else:
                    # remaining blocks: stride 1, variant 1
                    blocks.append(
                        InvertedResidual(input_channel, output_channel, 1, 1))
                input_channel = output_channel
        # The deconv head consumes the channels of the last stage.
        self.inplanes = input_channel

        # make it nn.Sequential
        self.features = nn.Sequential(*blocks)

        # The classification tail (last 1x1 conv and global pooling) is
        # intentionally not built; stage_out_channels[-1] is unused here.

        # used for deconv layers
        self.deconv_layers = self._make_deconv_layer(
            3,
            [256, 256, 256],
            [4, 4, 4],
        )

    def _get_deconv_cfg(self, deconv_kernel, index):
        """Return ``(kernel, padding, output_padding)`` for a deconv kernel size.

        Args:
            deconv_kernel: Transposed-convolution kernel size; 2, 3 or 4.
            index: Index of the deconv layer (currently unused).

        Raises:
            ValueError: If ``deconv_kernel`` is not 2, 3 or 4. Previously
                this surfaced as an UnboundLocalError.
        """
        if deconv_kernel == 4:
            padding = 1
            output_padding = 0
        elif deconv_kernel == 3:
            padding = 1
            output_padding = 1
        elif deconv_kernel == 2:
            padding = 0
            output_padding = 0
        else:
            raise ValueError(
                "unsupported deconv kernel size {!r}; expected 2, 3 or 4"
                .format(deconv_kernel))

        return deconv_kernel, padding, output_padding

    def _make_deconv_layer(self, num_layers, num_filters, num_kernels):
        """Build the upsampling head.

        Each of the ``num_layers`` stages is
        DCN(3x3) -> BN -> ReLU -> ConvTranspose2d(stride 2) -> BN -> ReLU.
        ``self.inplanes`` is updated to the output width of each stage.

        Args:
            num_layers: Number of upsampling stages.
            num_filters: Output channels per stage (length ``num_layers``).
            num_kernels: Deconv kernel size per stage (length ``num_layers``).

        Returns:
            An ``nn.Sequential`` holding all stages in order.
        """
        assert num_layers == len(num_filters), \
            'ERROR: num_deconv_layers is different len(num_deconv_filters)'
        assert num_layers == len(num_kernels), \
            'ERROR: num_deconv_layers is different len(num_deconv_kernels)'

        layers = []
        for i in range(num_layers):
            kernel, padding, output_padding = \
                self._get_deconv_cfg(num_kernels[i], i)

            planes = num_filters[i]
            fc = DCN(self.inplanes, planes,
                     kernel_size=(3, 3), stride=1,
                     padding=1, dilation=1, deformable_groups=1)
            up = nn.ConvTranspose2d(
                in_channels=planes,
                out_channels=planes,
                kernel_size=kernel,
                stride=2,
                padding=padding,
                output_padding=output_padding,
                bias=self.deconv_with_bias)
            fill_up_weights(up)

            layers.append(fc)
            layers.append(nn.BatchNorm2d(planes))
            layers.append(nn.ReLU(inplace=True))
            layers.append(up)
            layers.append(nn.BatchNorm2d(planes))
            layers.append(nn.ReLU(inplace=True))
            self.inplanes = planes

        return nn.Sequential(*layers)

    def init_weights(self, pretrained=True):
        """Reset the BatchNorm layers of the deconv head to weight 1, bias 0.

        Nothing happens when ``pretrained`` is falsy. No checkpoint is loaded
        in either case; the backbone keeps its construction-time weights.

        Args:
            pretrained: Flag enabling the BatchNorm reset.
        """
        if pretrained:
            print('=> init deconv weights from normal distribution')
            for m in self.deconv_layers.modules():
                if isinstance(m, nn.BatchNorm2d):
                    nn.init.constant_(m.weight, 1)
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Run stem, ShuffleNetV2 stages and the deconv head on ``x``."""
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.features(x)
        x = self.deconv_layers(x)
        return x


def shufflenetv2(width_mult=1.):
    """Build a ``ShuffleNetV2`` with the given width multiplier."""
    model = ShuffleNetV2(width_mult=width_mult)
    return model


def get_shufflev2_net(num_layers, cfg):
    """Build a default-width ``ShuffleNetV2`` and initialise its deconv head.

    Args:
        num_layers: Currently unused.
        cfg: Currently unused.

    Returns:
        The constructed model after ``init_weights(pretrained=True)``.
    """
    model = ShuffleNetV2()
    model.init_weights(pretrained=True)
    return model