misc.py 5.96 KB
Newer Older
eellison's avatar
eellison committed
1
2
3
from collections import OrderedDict
from torch.jit.annotations import Optional, List
from torch import Tensor
4
5
6
7
8
9
10
11
12
13
14
15

"""
helper class that supports empty tensors on some nn functions.

Ideally, add support directly in PyTorch to empty tensors in
those functions.

This can be removed once https://github.com/pytorch/pytorch/issues/12013
is implemented
"""

import math
16
import warnings
17
import torch
eellison's avatar
eellison committed
18
19
20
from torchvision.ops import _new_empty_tensor
from torch.nn import Module, Conv2d
import torch.nn.functional as F
21
22
23


class ConvTranspose2d(torch.nn.ConvTranspose2d):
    """
    Drop-in variant of nn.ConvTranspose2d that also accepts inputs with
    zero elements (for example an empty batch).

    Inputs that contain data are run through the regular transposed
    convolution; inputs without any element only get an empty tensor of
    the shape the convolution would have produced.  Once PyTorch handles
    this case natively, this class can be dropped.
    """

    def forward(self, x):
        """
        Apply the transposed convolution to ``x``.

        When ``x`` has no elements, return ``_new_empty_tensor(x, shape)``
        where ``shape`` is ``[x.shape[0], out_channels, H_out, W_out]``.
        """
        if x.numel() == 0:
            # Nothing to convolve: only work out the spatial output size,
            #   (in - 1) * stride - 2 * padding
            #       + dilation * (kernel - 1) + 1 + output_padding
            # for each of the last two input dimensions.
            spatial_dims = [
                (in_size - 1) * stride - 2 * pad + (dil * (kernel - 1) + 1) + out_pad
                for in_size, pad, dil, kernel, stride, out_pad in zip(
                    x.shape[-2:],
                    list(self.padding),
                    list(self.dilation),
                    list(self.kernel_size),
                    list(self.stride),
                    list(self.output_padding),
                )
            ]
            empty_shape = [x.shape[0], self.out_channels] + spatial_dims
            return _new_empty_tensor(x, empty_shape)

        return self.super_forward(x)

    def super_forward(self, input, output_size=None):
        # type: (Tensor, Optional[List[int]]) -> Tensor
        """
        Run the actual transposed convolution on ``input``.

        Raises:
            ValueError: if ``padding_mode`` is anything other than 'zeros'.
        """
        if self.padding_mode == 'zeros':
            # NOTE(review): relies on the private ``_output_padding`` helper of
            # the parent class being callable with these five arguments --
            # confirm against the installed PyTorch version.
            extra_padding = self._output_padding(
                input, output_size, self.stride, self.padding, self.kernel_size
            )
            return F.conv_transpose2d(
                input,
                self.weight,
                self.bias,
                stride=self.stride,
                padding=self.padding,
                output_padding=extra_padding,
                groups=self.groups,
                dilation=self.dilation,
            )

        raise ValueError('Only `zeros` padding mode is supported for ConvTranspose2d')
58
59
60


class BatchNorm2d(torch.nn.BatchNorm2d):
    """
    Drop-in variant of nn.BatchNorm2d that also accepts inputs with zero
    elements (for example an empty batch).

    Once PyTorch handles this case natively, this class can be dropped.
    """

    def forward(self, x):
        """
        Normalize ``x`` with the parent implementation, or, when ``x`` has
        no elements, return ``_new_empty_tensor(x, x.shape)`` instead.
        """
        if x.numel() == 0:
            # Batch norm keeps the input shape, so the empty result simply
            # mirrors it.
            return _new_empty_tensor(x, x.shape)

        return super(BatchNorm2d, self).forward(x)


def _check_size_scale_factor(dim, size, scale_factor):
    # type: (int, Optional[List[int]], Optional[float]) -> None
    if size is None and scale_factor is None:
        raise ValueError("either size or scale_factor should be defined")
    if size is not None and scale_factor is not None:
        raise ValueError("only one of size or scale_factor should be defined")
80
    if scale_factor is not None:
eellison's avatar
eellison committed
81
82
83
84
85
86
        if isinstance(scale_factor, (list, tuple)):
            if len(scale_factor) != dim:
                raise ValueError(
                    "scale_factor shape must match input shape. "
                    "Input is {}D, scale_factor size is {}".format(dim, len(scale_factor))
                )
87
88


eellison's avatar
eellison committed
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
def _output_size(dim, input, size, scale_factor):
    # type: (int, Tensor, Optional[List[int]], Optional[float]) -> List[int]
    """
    Work out the spatial output size of an interpolation.

    Returns ``size`` unchanged when it is given; otherwise each of the
    ``dim`` dimensions of ``input`` starting at index 2 is multiplied by
    ``scale_factor`` and rounded down.  Only ``dim == 2`` and a scalar
    ``scale_factor`` are supported (both enforced with assertions).
    """
    assert dim == 2
    _check_size_scale_factor(dim, size, scale_factor)
    if size is not None:
        return size

    # if dim is not 2 or scale_factor is iterable use _ntuple instead of concat
    assert scale_factor is not None and isinstance(scale_factor, (int, float))
    per_axis_scale = [scale_factor, scale_factor]

    scaled_sizes = []
    for axis in range(dim):
        # math.floor might return float in py2.7, hence the int() cast
        scaled = math.floor(input.size(axis + 2) * per_axis_scale[axis])
        scaled_sizes.append(int(scaled))
    return scaled_sizes


def interpolate(input, size=None, scale_factor=None, mode="nearest", align_corners=None):
    # type: (Tensor, Optional[List[int]], Optional[float], str, Optional[bool]) -> Tensor
    """
    Drop-in variant of nn.functional.interpolate that also accepts inputs
    with zero elements (for example an empty batch).

    Inputs that contain data are forwarded unchanged to
    nn.functional.interpolate.  For inputs without any element, only the
    output shape is computed (the leading dimensions of ``input`` followed
    by the two resized spatial dimensions) and an empty tensor of that
    shape is returned.  Once PyTorch handles this case natively, this
    function can be dropped.
    """
    if input.numel() == 0:
        # Empty input: keep every dimension except the last two and append
        # the resized height/width.
        spatial = _output_size(2, input, size, scale_factor)
        empty_shape = list(input.shape[:-2]) + list(spatial)
        return _new_empty_tensor(input, empty_shape)

    return torch.nn.functional.interpolate(
        input,
        size=size,
        scale_factor=scale_factor,
        mode=mode,
        align_corners=align_corners,
    )
119
120
121


# This is not in nn
122
class FrozenBatchNorm2d(torch.nn.Module):
    """
    BatchNorm2d where the batch statistics and the affine parameters
    are fixed.

    ``weight``, ``bias``, ``running_mean`` and ``running_var`` are
    registered as buffers (not parameters), and ``forward`` always
    normalizes with those stored values.

    Args:
        num_features: length of each of the four per-channel buffers.
        eps: value added to ``running_var`` before taking the reciprocal
            square root.  With the default of 0, a zero entry in
            ``running_var`` produces a non-finite scale.
        n: deprecated alias of ``num_features``; when given it takes
            precedence and a ``DeprecationWarning`` is issued.

    Raises:
        TypeError: if neither ``num_features`` nor ``n`` is given.
    """

    def __init__(self, num_features=None, eps=0., n=None):
        # n=None for backward-compatibility.  ``num_features`` defaults to
        # None only so that legacy ``FrozenBatchNorm2d(n=...)`` calls reach
        # the shim below instead of failing on a missing argument.
        if n is not None:
            # stacklevel=2 attributes the warning to the code constructing
            # the module rather than to this file.
            warnings.warn("`n` argument is deprecated and has been renamed `num_features`",
                          DeprecationWarning, stacklevel=2)
            num_features = n
        if num_features is None:
            raise TypeError("__init__() missing 1 required positional argument: 'num_features'")
        super(FrozenBatchNorm2d, self).__init__()
        self.eps = eps
        self.register_buffer("weight", torch.ones(num_features))
        self.register_buffer("bias", torch.zeros(num_features))
        self.register_buffer("running_mean", torch.zeros(num_features))
        self.register_buffer("running_var", torch.ones(num_features))

    def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
                              missing_keys, unexpected_keys, error_msgs):
        # This module has no `num_batches_tracked` buffer, so drop that entry
        # (removing it from the passed-in ``state_dict``) before delegating;
        # otherwise it would be handed to the parent loader as a key this
        # module does not own.
        num_batches_tracked_key = prefix + 'num_batches_tracked'
        if num_batches_tracked_key in state_dict:
            del state_dict[num_batches_tracked_key]

        super(FrozenBatchNorm2d, self)._load_from_state_dict(
            state_dict, prefix, local_metadata, strict,
            missing_keys, unexpected_keys, error_msgs)

    def forward(self, x):
        """
        Return ``x * scale + bias`` with
        ``scale = weight / sqrt(running_var + eps)`` and
        ``bias = bias - running_mean * scale``, each reshaped to
        ``(1, C, 1, 1)`` so that it broadcasts over ``x``.
        """
        # move reshapes to the beginning
        # to make it fuser-friendly
        w = self.weight.reshape(1, -1, 1, 1)
        b = self.bias.reshape(1, -1, 1, 1)
        rv = self.running_var.reshape(1, -1, 1, 1)
        rm = self.running_mean.reshape(1, -1, 1, 1)
        scale = w * (rv + self.eps).rsqrt()
        bias = b - rm * scale
        return x * scale + bias

    def __repr__(self):
        return f"{self.__class__.__name__}({self.weight.shape[0]})"