# Commit 7246044d authored by mibaumgartner
# Merge remote-tracking branch 'origin/master' into main
# parents fcec502f 6f4c3333
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import torch
import torch.nn as nn
from abc import abstractmethod
from typing import Sequence, Callable, Union, Tuple
from nndet.arch.conv import NdParam
from nndet.arch.blocks.res import ResBasic
class AbstractBlock(nn.Module):
    """Base class for encoder building blocks.

    Keeps track of the number of channels the block produces so that
    downstream components can query it.
    """

    def __init__(self, out_channels: int, **kwargs):
        """
        Args:
            out_channels: number of channels this block outputs
        """
        super().__init__(**kwargs)
        self.out_channels = out_channels

    def get_output_channels(self) -> int:
        """
        Query the number of output channels of this block.

        Returns:
            int: number of output channels
        """
        return self.out_channels
class StackedBlock(AbstractBlock):
    # default channel multiplier applied when ``out_channels`` is not given
    expansion = 2

    def __init__(self,
                 conv: Callable[[], nn.Module],
                 in_channels: int,
                 conv_kernel: NdParam,
                 stride: NdParam = None,
                 out_channels: int = None,
                 max_out_channels: int = None,
                 num_blocks: int = 1,
                 **kwargs):
        """
        Plain stack of convolutions. Strides > 1 are applied at the beginning
        by a strided convolution and the first convolution raises the number of
        channels to :param:`out_channels`.

        Args:
            conv: conv generator to use for internal convolutions
            in_channels: number of input channels
            conv_kernel: kernel size of convolution
            stride: Stride of first convolution. If None stride=1 will be used.
                Defaults to None.
            out_channels: If given, then number of output channels will be set
                to this value. Otherwise the number of input channels is
                multiplied by :attr:`expansion`. Defaults to None.
            max_out_channels: Maximum number of output channels.
                Defaults to None.
            num_blocks: Number of blocks. Defaults to 1.
            **kwargs: forwarded to :meth:`build_block`

        Raises:
            ValueError: raised if the given output channels are larger than
                the max output channels
        """
        super().__init__(out_channels=None)  # real value assigned below
        if (out_channels is not None and
                max_out_channels is not None and
                out_channels > max_out_channels):
            # fixed: original message concatenated to "largerthan"
            raise ValueError("Output channels can not be larger "
                             "than max output channels")
        if out_channels is None:
            out_channels = in_channels * self.expansion
        if max_out_channels is not None and out_channels > max_out_channels:
            out_channels = max_out_channels
        if stride is None:
            stride = 1

        if not isinstance(conv_kernel, Sequence):
            conv_kernel = [conv_kernel] * conv.dim
        # "same" padding for the stride-1 convolutions
        padding = tuple([(i - 1) // 2 for i in conv_kernel])

        # first block applies the stride and the channel change
        _convs = [self.build_block(
            conv=conv, in_channels=in_channels, out_channels=out_channels,
            kernel_size=conv_kernel, stride=stride, padding=padding, **kwargs)]
        for _ in range(num_blocks - 1):
            _convs.append(self.build_block(
                conv=conv, in_channels=out_channels, out_channels=out_channels,
                kernel_size=conv_kernel, stride=1, padding=padding, **kwargs))
        self.convs = nn.Sequential(*_convs)
        self.out_channels = out_channels

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward input through all stacked blocks.

        Args:
            x: input tensor

        Returns:
            torch.Tensor: output tensor
        """
        return self.convs(x)

    @abstractmethod
    def build_block(self, conv: Callable[[], nn.Module],
                    in_channels: int, out_channels: int,
                    kernel_size: NdParam,
                    stride: NdParam,
                    padding: NdParam,
                    **kwargs,
                    ) -> nn.Module:
        """
        Build a single block of the stack; implemented by subclasses.
        (``**kwargs`` added to match the call sites and all overrides.)

        Args:
            conv: generator for convolutions
            in_channels: number of input channels
            out_channels: number of output channels
            kernel_size: kernel size of convolutions
            stride: stride of first convolution
            padding: padding of convolutions
            **kwargs: additional keyword arguments for the block

        Returns:
            nn.Module: block instance
        """
        raise NotImplementedError
class StackedConvBlock2(StackedBlock):
    def build_block(self, conv: Callable, in_channels: int,
                    out_channels: int, kernel_size: NdParam,
                    stride: NdParam, padding: NdParam,
                    **kwargs) -> nn.Module:
        """
        Build two consecutive convolutions; only the first one applies the
        stride and the channel change.

        Args:
            conv: generator for convolutions
            in_channels: number of input channels
            out_channels: number of output channels
            kernel_size: kernel size of convolutions
            stride: stride of first convolution
            padding: padding of convolutions
            **kwargs: forwarded to the conv generator

        Returns:
            nn.Module: two stacked convolutions
        """
        return torch.nn.Sequential(
            conv(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size,
                 stride=stride, padding=padding, **kwargs),
            conv(in_channels=out_channels, out_channels=out_channels, kernel_size=kernel_size,
                 stride=1, padding=padding, **kwargs),
        )
class StackedConvBlock3(StackedBlock):
    def build_block(self, conv: Callable, in_channels: int,
                    out_channels: int, kernel_size: NdParam,
                    stride: NdParam, padding: NdParam,
                    **kwargs) -> nn.Module:
        """
        Build three consecutive convolutions; only the first one applies the
        stride and the channel change.
        (Original docstring incorrectly said "2" convolutions.)

        Args:
            conv: generator for convolutions
            in_channels: number of input channels
            out_channels: number of output channels
            kernel_size: kernel size of convolutions
            stride: stride of first convolution
            padding: padding of convolutions
            **kwargs: forwarded to the conv generator

        Returns:
            nn.Module: three stacked convolutions
        """
        return torch.nn.Sequential(
            conv(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size,
                 stride=stride, padding=padding, **kwargs),
            conv(in_channels=out_channels, out_channels=out_channels, kernel_size=kernel_size,
                 stride=1, padding=padding, **kwargs),
            conv(in_channels=out_channels, out_channels=out_channels, kernel_size=kernel_size,
                 stride=1, padding=padding, **kwargs),
        )
class StackedResidualBlock(StackedBlock):
    def build_block(self, conv: Callable[[], nn.Module], in_channels: int,
                    out_channels: int, kernel_size: NdParam,
                    stride: NdParam, padding: NdParam,
                    **kwargs) -> nn.Module:
        """
        Build a single basic residual block (see :class:`ResBasic`).

        Args:
            conv: generator for convolutions
            in_channels: number of input channels
            out_channels: number of output channels
            kernel_size: kernel size of convolutions
            stride: stride of first convolution
            padding: padding of convolutions
            **kwargs: forwarded to :class:`ResBasic`

        Returns:
            nn.Module: residual block
        """
        return ResBasic(conv=conv, in_channels=in_channels,
                        out_channels=out_channels,
                        kernel_size=kernel_size, stride=stride,
                        padding=padding, **kwargs)
class StackedConvBlock(AbstractBlock):
    # default channel multiplier applied when ``out_channels`` is not given
    expansion = 2

    def __init__(self,
                 conv: Callable[[], nn.Module],
                 in_channels: int,
                 conv_kernel: Union[Tuple[int], int],
                 stride: Union[Tuple[int], int] = None,
                 out_channels: int = None,
                 max_out_channels: int = None,
                 num_blocks: int = 2,
                 **kwargs):
        """
        Plain stack of convolutions. Strides > 1 are applied at the beginning
        by a strided convolution and the first convolution raises the number of
        channels to :param:`out_channels`.

        Args:
            conv: conv generator to use for internal convolutions
            in_channels: number of input channels
            conv_kernel: kernel size of convolution
            stride: Stride of first convolution. If None stride=1 will be used.
                Defaults to None.
            out_channels: If given, then number of output channels will be set
                to this value. Otherwise the number of input channels is
                multiplied by :attr:`expansion`. Defaults to None.
            max_out_channels: Maximum number of output channels.
                Defaults to None.
            num_blocks: Number of convolutions. Defaults to 2.
            **kwargs: forwarded to the conv generator

        Raises:
            ValueError: raised if the given output channels are larger than
                the max output channels
        """
        super().__init__(out_channels=None)  # real value assigned below
        if (out_channels is not None and
                max_out_channels is not None and
                out_channels > max_out_channels):
            # fixed: original message concatenated to "largerthan"
            raise ValueError("Output channels can not be larger "
                             "than max output channels")
        if out_channels is None:
            out_channels = in_channels * self.expansion
        if max_out_channels is not None and out_channels > max_out_channels:
            out_channels = max_out_channels
        if stride is None:
            stride = 1

        if not isinstance(conv_kernel, Sequence):
            conv_kernel = [conv_kernel] * conv.dim
        # "same" padding for the stride-1 convolutions
        padding = tuple([(i - 1) // 2 for i in conv_kernel])

        # first conv applies the stride and the channel change
        _convs = [conv(in_channels=in_channels,
                       out_channels=out_channels,
                       kernel_size=conv_kernel,
                       stride=stride,
                       padding=padding,
                       **kwargs)]
        for _ in range(num_blocks - 1):
            _convs.append(conv(in_channels=out_channels,
                               out_channels=out_channels,
                               kernel_size=conv_kernel,
                               stride=1,
                               padding=padding,
                               **kwargs))
        self.convs = nn.Sequential(*_convs)
        self.out_channels = out_channels

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward input through all stacked convolutions.

        Args:
            x: input tensor

        Returns:
            torch.Tensor: output tensor
        """
        return self.convs(x)
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
Don't use these. Next nnDetection Version will introduce better/fixed implementations.
"""
import torch
import torch.nn as nn
from typing import Sequence, Callable, Optional
from functools import reduce
from loguru import logger
from nndet.arch.conv import nd_pool
from nndet.arch.conv import NdParam
class ResBasic(nn.Module):
    def __init__(self,
                 conv: Callable,
                 in_channels: int,
                 out_channels: int,
                 kernel_size: NdParam,
                 stride: NdParam,
                 padding: NdParam,
                 attention: Optional[nn.Module] = None,
                 ):
        """
        Build a plain residual block

        Zero init of the last norm layer according to
        https://arxiv.org/abs/1706.02677
        Avg pool in downsampling path https://arxiv.org/pdf/1812.01187.pdf

        Args:
            conv: generator for convolutions
            in_channels: number of input channels
            out_channels: number of output channels
            kernel_size: kernel size of convolutions
            stride: stride of first convolution
            padding: padding of convolutions
            attention: additional attention layer applied after convolutions
        """
        super().__init__()
        logger.warning("ResidualBlock uses normal relu! This might not be "
                       "desired if conv uses a different non linearity")
        self.conv1 = conv(in_channels, out_channels, kernel_size=kernel_size,
                          padding=padding, stride=stride)
        # no non-linearity here: relu is applied after the residual addition
        self.conv2 = conv(out_channels, out_channels, kernel_size=kernel_size,
                          padding=padding, relu=None)
        self.relu = nn.ReLU(inplace=True)

        # projection shortcut only when the spatial resolution changes
        # NOTE(review): with stride 1 and in_channels != out_channels the
        # identity shortcut will fail at the addition — confirm callers keep
        # channels consistent in that case
        stride_prod = (reduce((lambda x, y: x * y), stride)
                       if isinstance(stride, Sequence) else stride)
        if stride_prod > 1:
            self.shortcut = nn.Sequential(
                nd_pool("Avg", dim=conv.dim, kernel_size=stride, stride=stride),
                conv(in_channels, out_channels, kernel_size=1, relu=None),
            )
        else:
            self.shortcut = None
        self.attention = attention
        self.init_weights()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward input

        Args:
            x (torch.Tensor): input tensor

        Returns:
            torch.Tensor: output tensor
        """
        residual = x

        out = self.conv1(x)
        out = self.conv2(out)

        if self.attention is not None:
            out = self.attention(out)
        if self.shortcut is not None:
            residual = self.shortcut(x)

        out += residual
        out = self.relu(out)
        return out

    def init_weights(self) -> None:
        """
        Zero init the gamma of the last norm layer
        (https://arxiv.org/abs/1706.02677).
        """
        try:
            torch.nn.init.zeros_(self.conv2.norm.weight)
        except AttributeError:
            # narrowed from a bare ``except``; also use getattr so the log
            # message itself cannot raise when ``.norm`` does not exist
            logger.info(f"Zero init of last norm layer "
                        f"{getattr(self.conv2, 'norm', None)} failed")
class ResBottleneck(nn.Module):
    def __init__(self,
                 conv: Callable,
                 in_channels: int,
                 internal_channels: int,
                 kernel_size: NdParam,
                 stride: NdParam,
                 padding: NdParam,
                 expansion: int = 1,
                 attention: Optional[nn.Module] = None,
                 ):
        """
        Build a bottleneck residual block

        Zero init of the last norm layer according to
        https://arxiv.org/abs/1706.02677
        Avg pool in downsampling path https://arxiv.org/pdf/1812.01187.pdf

        Channel flow: in_channels -> internal_channels ->
        internal_channels * expansion

        Args:
            conv: generator for convolutions
            in_channels: number of input channels
            internal_channels: number of internal channels to use.
                The number of output channels will be
                internal_channels * expansion
            kernel_size: kernel size of convolutions
            stride: stride of the middle convolution
            padding: padding of convolutions
            expansion: expansion for last conv block. Default expansion
                is one to be compatible with modular encoder! Original
                implementation uses expansion=4.
            attention: additional attention layer applied after convolutions
        """
        super().__init__()
        logger.warning("ResidualBlock uses normal relu! This might not be "
                       "desired if conv uses a different non linearity")
        out_channels = internal_channels * expansion
        # 1x1 reduce -> kxk spatial -> 1x1 expand (no relu before addition)
        self.conv1 = conv(in_channels, internal_channels,
                          kernel_size=1, padding=0, stride=1,
                          )
        self.conv2 = conv(internal_channels, internal_channels,
                          kernel_size=kernel_size, padding=padding, stride=stride,
                          )
        self.conv3 = conv(internal_channels, out_channels,
                          kernel_size=1, padding=0, relu=None, stride=1,
                          )
        self.relu = nn.ReLU(inplace=True)

        # projection shortcut only when the spatial resolution changes
        stride_prod = (reduce((lambda x, y: x * y), stride)
                       if isinstance(stride, Sequence) else stride)
        if stride_prod > 1:
            self.shortcut = nn.Sequential(
                nd_pool("Avg", dim=conv.dim, kernel_size=stride, stride=stride),
                conv(in_channels, out_channels, kernel_size=1, relu=None),
            )
        else:
            self.shortcut = None
        self.attention = attention
        self.init_weights()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward input

        Args:
            x (torch.Tensor): input tensor

        Returns:
            torch.Tensor: output tensor
        """
        residual = x

        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)

        if self.attention is not None:
            out = self.attention(out)
        if self.shortcut is not None:
            residual = self.shortcut(x)

        out += residual
        out = self.relu(out)
        return out

    def init_weights(self) -> None:
        """
        Zero init the gamma of the last norm layer
        (https://arxiv.org/abs/1706.02677).
        """
        try:
            # conv3 is the last conv before the residual addition; the
            # original zeroed conv2's norm (copy-paste from ResBasic), which
            # does not make the residual branch start at zero
            torch.nn.init.zeros_(self.conv3.norm.weight)
        except AttributeError:
            # narrowed from a bare ``except``; getattr keeps the log message
            # itself from raising when ``.norm`` does not exist
            logger.info(f"Zero init of last norm layer "
                        f"{getattr(self.conv3, 'norm', None)} failed")
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
Don't use these. Next nnDetection Version will introduce better/fixed implementations.
"""
import torch
import torch.nn as nn
from nndet.arch.conv import nd_pool, nd_conv
class SELayer(nn.Module):
    def __init__(self,
                 dim: int,
                 in_channels: int,
                 reduction: int = 16,
                 ):
        """
        Squeeze and Excitation attention layer
        https://arxiv.org/abs/1709.01507

        Args:
            dim: number of spatial dimensions
            in_channels: number of input channels
            reduction: channel reduction factor of the internal bottleneck
        """
        super().__init__()
        squeezed = in_channels // reduction
        # global pooling to 1x1(x1) followed by a channel-wise gating MLP
        self.pool = nd_pool("AdaptiveAvg", dim, 1)
        self.fc = nn.Sequential(
            nd_conv(dim, in_channels, squeezed,
                    kernel_size=1, stride=1, bias=False),
            nn.ReLU(inplace=True),
            nd_conv(dim, squeezed, in_channels,
                    kernel_size=1, stride=1, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Scale ``x`` channel-wise by its squeeze-and-excitation weights."""
        scale = self.fc(self.pool(x))
        return x * scale
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import torch
import torch.nn as nn
from typing import Union, Callable, Any, Optional, Tuple, Sequence, Type
from nndet.arch.initializer import InitWeights_He
from nndet.arch.layers.norm import GroupNorm
# Spatial parameters (kernel, stride, padding, ...) may be a scalar or a
# per-dimension tuple for the 2d / 3d case.
NdParam = Union[int, Tuple[int, int], Tuple[int, int, int]]
class Generator:
    """Factory that binds a convolution class to a fixed dimensionality."""

    def __init__(self, conv_cls, dim: int):
        """
        Args:
            conv_cls (callable): class of convolution
            dim (int): number of spatial dimensions (in general 2 or 3)
        """
        self.dim = dim
        self.conv_cls = conv_cls

    def __call__(self, *args, **kwargs) -> Any:
        """
        Instantiate the wrapped class with the stored dimension prepended.

        Args:
            *args: forwarded positionally after ``dim``
            **kwargs: forwarded as keywords

        Returns:
            Any: newly created object
        """
        return self.conv_cls(self.dim, *args, **kwargs)
class BaseConvNormAct(torch.nn.Sequential):
    def __init__(self,
                 dim: int,
                 in_channels: int,
                 out_channels: int,
                 norm: Optional[Union[Callable[..., Type[nn.Module]], str]],
                 act: Optional[Union[Callable[..., Type[nn.Module]], str]],
                 kernel_size: Union[int, tuple],
                 stride: Union[int, tuple] = 1,
                 padding: Union[int, tuple] = 0,
                 dilation: Union[int, tuple] = 1,
                 groups: int = 1,
                 bias: bool = None,
                 transposed: bool = False,
                 norm_kwargs: Optional[dict] = None,
                 act_inplace: Optional[bool] = None,
                 act_kwargs: Optional[dict] = None,
                 initializer: Callable[[nn.Module], None] = None,
                 ):
        """
        Sequential block with the default ordering: conv -> norm -> activation

        Args:
            dim: number of spatial dimensions of the convolution
            in_channels: input channels
            out_channels: output channels
            norm: type of normalization (name or factory); if None, no
                normalization is applied
            act: non linearity (name or factory); if None, no activation
                is applied
            kernel_size: size of convolution kernel
            stride: convolution stride
            padding: padding value (input or output padding depending on
                whether the convolution is transposed or not)
            dilation: convolution dilation
            groups: number of convolution groups
            bias: whether to include bias. If None it is determined
                dynamically: False when a normalization follows, else True
            transposed: whether the convolution should be transposed
            norm_kwargs: keyword arguments for the normalization layer
            act_inplace: whether to apply the activation inplace. If None it
                is determined dynamically: True when a normalization
                follows, else False
            act_kwargs: keyword arguments for the activation layer
            initializer: callable applied to initialize the weights
        """
        super().__init__()
        # resolve optional dict arguments
        if norm_kwargs is None:
            norm_kwargs = {}
        if act_kwargs is None:
            act_kwargs = {}
        if "inplace" in act_kwargs:
            raise ValueError("Use keyword argument to en-/disable inplace activations")
        if act_inplace is None:
            # inplace is safe when a norm sits between conv and activation
            act_inplace = norm is not None
        act_kwargs["inplace"] = act_inplace

        # bias is redundant when a normalization layer follows
        if bias is None:
            bias = norm is None

        self.add_module("conv", nd_conv(dim=dim,
                                        in_channels=in_channels,
                                        out_channels=out_channels,
                                        kernel_size=kernel_size,
                                        stride=stride,
                                        padding=padding,
                                        dilation=dilation,
                                        groups=groups,
                                        bias=bias,
                                        transposed=transposed,
                                        ))
        if norm is not None:
            norm_layer = (nd_norm(norm, dim, out_channels, **norm_kwargs)
                          if isinstance(norm, str)
                          else norm(dim, out_channels, **norm_kwargs))
            self.add_module("norm", norm_layer)
        if act is not None:
            act_layer = (nd_act(act, dim, **act_kwargs)
                         if isinstance(act, str)
                         else act(**act_kwargs))
            self.add_module("act", act_layer)

        if initializer is not None:
            self.apply(initializer)
class ConvInstanceRelu(BaseConvNormAct):
    def __init__(self,
                 dim: int,
                 in_channels: int,
                 out_channels: int,
                 kernel_size: Union[int, tuple],
                 stride: Union[int, tuple] = 1,
                 padding: Union[int, tuple] = 0,
                 dilation: Union[int, tuple] = 1,
                 groups: int = 1,
                 bias: bool = None,
                 transposed: bool = False,
                 add_norm: bool = True,
                 add_act: bool = True,
                 act_inplace: Optional[bool] = None,
                 norm_eps: float = 1e-5,
                 norm_affine: bool = True,
                 initializer: Callable[[nn.Module], None] = None,
                 ):
        """
        Convenience block: conv -> InstanceNorm -> ReLU

        Args:
            dim: number of spatial dimensions of the convolution
            in_channels: input channels
            out_channels: output channels
            kernel_size: size of convolution kernel
            stride: convolution stride
            padding: padding value (input or output padding depending on
                whether the convolution is transposed or not)
            dilation: convolution dilation
            groups: number of convolution groups
            bias: whether to include bias. If None it is determined
                dynamically: False when a normalization follows, else True
            transposed: whether the convolution should be transposed
            add_norm: add the instance norm layer to the block
            add_act: add the ReLU layer to the block
            act_inplace: whether to apply the activation inplace. If None it
                is determined dynamically: True when a normalization
                follows, else False
            norm_eps: instance norm eps (see pytorch for more info)
            norm_affine: instance norm affine parameter (see pytorch for
                more info)
            initializer: callable applied to initialize the weights
        """
        super().__init__(
            dim=dim,
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=groups,
            bias=bias,
            transposed=transposed,
            norm="Instance" if add_norm else None,
            act="ReLU" if add_act else None,
            norm_kwargs={"eps": norm_eps, "affine": norm_affine},
            act_inplace=act_inplace,
            initializer=initializer,
        )
class ConvGroupRelu(BaseConvNormAct):
    def __init__(self,
                 dim: int,
                 in_channels: int,
                 out_channels: int,
                 kernel_size: Union[int, tuple],
                 stride: Union[int, tuple] = 1,
                 padding: Union[int, tuple] = 0,
                 dilation: Union[int, tuple] = 1,
                 groups: int = 1,
                 bias: bool = None,
                 transposed: bool = False,
                 add_norm: bool = True,
                 add_act: bool = True,
                 act_inplace: Optional[bool] = None,
                 norm_eps: float = 1e-5,
                 norm_affine: bool = True,
                 norm_channels_per_group: int = 16,
                 initializer: Callable[[nn.Module], None] = None,
                 ):
        """
        Convenience block: conv -> GroupNorm -> ReLU

        Args:
            dim: number of spatial dimensions of the convolution
            in_channels: input channels
            out_channels: output channels
            kernel_size: size of convolution kernel
            stride: convolution stride
            padding: padding value (input or output padding depending on
                whether the convolution is transposed or not)
            dilation: convolution dilation
            groups: number of convolution groups
            bias: whether to include bias. If None it is determined
                dynamically: False when a normalization follows, else True
            transposed: whether the convolution should be transposed
            add_norm: add the group norm layer to the block
            add_act: add the ReLU layer to the block
            act_inplace: whether to apply the activation inplace. If None it
                is determined dynamically: True when a normalization
                follows, else False
            norm_eps: group norm eps (see pytorch for more info)
            norm_affine: group norm affine parameter (see pytorch for
                more info)
            norm_channels_per_group: channels per group for group norm
            initializer: callable applied to initialize the weights
        """
        super().__init__(
            dim=dim,
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=groups,
            bias=bias,
            transposed=transposed,
            norm="Group" if add_norm else None,
            act="ReLU" if add_act else None,
            norm_kwargs={
                "eps": norm_eps,
                "affine": norm_affine,
                "channels_per_group": norm_channels_per_group,
            },
            act_inplace=act_inplace,
            initializer=initializer,
        )
def nd_conv(dim: int,
            in_channels: int,
            out_channels: int,
            kernel_size: Union[int, tuple],
            stride: Union[int, tuple] = 1,
            padding: Union[int, tuple] = 0,
            dilation: Union[int, tuple] = 1,
            groups: int = 1,
            bias: bool = True,
            transposed: bool = False,
            **kwargs,
            ) -> torch.nn.Module:
    """
    Create a (transposed) convolution for the requested dimensionality.

    Args:
        dim: number of spatial dimensions (1, 2 or 3)
        in_channels: input channels
        out_channels: output channels
        kernel_size: size of convolution kernel
        stride: convolution stride
        padding: padding value (input or output padding depending on
            whether the convolution is transposed or not)
        dilation: convolution dilation
        groups: number of convolution groups
        bias: whether to include a bias term
        transposed: whether to create a transposed convolution
        **kwargs: forwarded to the convolution class

    Returns:
        torch.nn.Module: instantiated convolution module

    See Also:
        :class:`torch.nn.Conv1d`, :class:`torch.nn.Conv2d`,
        :class:`torch.nn.Conv3d`, :class:`torch.nn.ConvTranspose1d`,
        :class:`torch.nn.ConvTranspose2d`, :class:`torch.nn.ConvTranspose3d`
    """
    prefix = "Transpose" if transposed else ""
    conv_cls = getattr(torch.nn, f"Conv{prefix}{dim}d")
    return conv_cls(in_channels=in_channels, out_channels=out_channels,
                    kernel_size=kernel_size, stride=stride, padding=padding,
                    dilation=dilation, groups=groups, bias=bias, **kwargs)
def nd_pool(pooling_type: str, dim: int, *args, **kwargs) -> torch.nn.Module:
    """
    Instantiate a pooling layer selected by name and dimensionality.

    Args:
        pooling_type: type of pooling, case sensitive. Supported values are
            ``Max``, ``Avg``, ``AdaptiveAvg``, ``AdaptiveMax``
        dim: number of spatial dimensions (1, 2 or 3)
        *args: positional arguments of the chosen pooling class
        **kwargs: keyword arguments of the chosen pooling class

    Returns:
        torch.nn.Module: instantiated pooling module

    See Also:
        :class:`torch.nn.MaxPool1d`, :class:`torch.nn.MaxPool2d`,
        :class:`torch.nn.MaxPool3d`, :class:`torch.nn.AvgPool1d`,
        :class:`torch.nn.AvgPool2d`, :class:`torch.nn.AvgPool3d`,
        :class:`torch.nn.AdaptiveMaxPool1d`, :class:`torch.nn.AdaptiveMaxPool2d`,
        :class:`torch.nn.AdaptiveMaxPool3d`, :class:`torch.nn.AdaptiveAvgPool1d`,
        :class:`torch.nn.AdaptiveAvgPool2d`, :class:`torch.nn.AdaptiveAvgPool3d`
    """
    return getattr(torch.nn, f"{pooling_type}Pool{dim}d")(*args, **kwargs)
def nd_norm(norm_type: str, dim: int, *args, **kwargs) -> torch.nn.Module:
    """
    Wrapper to switch between different types of normalization and
    dimensions by a single argument

    Args:
        norm_type: type of normalization, case sensitive.
            Supported types are
            * ``Batch``
            * ``Instance``
            * ``LocalResponse``
            * ``Group``
            * ``Layer``
        dim: dimension of normalization input; pass None for
            dimension-agnostic layers (e.g. ``Layer``, ``LocalResponse``)
        *args: positional arguments of chosen normalization class
        **kwargs: keyword arguments of chosen normalization class

    Returns:
        torch.nn.Module: generated module

    See Also:
        :class:`torch.nn.BatchNorm1d`, :class:`torch.nn.BatchNorm2d`,
        :class:`torch.nn.BatchNorm3d`, :class:`torch.nn.InstanceNorm1d`,
        :class:`torch.nn.InstanceNorm2d`, :class:`torch.nn.InstanceNorm3d`,
        :class:`torch.nn.LocalResponseNorm`, :class:`torch.nn.LayerNorm`,
        :class:`nndet.arch.layers.norm.GroupNorm`
    """
    if norm_type.lower() == "group":
        norm_cls = GroupNorm
    else:
        # only dimension-specific norms get a "{dim}d" suffix; previously a
        # stray trailing "d" was appended even for dim=None (producing e.g.
        # the non-existent "LayerNormd")
        suffix = f"{dim}d" if dim is not None else ""
        norm_cls = getattr(torch.nn, f"{norm_type}Norm{suffix}")
    return norm_cls(*args, **kwargs)
def nd_act(act_type: str, dim: int, *args, **kwargs) -> torch.nn.Module:
    """
    Look up an activation layer in ``torch.nn`` by class name.

    Args:
        act_type: name of the activation class (e.g. ``"ReLU"``)
        dim: unused; kept for API symmetry with the other ``nd_*`` helpers
        *args: positional arguments of the activation class
        **kwargs: keyword arguments of the activation class

    Returns:
        torch.nn.Module: instantiated activation module
    """
    return getattr(torch.nn, act_type)(*args, **kwargs)
def nd_dropout(dim: int, p: float = 0.5, inplace: bool = False, **kwargs) -> torch.nn.Module:
    """
    Create a 1d, 2d or 3d dropout layer.

    Args:
        dim: number of spatial dimensions
        p: dropout probability
        inplace: apply the operation inplace
        **kwargs: forwarded to the dropout class

    Returns:
        torch.nn.Module: instantiated dropout module
    """
    dropout_cls = getattr(torch.nn, f"Dropout{dim}d")
    return dropout_cls(p=p, inplace=inplace, **kwargs)
def compute_padding_for_kernel(kernel_size: Union[int, Sequence[int]]) -> \
        Union[int, Tuple[int, int], Tuple[int, int, int]]:
    """
    Compute the padding that keeps feature maps the same size at stride 1.

    Args:
        kernel_size: scalar or per-dimension kernel size

    Returns:
        Union[int, Tuple[int, int], Tuple[int, int, int]]: computed padding
    """
    if isinstance(kernel_size, Sequence):
        return tuple((k - 1) // 2 for k in kernel_size)
    return (kernel_size - 1) // 2
def conv_kwargs_helper(norm: bool, activation: bool):
    """
    Build keyword arguments to en-/disable normalization and activation in
    conv generators that include them by default.

    Args:
        norm: en-/disable normalization layer
        activation: en-/disable activation layer

    Returns:
        dict: keyword arguments to pass to a conv generator
    """
    return {"add_norm": norm, "add_act": activation}
from nndet.arch.decoder.base import BaseUFPN, UFPNModular, PAUFPN
# This diff is collapsed.
from nndet.arch.encoder.abstract import AbstractEncoder
from nndet.arch.encoder.modular import Encoder
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import torch
import torch.nn as nn
from typing import List, Dict, Union, TypeVar
from abc import abstractmethod
__all__ = ["AbstractEncoder"]
class AbstractEncoder(nn.Module):
    def __init__(self, **kwargs):
        """
        Provides an abstract interface for backbone networks

        Note: fixed from the original ``__int__`` typo, which meant this
        constructor was never actually invoked.
        """
        super().__init__(**kwargs)

    @abstractmethod
    def forward(self, x) -> List[torch.Tensor]:
        """
        Forward input through network

        Args:
            x (torch.Tensor): input tensor

        Returns:
            List[torch.Tensor]: feature maps from multiple resolutions
        """
        raise NotImplementedError

    @abstractmethod
    def get_channels(self) -> List[int]:
        """
        Compute number of channels for each returned feature map
        inside the forward pass

        Returns:
            List[int]: number of channels corresponding to the returned
                feature maps
        """
        raise NotImplementedError

    @abstractmethod
    def get_strides(self) -> List[Dict[str, Union[List[int], int]]]:
        """
        Compute backbone strides for 2d and 3d cases and all options
        of the network

        Returns:
            List[Dict[str, Union[List[int], int]]]: dict with 'xy' for 2d
                stride and optional 'z' for 3d cases. The list describes the
                stride at the respective output level
        """
        raise NotImplementedError
# Type variable bound to AbstractEncoder for annotating encoder factories.
EncoderType = TypeVar('EncoderType', bound=AbstractEncoder)
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import torch
import torch.nn as nn
from typing import Callable, Tuple, Sequence, Union, List, Dict, Optional
from nndet.arch.encoder.abstract import AbstractEncoder
from nndet.arch.blocks.basic import AbstractBlock
__all__ = ["Encoder"]
class Encoder(AbstractEncoder):
    def __init__(self,
                 conv: Callable[[], nn.Module],
                 conv_kernels: Sequence[Union[Tuple[int], int]],
                 strides: Sequence[Union[Tuple[int], int]],
                 block_cls: AbstractBlock,
                 in_channels: int,
                 start_channels: int,
                 stage_kwargs: Sequence[dict] = None,
                 out_stages: Sequence[int] = None,
                 max_channels: int = None,
                 first_block_cls: Optional[AbstractBlock] = None,
                 ):
        """
        Modular encoder assembled from exchangeable convolution blocks

        The encoder is organised into stages, each (in general) representing
        one level of the resolution pyramid; the first stage always operates
        at full resolution.

        Args:
            conv: conv generator to use for internal convolutions
            conv_kernels: kernel sizes for convolutions
            strides: strides for pooling layers. Should have one
                element less than conv_kernels
            block_cls: generate a block of convolutions (
                e.g. stacked residual blocks)
            in_channels: number of input channels
            start_channels: number of start channels
            stage_kwargs: additional keyword arguments for stages.
                Defaults to None.
            out_stages: define which stages should be returned. If `None` all
                stages will be returned. Defaults to None.
            max_channels: upper bound for the number of channels of a block
            first_block_cls: generate a block of convolutions for the first
                stage. By default this equals the provided block_cls
        """
        super().__init__()
        self.num_stages = len(conv_kernels)
        self.dim = conv.dim

        # broadcast stage kwargs so every stage has its own dict
        if stage_kwargs is None:
            stage_kwargs = [{}] * self.num_stages
        elif isinstance(stage_kwargs, dict):
            stage_kwargs = [stage_kwargs] * self.num_stages
        assert len(stage_kwargs) == len(conv_kernels)

        self.out_stages = list(range(self.num_stages)) if out_stages is None \
            else out_stages

        if first_block_cls is None:
            first_block_cls = block_cls

        # scalar strides are expanded to one entry per spatial dimension
        if isinstance(strides[0], int):
            strides = [tuple([s] * self.dim) for s in strides]
        self.strides = strides

        self.out_channels = []
        stage_modules = []
        current_channels = in_channels
        for level in range(self.num_stages):
            is_first = level == 0
            builder = first_block_cls if is_first else block_cls
            stage = builder(
                conv=conv,
                in_channels=current_channels,
                # only the first stage sets the channel count explicitly
                out_channels=start_channels if is_first else None,
                conv_kernel=conv_kernels[level],
                # downsampling happens at the entry of every later stage
                stride=None if is_first else strides[level - 1],
                max_out_channels=max_channels,
                **stage_kwargs[level],
            )
            current_channels = stage.get_output_channels()
            self.out_channels.append(current_channels)
            stage_modules.append(stage)
        self.stages = torch.nn.ModuleList(stage_modules)

    def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
        """
        Forward data through all stages and collect the selected outputs

        Args:
            x: input data

        Returns:
            List[torch.Tensor]: outputs of the stages selected via
                :attr:`out_stages`
        """
        feature_maps = []
        for stage_id, stage in enumerate(self.stages):
            x = stage(x)
            if stage_id in self.out_stages:
                feature_maps.append(x)
        return feature_maps

    def get_channels(self) -> List[int]:
        """
        Number of channels of every feature map returned by :meth:`forward`

        Returns:
            List[int]: output channels per returned feature map
        """
        return [channels for stage_id, channels in enumerate(self.out_channels)
                if stage_id in self.out_stages]

    def get_strides(self) -> List[List[int]]:
        """
        Accumulate the absolute stride of every stage with respect to the
        input size

        Returns:
            List[List[int]]: for each stage, the cumulative stride per
                spatial dimension
        """
        cumulative = [1] * self.dim
        out_strides: List[List[int]] = []
        for level in range(self.num_stages):
            if level > 0:
                cumulative = [c * p for c, p in
                              zip(cumulative, self.strides[level - 1])]
            out_strides.append(list(cumulative))
        return out_strides
from nndet.arch.heads.classifier import ClassifierType, Classifier
from nndet.arch.heads.comb import HeadType, AbstractHead
from nndet.arch.heads.regressor import RegressorType, Regressor
from nndet.arch.heads.segmenter import SegmenterType, Segmenter
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import torch
import math
import torch.nn as nn
from typing import Optional, TypeVar
from torch import Tensor
from abc import abstractmethod
from loguru import logger
from nndet.losses.classification import (
FocalLossWithLogits,
BCEWithLogitsLossOneHot,
CrossEntropyLoss,
)
CONV_TYPES = (nn.Conv2d, nn.Conv3d)
class Classifier(nn.Module):
    """
    Interface for classification heads working on anchor box logits
    """

    @abstractmethod
    def compute_loss(self, pred_logits: Tensor, targets: Tensor, **kwargs) -> Tensor:
        """
        Compute the classification loss for the given predictions

        Args:
            pred_logits (Tensor): predicted logits
            targets (Tensor): classification targets
            kwargs: additional keyword arguments forwarded to the loss

        Returns:
            Tensor: classification loss
        """
        raise NotImplementedError

    @abstractmethod
    def box_logits_to_probs(self, box_logits: Tensor) -> Tensor:
        """
        Transform raw bounding box logits into class probabilities

        Args:
            box_logits (Tensor): bounding box logits [N, C], C=number of classes

        Returns:
            Tensor: probabilities
        """
        raise NotImplementedError
class BaseClassifier(Classifier):
    def __init__(self,
                 conv,
                 in_channels: int,
                 internal_channels: int,
                 num_classes: int,
                 anchors_per_pos: int,
                 num_levels: int,
                 num_convs: int = 3,
                 add_norm: bool = True,
                 **kwargs
                 ):
        """
        Base class to build classifier heads with typical conv structure

        conv(in, internal) -> num_convs x conv(internal, internal) ->
        conv(internal, out)

        Args:
            conv: Convolution modules which handles a single layer
            in_channels: number of input channels
            internal_channels: number of channels internally used
            num_classes: number of foreground classes
            anchors_per_pos: number of anchors per position
            num_levels: number of decoder levels which are passed through the
                classifier
            num_convs: number of convolutions
                input_conv -> num_convs -> output_convs
            add_norm: en-/disable normalization layers in internal layers
            kwargs: keyword arguments passed to first and internal convolutions

        Notes:
            `self.loss` needs to be overwritten in subclasses
            `self.logits_convert_fn` needs to be overwritten in subclasses
            Subclasses that want the prior-probability based weight init must
            assign `self.prior_prob` BEFORE calling `super().__init__()`
            because :meth:`init_weights` reads it during construction.
        """
        super().__init__()
        self.dim = conv.dim
        self.num_levels = num_levels
        self.num_convs = num_convs
        self.num_classes = num_classes
        self.anchors_per_pos = anchors_per_pos

        self.in_channels = in_channels
        self.internal_channels = internal_channels

        self.conv_internal = self.build_conv_internal(conv, add_norm=add_norm, **kwargs)
        self.conv_out = self.build_conv_out(conv)

        self.loss: Optional[nn.Module] = None
        self.logits_convert_fn: Optional[nn.Module] = None

        # FIX: `init_weights` reads `self.prior_prob`, which only subclasses
        # define (before calling this constructor). Fall back to None so
        # instantiating BaseClassifier directly no longer raises
        # AttributeError; subclass-provided values are preserved.
        self.prior_prob: Optional[float] = getattr(self, "prior_prob", None)
        self.init_weights()

    def build_conv_internal(self, conv, **kwargs):
        """
        Build internal convolutions: one input conv followed by
        `num_convs` convs, all kernel 3, stride 1, padding 1

        Args:
            conv: convolution generator
            kwargs: keyword arguments passed to every generated convolution

        Returns:
            nn.Sequential: internal convolution stack
        """
        _conv_internal = nn.Sequential()
        _conv_internal.add_module(
            name="c_in",
            module=conv(
                self.in_channels,
                self.internal_channels,
                kernel_size=3,
                stride=1,
                padding=1,
                **kwargs,
            ))
        for i in range(self.num_convs):
            _conv_internal.add_module(
                name=f"c_internal{i}",
                module=conv(
                    self.internal_channels,
                    self.internal_channels,
                    kernel_size=3,
                    stride=1,
                    padding=1,
                    **kwargs,
                ))
        return _conv_internal

    def build_conv_out(self, conv):
        """
        Build the final convolution producing one logit per class and anchor
        (no norm/activation so raw logits are emitted; bias enabled for the
        prior-probability initialization)
        """
        out_channels = self.num_classes * self.anchors_per_pos
        return conv(
            self.internal_channels,
            out_channels,
            kernel_size=3,
            stride=1,
            padding=1,
            add_norm=False,
            add_act=False,
            bias=True,
        )

    def forward(self,
                x: torch.Tensor,
                level: int,
                **kwargs,
                ) -> torch.Tensor:
        """
        Forward input

        Args:
            x (torch.Tensor): input feature map of size
                (N x C x Y x X) for 2d / (N x C x Y x X x Z) for 3d
            level: decoder level index; not used by this base implementation
                but part of the head interface

        Returns:
            torch.Tensor: classification logits for each anchor
                (N x anchors x num_classes)
        """
        class_logits = self.conv_out(self.conv_internal(x))
        # move channel axis last so the per-anchor class logits are contiguous
        axes = (0, 2, 3, 1) if self.dim == 2 else (0, 2, 3, 4, 1)
        class_logits = class_logits.permute(*axes)
        class_logits = class_logits.contiguous()
        class_logits = class_logits.view(x.size()[0], -1, self.num_classes)
        return class_logits

    def compute_loss(self, pred_logits: Tensor, targets: Tensor, **kwargs) -> Tensor:
        """
        Base classifier with cross entropy loss (in general hard negative
        example mining should be done before this)

        Args:
            pred_logits (Tensor): predicted logits
            targets (Tensor): classification targets

        Returns:
            Tensor: classification loss

        Notes:
            Requires `self.loss` to be set by the subclass.
        """
        return self.loss(pred_logits, targets.long(), **kwargs)

    def box_logits_to_probs(self, box_logits: Tensor) -> Tensor:
        """
        Convert bounding box logits to probabilities

        Args:
            box_logits (Tensor): bounding box logits [N, C]
                N = number of anchors, C=number of foreground classes

        Returns:
            Tensor: probabilities

        Notes:
            Requires `self.logits_convert_fn` to be set by the subclass.
        """
        return self.logits_convert_fn(box_logits)

    def init_weights(self) -> None:
        """
        Init weights with prior prob

        If `self.prior_prob` is set, all conv weights are drawn from
        N(0, 0.01) and the output conv bias is chosen such that the initial
        foreground probability equals the prior (improves training
        stability); otherwise the convs keep their default initialization.
        """
        if self.prior_prob is not None:
            logger.info(f"Init classifier weights: prior prob {self.prior_prob}")
            for layer in self.modules():
                if isinstance(layer, CONV_TYPES):
                    torch.nn.init.normal_(layer.weight, mean=0, std=0.01)
                    if layer.bias is not None:
                        torch.nn.init.constant_(layer.bias, 0)
            # Use prior in model initialization to improve stability
            bias_value = -math.log((1 - self.prior_prob) / self.prior_prob)
            for layer in self.conv_out.modules():
                if isinstance(layer, CONV_TYPES):
                    torch.nn.init.constant_(layer.bias, bias_value)
        else:
            logger.info("Init classifier weights: conv default")
class BCECLassifier(BaseClassifier):
    def __init__(self,
                 conv,
                 in_channels: int,
                 internal_channels: int,
                 num_classes: int,
                 anchors_per_pos: int,
                 num_levels: int,
                 num_convs: int = 3,
                 add_norm: bool = True,
                 prior_prob: Optional[float] = None,
                 weight: Optional[Tensor] = None,
                 reduction: str = "mean",
                 smoothing: float = 0.0,
                 loss_weight: float = 1.,
                 **kwargs
                 ):
        """
        Classifier head trained with a one-hot sigmoid/BCE loss and optional
        prior-probability weight initialization

        conv(in, internal) -> num_convs x conv(internal, internal) ->
        conv(internal, out)

        Args:
            conv: Convolution modules which handles a single layer
            in_channels: number of input channels
            internal_channels: number of channels internally used
            num_classes: number of foreground classes
            anchors_per_pos: number of anchors per position
            num_levels: number of decoder levels which are passed through the
                classifier
            num_convs: number of convolutions
                input_conv -> num_convs -> output_convs
            add_norm: en-/disable normalization layers in internal layers
            prior_prob: initialize final conv with given prior probability
            weight: weight in BCEWithLogitsLoss (see pytorch for more info)
            reduction: reduction to apply to loss. 'sum' | 'mean' | 'none'
            smoothing: label smoothing
            loss_weight: scalar to balance multiple losses
            kwargs: keyword arguments passed to first and internal convolutions
        """
        # must be assigned before super().__init__() because init_weights
        # (called in the base constructor) reads it
        self.prior_prob = prior_prob
        super().__init__(
            conv=conv,
            in_channels=in_channels,
            internal_channels=internal_channels,
            num_classes=num_classes,
            anchors_per_pos=anchors_per_pos,
            num_levels=num_levels,
            num_convs=num_convs,
            add_norm=add_norm,
            **kwargs,
        )
        # one-hot BCE over the foreground classes; probabilities via sigmoid
        self.loss = BCEWithLogitsLossOneHot(
            num_classes=num_classes,
            weight=weight,
            reduction=reduction,
            smoothing=smoothing,
            loss_weight=loss_weight,
        )
        self.logits_convert_fn = nn.Sigmoid()
class CEClassifier(BaseClassifier):
    def __init__(self,
                 conv,
                 in_channels: int,
                 internal_channels: int,
                 num_classes: int,
                 anchors_per_pos: int,
                 num_levels: int,
                 num_convs: int = 3,
                 add_norm: bool = True,
                 prior_prob: Optional[float] = None,
                 weight: Optional[Tensor] = None,
                 reduction: str = "mean",
                 loss_weight: float = 1.,
                 **kwargs
                 ):
        """
        Classifier Head with softmax based cross entropy loss computation and
        prior prob weight init. Internally one additional output channel is
        used to model the background class.

        (Docstring corrected: it previously claimed a sigmoid based BCE loss,
        which does not match the CrossEntropyLoss / Softmax used below.)

        conv(in, internal) -> num_convs x conv(internal, internal) ->
        conv(internal, out)

        Args:
            conv: Convolution modules which handles a single layer
            in_channels: number of input channels
            internal_channels: number of channels internally used
            num_classes: number of foreground classes (an additional
                background channel is added internally)
            anchors_per_pos: number of anchors per position
            num_levels: number of decoder levels which are passed through the
                classifier
            num_convs: number of convolutions
                input_conv -> num_convs -> output_convs
            add_norm: en-/disable normalization layers in internal layers
            prior_prob: initialize final conv with given prior probability
            weight: weight in cross entropy loss (see pytorch for more info)
            reduction: reduction to apply to loss. 'sum' | 'mean' | 'none'
            loss_weight: scalar to balance multiple losses
            kwargs: keyword arguments passed to first and internal convolutions
        """
        # must be assigned before super().__init__() because init_weights
        # (called in the base constructor) reads it
        self.prior_prob = prior_prob
        super().__init__(
            conv=conv,
            in_channels=in_channels,
            num_convs=num_convs,
            add_norm=add_norm,
            internal_channels=internal_channels,
            num_classes=num_classes + 1,  # add one channel for background
            anchors_per_pos=anchors_per_pos,
            num_levels=num_levels,
            **kwargs,
        )
        self.loss = CrossEntropyLoss(
            weight=weight,
            reduction=reduction,
            loss_weight=loss_weight,
        )
        self.logits_convert_fn = nn.Softmax(dim=1)

    def box_logits_to_probs(self, box_logits: Tensor) -> Tensor:
        """
        Convert bounding box logits to foreground probabilities

        Args:
            box_logits (Tensor): bounding box logits [N, C], C=number of
                classes including the background channel at index 0

        Returns:
            Tensor: probabilities of the foreground classes only
        """
        return self.logits_convert_fn(box_logits)[:, 1:]  # remove background predictions
class FocalClassifier(BaseClassifier):
    def __init__(self,
                 conv,
                 in_channels: int,
                 internal_channels: int,
                 num_classes: int,
                 anchors_per_pos: int,
                 num_levels: int,
                 num_convs: int = 3,
                 add_norm: bool = True,
                 prior_prob: Optional[float] = None,
                 gamma: float = 2,
                 alpha: float = -1,
                 reduction: str = "sum",
                 loss_weight: float = 1.,
                 **kwargs
                 ):
        """
        Classifier head trained with a sigmoid based focal loss and optional
        prior-probability weight initialization

        conv(in, internal) -> num_convs x conv(internal, internal) ->
        conv(internal, out)

        Args:
            conv: Convolution modules which handles a single layer
            in_channels: number of input channels
            internal_channels: number of channels internally used
            num_classes: number of foreground classes
            anchors_per_pos: number of anchors per position
            num_levels: number of decoder levels which are passed through the
                classifier
            num_convs: number of convolutions
                input_conv -> num_convs -> output_convs
            add_norm: en-/disable normalization layers in internal layers
            prior_prob: initialize final conv with given prior probability
            gamma: focal loss gamma
            alpha: focal loss alpha
            reduction: reduction to apply to loss. 'sum' | 'mean' | 'none'
            loss_weight: scalar to balance multiple losses
            kwargs: keyword arguments passed to first and internal convolutions
        """
        # must be assigned before super().__init__() because init_weights
        # (called in the base constructor) reads it
        self.prior_prob = prior_prob
        super().__init__(
            conv=conv,
            in_channels=in_channels,
            internal_channels=internal_channels,
            num_classes=num_classes,
            anchors_per_pos=anchors_per_pos,
            num_levels=num_levels,
            num_convs=num_convs,
            add_norm=add_norm,
            **kwargs,
        )
        # focal loss works on raw logits; probabilities come from a sigmoid
        self.loss = FocalLossWithLogits(
            gamma=gamma,
            alpha=alpha,
            reduction=reduction,
            loss_weight=loss_weight,
        )
        self.logits_convert_fn = nn.Sigmoid()
ClassifierType = TypeVar('ClassifierType', bound=Classifier)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from nndet.arch.layers.interpolation import (
Interpolate,
InterpolateToShapes,
InterpolateToShape,
MaxPoolToShapes,
)
from nndet.arch.layers.norm import GroupNorm
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment