# coding=utf-8 # SPDX-FileCopyrightText: Copyright (c) 2022 The torch-harmonics Authors. All rights reserved. # SPDX-License-Identifier: BSD-3-Clause # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # 3. Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # from functools import partial from collections import OrderedDict from copy import Error, deepcopy from re import S from numpy.lib.arraypad import pad import numpy as np import torch import torch.nn as nn import torch.nn.functional as F import torch.fft from torch.nn.modules.container import Sequential from torch.utils.checkpoint import checkpoint, checkpoint_sequential from torch.cuda import amp from typing import Optional import math from torch_harmonics import * from models.contractions import * from models.activations import * from models.factorizations import get_contract_fun # # import FactorizedTensor from tensorly for tensorized operations # import tensorly as tl # from tensorly.plugins import use_opt_einsum # tl.set_backend('pytorch') # use_opt_einsum('optimal') from tltorch.factorized_tensors.core import FactorizedTensor def _no_grad_trunc_normal_(tensor, mean, std, a, b): # Cut & paste from PyTorch official master until it's in a few official releases - RW # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf def norm_cdf(x): # Computes standard normal cumulative distribution function return (1. + math.erf(x / math.sqrt(2.))) / 2. if (mean < a - 2 * std) or (mean > b + 2 * std): warnings.warn("mean is more than 2 std from [a, b] in nn.init.trunc_normal_. " "The distribution of values may be incorrect.", stacklevel=2) with torch.no_grad(): # Values are generated by using a truncated uniform distribution and # then using the inverse CDF for the normal distribution. # Get upper and lower cdf values l = norm_cdf((a - mean) / std) u = norm_cdf((b - mean) / std) # Uniformly fill tensor with values from [l, u], then translate to # [2l-1, 2u-1]. tensor.uniform_(2 * l - 1, 2 * u - 1) # Use inverse cdf transform for normal distribution to get truncated # standard normal tensor.erfinv_() # Transform to proper mean, std tensor.mul_(std * math.sqrt(2.)) tensor.add_(mean) # Clamp to ensure it's in the proper range tensor.clamp_(min=a, max=b) return tensor def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.): r"""Fills the input Tensor with values drawn from a truncated normal distribution. The values are effectively drawn from the normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)` with values outside :math:`[a, b]` redrawn until they are within the bounds. The method used for generating the random values works best when :math:`a \leq \text{mean} \leq b`. Args: tensor: an n-dimensional `torch.Tensor` mean: the mean of the normal distribution std: the standard deviation of the normal distribution a: the minimum cutoff value b: the maximum cutoff value Examples: >>> w = torch.empty(3, 5) >>> nn.init.trunc_normal_(w) """ return _no_grad_trunc_normal_(tensor, mean, std, a, b) @torch.jit.script def drop_path(x: torch.Tensor, drop_prob: float = 0., training: bool = False) -> torch.Tensor: """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks). This is the same as the DropConnect impl I created for EfficientNet, etc networks, however, the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper... See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use 'survival rate' as the argument. """ if drop_prob == 0. or not training: return x keep_prob = 1. - drop_prob shape = (x.shape[0],) + (1,) * (x.ndim - 1) # work with diff dim tensors, not just 2d ConvNets random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device) random_tensor.floor_() # binarize output = x.div(keep_prob) * random_tensor return output class DropPath(nn.Module): """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks). """ def __init__(self, drop_prob=None): super(DropPath, self).__init__() self.drop_prob = drop_prob def forward(self, x): return drop_path(x, self.drop_prob, self.training) class MLP(nn.Module): def __init__(self, in_features, hidden_features = None, out_features = None, act_layer = nn.GELU, output_bias = True, drop_rate = 0., checkpointing = False): super(MLP, self).__init__() self.checkpointing = checkpointing out_features = out_features or in_features hidden_features = hidden_features or in_features fc1 = nn.Conv2d(in_features, hidden_features, 1, bias=True) # ln1 = norm_layer(num_features=hidden_features) act = act_layer() fc2 = nn.Conv2d(hidden_features, out_features, 1, bias = output_bias) if drop_rate > 0.: drop = nn.Dropout(drop_rate) self.fwd = nn.Sequential(fc1, act, drop, fc2, drop) else: self.fwd = nn.Sequential(fc1, act, fc2) @torch.jit.ignore def checkpoint_forward(self, x): return checkpoint(self.fwd, x) def forward(self, x): if self.checkpointing: return self.checkpoint_forward(x) else: return self.fwd(x) class RealFFT2(nn.Module): """ Helper routine to wrap FFT similarly to the SHT """ def __init__(self, nlat, nlon, lmax = None, mmax = None): super(RealFFT2, self).__init__() self.nlat = nlat self.nlon = nlon self.lmax = lmax or self.nlat self.mmax = mmax or self.nlon // 2 + 1 def forward(self, x): y = torch.fft.rfft2(x, dim=(-2, -1), norm="ortho") y = torch.cat((y[..., :math.ceil(self.lmax/2), :self.mmax], y[..., -math.floor(self.lmax/2):, :self.mmax]), dim=-2) return y class InverseRealFFT2(nn.Module): """ Helper routine to wrap FFT similarly to the SHT """ def __init__(self, nlat, nlon, lmax = None, mmax = None): super(InverseRealFFT2, self).__init__() self.nlat = nlat self.nlon = nlon self.lmax = lmax or self.nlat self.mmax = mmax or self.nlon // 2 + 1 def forward(self, x): return torch.fft.irfft2(x, dim=(-2, -1), s=(self.nlat, self.nlon), norm="ortho") class SpectralConvS2(nn.Module): """ Spectral Convolution according to Driscoll & Healy. Designed for convolutions on the two-sphere S2 using the Spherical Harmonic Transforms in torch-harmonics, but supports convolutions on the periodic domain via the RealFFT2 and InverseRealFFT2 wrappers. """ def __init__(self, forward_transform, inverse_transform, in_channels, out_channels, scale = 'auto', operator_type = 'diagonal', rank = 0.2, factorization = None, separable = False, implementation = 'factorized', decomposition_kwargs=dict(), bias = False): super(SpectralConvS2, self).__init__() if scale == 'auto': scale = (1 / (in_channels * out_channels)) self.forward_transform = forward_transform self.inverse_transform = inverse_transform self.modes_lat = self.inverse_transform.lmax self.modes_lon = self.inverse_transform.mmax self.scale_residual = (self.forward_transform.nlat != self.inverse_transform.nlat) \ or (self.forward_transform.nlon != self.inverse_transform.nlon) # Make sure we are using a Complex Factorized Tensor if factorization is None: factorization = 'Dense' # No factorization if not factorization.lower().startswith('complex'): factorization = f'Complex{factorization}' # remember factorization details self.operator_type = operator_type self.rank = rank self.factorization = factorization self.separable = separable assert self.inverse_transform.lmax == self.modes_lat assert self.inverse_transform.mmax == self.modes_lon weight_shape = [in_channels] if not self.separable: weight_shape += [out_channels] if self.operator_type == 'diagonal': weight_shape += [self.modes_lat, self.modes_lon] elif self.operator_type == 'block-diagonal': weight_shape += [self.modes_lat, self.modes_lon, self.modes_lon] elif self.operator_type == 'vector': weight_shape += [self.modes_lat] else: raise NotImplementedError(f"Unkonw operator type f{self.operator_type}") # form weight tensors self.weight = FactorizedTensor.new(weight_shape, rank=self.rank, factorization=factorization, fixed_rank_modes=False, **decomposition_kwargs) # initialization of weights self.weight.normal_(0, scale) self._contract = get_contract_fun(self.weight, implementation=implementation, separable=separable) if bias: self.bias = nn.Parameter(scale * torch.randn(1, out_channels, 1, 1)) def forward(self, x): dtype = x.dtype x = x.float() residual = x B, C, H, W = x.shape with amp.autocast(enabled=False): x = self.forward_transform(x) if self.scale_residual: residual = self.inverse_transform(x) x = self._contract(x, self.weight, separable=self.separable, operator_type=self.operator_type) with amp.autocast(enabled=False): x = self.inverse_transform(x) if hasattr(self, 'bias'): x = x + self.bias x = x.type(dtype) return x, residual class SpectralAttention2d(nn.Module): """ geometrical Spectral Attention layer """ def __init__(self, forward_transform, inverse_transform, embed_dim, sparsity_threshold = 0.0, hidden_size_factor = 2, use_complex_kernels = False, complex_activation = 'real', bias = False, spectral_layers = 1, drop_rate = 0.): super(SpectralAttention2d, self).__init__() self.embed_dim = embed_dim self.sparsity_threshold = sparsity_threshold self.hidden_size = int(hidden_size_factor * self.embed_dim) self.scale = 1 / embed_dim**2 self.mul_add_handle = compl_muladd2d_fwd_c if use_complex_kernels else compl_muladd2d_fwd self.mul_handle = compl_mul2d_fwd_c if use_complex_kernels else compl_mul2d_fwd self.spectral_layers = spectral_layers self.modes_lat = forward_transform.lmax self.modes_lon = forward_transform.mmax # only storing the forward handle to be able to call it self.forward_transform = forward_transform self.inverse_transform = inverse_transform self.scale_residual = (self.forward_transform.nlat != self.inverse_transform.nlat) \ or (self.forward_transform.nlon != self.inverse_transform.nlon) assert inverse_transform.lmax == self.modes_lat assert inverse_transform.mmax == self.modes_lon # weights w = [self.scale * torch.randn(self.embed_dim, self.hidden_size, 2)] for l in range(1, self.spectral_layers): w.append(self.scale * torch.randn(self.hidden_size, self.hidden_size, 2)) self.w = nn.ParameterList(w) if bias: self.b = nn.ParameterList([self.scale * torch.randn(self.hidden_size, 1, 2) for _ in range(self.spectral_layers)]) self.wout = nn.Parameter(self.scale * torch.randn(self.hidden_size, self.embed_dim, 2)) self.drop = nn.Dropout(drop_rate) if drop_rate > 0. else nn.Identity() self.activations = nn.ModuleList([]) for l in range(0, self.spectral_layers): self.activations.append(ComplexReLU(mode=complex_activation, bias_shape=(self.hidden_size, 1, 1), scale=self.scale)) def forward_mlp(self, x): x = torch.view_as_real(x) xr = x for l in range(self.spectral_layers): if hasattr(self, 'b'): xr = self.mul_add_handle(xr, self.w[l], self.b[l]) else: xr = self.mul_handle(xr, self.w[l]) xr = torch.view_as_complex(xr) xr = self.activations[l](xr) xr = self.drop(xr) xr = torch.view_as_real(xr) x = self.mul_handle(xr, self.wout) x = torch.view_as_complex(x) return x def forward(self, x): dtype = x.dtype x = x.float() residual = x with amp.autocast(enabled=False): x = self.forward_transform(x) if self.scale_residual: residual = self.inverse_transform(x) x = self.forward_mlp(x) with amp.autocast(enabled=False): x = self.inverse_transform(x) x = x.type(dtype) return x, residual class SpectralAttentionS2(nn.Module): """ Spherical non-linear FNO layer """ def __init__(self, forward_transform, inverse_transform, embed_dim, operator_type = 'diagonal', sparsity_threshold = 0.0, hidden_size_factor = 2, complex_activation = 'real', scale = 'auto', bias = False, spectral_layers = 1, drop_rate = 0.): super(SpectralAttentionS2, self).__init__() self.embed_dim = embed_dim self.sparsity_threshold = sparsity_threshold self.operator_type = operator_type self.spectral_layers = spectral_layers if scale == 'auto': self.scale = (1 / (embed_dim * embed_dim)) self.modes_lat = forward_transform.lmax self.modes_lon = forward_transform.mmax # only storing the forward handle to be able to call it self.forward_transform = forward_transform self.inverse_transform = inverse_transform self.scale_residual = (self.forward_transform.nlat != self.inverse_transform.nlat) \ or (self.forward_transform.nlon != self.inverse_transform.nlon) assert inverse_transform.lmax == self.modes_lat assert inverse_transform.mmax == self.modes_lon hidden_size = int(hidden_size_factor * self.embed_dim) if operator_type == 'diagonal': self.mul_add_handle = compl_muladd2d_fwd self.mul_handle = compl_mul2d_fwd # weights w = [self.scale * torch.randn(self.embed_dim, hidden_size, 2)] for l in range(1, self.spectral_layers): w.append(self.scale * torch.randn(hidden_size, hidden_size, 2)) self.w = nn.ParameterList(w) self.wout = nn.Parameter(self.scale * torch.randn(hidden_size, self.embed_dim, 2)) if bias: self.b = nn.ParameterList([self.scale * torch.randn(hidden_size, 1, 1, 2) for _ in range(self.spectral_layers)]) self.activations = nn.ModuleList([]) for l in range(0, self.spectral_layers): self.activations.append(ComplexReLU(mode=complex_activation, bias_shape=(hidden_size, 1, 1), scale=self.scale)) elif operator_type == 'vector': self.mul_add_handle = compl_exp_muladd2d_fwd self.mul_handle = compl_exp_mul2d_fwd # weights w = [self.scale * torch.randn(self.modes_lat, self.embed_dim, hidden_size, 2)] for l in range(1, self.spectral_layers): w.append(self.scale * torch.randn(self.modes_lat, hidden_size, hidden_size, 2)) self.w = nn.ParameterList(w) if bias: self.b = nn.ParameterList([self.scale * torch.randn(hidden_size, 1, 1, 2) for _ in range(self.spectral_layers)]) self.wout = nn.Parameter(self.scale * torch.randn(self.modes_lat, hidden_size, self.embed_dim, 2)) self.activations = nn.ModuleList([]) for l in range(0, self.spectral_layers): self.activations.append(ComplexReLU(mode=complex_activation, bias_shape=(hidden_size, 1, 1), scale=self.scale)) else: raise ValueError('Unknown operator type') self.drop = nn.Dropout(drop_rate) if drop_rate > 0. else nn.Identity() def forward_mlp(self, x): B, C, H, W = x.shape xr = torch.view_as_real(x) for l in range(self.spectral_layers): if hasattr(self, 'b'): xr = self.mul_add_handle(xr, self.w[l], self.b[l]) else: xr = self.mul_handle(xr, self.w[l]) xr = torch.view_as_complex(xr) xr = self.activations[l](xr) xr = self.drop(xr) xr = torch.view_as_real(xr) # final MLP x = self.mul_handle(xr, self.wout) x = torch.view_as_complex(x) return x def forward(self, x): dtype = x.dtype x = x.to(torch.float32) residual = x # FWD transform with amp.autocast(enabled=False): x = self.forward_transform(x) if self.scale_residual: residual = self.inverse_transform(x) # MLP x = self.forward_mlp(x) # BWD transform with amp.autocast(enabled=False): x = self.inverse_transform(x) # cast back to initial precision x = x.to(dtype) return x, residual