_layers.py 23 KB
Newer Older
Boris Bonev's avatar
Boris Bonev committed
1
2
3
4
# coding=utf-8

# SPDX-FileCopyrightText: Copyright (c) 2022 The torch-harmonics Authors. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
Boris Bonev's avatar
Boris Bonev committed
5
#
Boris Bonev's avatar
Boris Bonev committed
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#

Boris Bonev's avatar
Boris Bonev committed
32
33
34
import abc
import math

Boris Bonev's avatar
Boris Bonev committed
35
36
37
import torch
import torch.nn as nn
import torch.fft
38
from torch.utils.checkpoint import checkpoint
Boris Bonev's avatar
Boris Bonev committed
39

Boris Bonev's avatar
Boris Bonev committed
40
41
from torch_harmonics import InverseRealSHT

Boris Bonev's avatar
Boris Bonev committed
42
43

def _no_grad_trunc_normal_(tensor, mean, std, a, b):
    """
    Fill ``tensor`` in place with samples from a truncated normal distribution.

    Helper for ``trunc_normal_``. All in-place tensor operations run inside
    ``torch.no_grad()`` so the initialization is not recorded by autograd.

    Parameters
    -----------
    tensor : torch.Tensor
        Tensor to initialize (modified in place)
    mean : float
        Mean of the normal distribution
    std : float
        Standard deviation of the normal distribution
    a : float
        Lower bound for truncation
    b : float
        Upper bound for truncation

    Returns
    -------
    torch.Tensor
        The same ``tensor`` object, now initialized

    Warns
    -----
    UserWarning
        If ``mean`` lies more than two standard deviations outside ``[a, b]``
    """
    # Imported locally because the module does not import `warnings` at file
    # level; without this the warning branch below raises NameError.
    import warnings

    # Cut & paste from PyTorch official master until it's in a few official releases - RW
    # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
    def norm_cdf(x):
        """
        Compute the standard normal cumulative distribution function.

        Parameters
        -----------
        x : float
            Input value

        Returns
        -------
        float
            CDF value
        """
        return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0

    if (mean < a - 2 * std) or (mean > b + 2 * std):
        warnings.warn("mean is more than 2 std from [a, b] in nn.init.trunc_normal_. " "The distribution of values may be incorrect.", stacklevel=2)

    with torch.no_grad():
        # Values are generated by using a truncated uniform distribution and
        # then using the inverse CDF for the normal distribution.
        # Get upper and lower cdf values
        lower = norm_cdf((a - mean) / std)
        upper = norm_cdf((b - mean) / std)

        # Uniformly fill tensor with values from [lower, upper], then translate
        # to [2*lower - 1, 2*upper - 1], the domain expected by erfinv.
        tensor.uniform_(2 * lower - 1, 2 * upper - 1)

        # Use inverse cdf transform for normal distribution to get truncated
        # standard normal
        tensor.erfinv_()

        # Transform to proper mean, std
        tensor.mul_(std * math.sqrt(2.0))
        tensor.add_(mean)

        # Clamp to ensure it's in the proper range
        tensor.clamp_(min=a, max=b)
        return tensor
Boris Bonev's avatar
Boris Bonev committed
112
113


Boris Bonev's avatar
Boris Bonev committed
114
def trunc_normal_(tensor, mean=0.0, std=1.0, a=-2.0, b=2.0):
    r"""Fill ``tensor`` in place with values from a truncated normal distribution.

    Thin public wrapper that forwards all arguments to
    ``_no_grad_trunc_normal_``. The values follow
    :math:`\mathcal{N}(\text{mean}, \text{std}^2)` restricted to
    :math:`[a, b]`. The method used for generating the random values works
    best when :math:`a \leq \text{mean} \leq b`.

    Parameters
    -----------
    tensor: torch.Tensor
        an n-dimensional `torch.Tensor`, modified in place
    mean: float
        the mean of the normal distribution, by default 0.0
    std: float
        the standard deviation of the normal distribution, by default 1.0
    a: float
        the minimum cutoff value, by default -2.0
    b: float
        the maximum cutoff value, by default 2.0

    Returns
    -------
    torch.Tensor
        whatever ``_no_grad_trunc_normal_`` returns (the input tensor)

    Examples
    --------
    >>> w = torch.empty(3, 5)
    >>> trunc_normal_(w)
    """
    return _no_grad_trunc_normal_(tensor, mean=mean, std=std, a=a, b=b)


@torch.jit.script
def drop_path(x: torch.Tensor, drop_prob: float = 0.0, training: bool = False) -> torch.Tensor:
    """Apply stochastic depth (drop path) per sample.

    Outside of training, or when ``drop_prob`` is exactly zero, the input is
    returned untouched. Otherwise one random keep/drop decision is drawn per
    entry of the leading dimension; kept samples are rescaled by
    ``1 / (1 - drop_prob)`` and dropped samples become zero.

    Parameters
    ----------
    x : torch.Tensor
        Input tensor
    drop_prob : float, optional
        Probability of dropping a path, by default 0.0
    training : bool, optional
        Whether the model is in training mode, by default False

    Returns
    -------
    torch.Tensor
        Output tensor with the same shape as ``x``
    """
    if not training or drop_prob == 0.0:
        return x

    keep_prob = 1.0 - drop_prob
    # one mask entry per sample, broadcast over all remaining dimensions
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)  # work with diff dim tensors, not just 2d ConvNets
    noise = torch.rand(shape, dtype=x.dtype, device=x.device)
    # floor(keep_prob + U[0, 1)) is 1 with probability keep_prob, else 0
    mask = torch.floor(keep_prob + noise)
    return x.div(keep_prob) * mask


class DropPath(nn.Module):
    """
    Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).

    Module wrapper around ``drop_path`` that supplies the stored drop
    probability and the module's own ``training`` flag.

    Parameters
    ----------
    drop_prob : float, optional
        Probability of dropping a path, by default None. ``None`` and ``0.0``
        both disable dropping, so the module acts as the identity.
    """

    def __init__(self, drop_prob=None):
        super().__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        """Return ``x`` unchanged in eval mode or when dropping is disabled, else apply ``drop_path``."""
        # Short-circuit here so that the default ``drop_prob=None`` is never
        # passed to ``drop_path``, whose ``drop_prob`` argument is typed float.
        if not self.training or not self.drop_prob:
            return x
        return drop_path(x, float(self.drop_prob), self.training)
Boris Bonev's avatar
Boris Bonev committed
196

Boris Bonev's avatar
Boris Bonev committed
197
198

class PatchEmbed(nn.Module):
    """
    Patch embedding layer for vision transformers.

    A single strided convolution (kernel size == stride == ``patch_size``)
    maps every patch of the input image to an ``embed_dim``-dimensional
    vector; the two spatial dimensions of the result are then flattened.

    Parameters
    ----------
    img_size : tuple, optional
        Input image size (height, width), by default (224, 224)
    patch_size : tuple, optional
        Patch size (height, width), by default (16, 16)
    in_chans : int, optional
        Number of input channels, by default 3
    embed_dim : int, optional
        Embedding dimension, by default 768
    """

    def __init__(self, img_size=(224, 224), patch_size=(16, 16), in_chans=3, embed_dim=768):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size

        # number of patches along each axis (integer division drops any remainder)
        patches_h = img_size[0] // patch_size[0]
        patches_w = img_size[1] // patch_size[1]
        self.red_img_size = (patches_h, patches_w)
        self.num_patches = patches_h * patches_w

        conv = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size, bias=True)
        # tag both parameters; presumably consumed by model-parallel tooling
        # elsewhere - not used in this file
        for param in (conv.weight, conv.bias):
            param.is_shared_mp = ["spatial"]
        self.proj = conv

    def forward(self, x):
        """Project ``x`` of shape (B, C, H, W) to patch embeddings with the spatial dims flattened."""
        _, _, H, W = x.shape
        assert H == self.img_size[0] and W == self.img_size[1], f"Input image size ({H}*{W}) doesn't match model ({self.img_size[0]}*{self.img_size[1]})."
        # (B, embed_dim, H', W') -> (B, embed_dim, H'*W')
        projected = self.proj(x)
        return projected.flatten(2)


Boris Bonev's avatar
Boris Bonev committed
238
class MLP(nn.Module):
    """
    Two-layer pointwise MLP with optional dropout and gradient checkpointing.

    Both "linear" layers are 1x1 ``nn.Conv2d`` modules, separated by an
    activation. When ``drop_rate > 0`` the same ``nn.Dropout2d`` instance is
    applied after the activation and after the output layer.

    Parameters
    ----------
    in_features : int
        Number of input features
    hidden_features : int, optional
        Number of hidden features, by default None (same as in_features)
    out_features : int, optional
        Number of output features, by default None (same as in_features)
    act_layer : nn.Module, optional
        Activation layer class, by default nn.ReLU
    output_bias : bool, optional
        Whether to use bias in output layer, by default False
    drop_rate : float, optional
        Dropout rate, by default 0.0
    checkpointing : bool, optional
        Whether to use gradient checkpointing, by default False
    gain : float, optional
        Gain factor for weight initialization of the output layer, by default 1.0
    """

    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.ReLU, output_bias=False, drop_rate=0.0, checkpointing=False, gain=1.0):
        super().__init__()
        self.checkpointing = checkpointing

        # falsy sizes (None or 0) fall back to the input width
        if not out_features:
            out_features = in_features
        if not hidden_features:
            hidden_features = in_features

        # first dense layer: N(0, 2 / in_features) weights, zero bias
        fc1 = nn.Conv2d(in_features, hidden_features, 1, bias=True)
        nn.init.normal_(fc1.weight, mean=0.0, std=math.sqrt(2.0 / in_features))
        nn.init.constant_(fc1.bias, 0.0)

        # activation
        act = act_layer()

        # output layer: the gain factor determines the scale of the weight init
        fc2 = nn.Conv2d(hidden_features, out_features, 1, bias=output_bias)
        nn.init.normal_(fc2.weight, mean=0.0, std=math.sqrt(gain / hidden_features))
        if fc2.bias is not None:
            nn.init.constant_(fc2.bias, 0.0)

        layers = [fc1, act]
        if drop_rate > 0.0:
            drop = nn.Dropout2d(drop_rate)
            layers += [drop, fc2, drop]
        else:
            layers.append(fc2)
        self.fwd = nn.Sequential(*layers)

    @torch.jit.ignore
    def checkpoint_forward(self, x):
        """Run the layer stack through ``torch.utils.checkpoint.checkpoint``."""
        return checkpoint(self.fwd, x)

    def forward(self, x):
        """Apply the MLP, using the checkpointed path when ``checkpointing`` is set."""
        if not self.checkpointing:
            return self.fwd(x)
        return self.checkpoint_forward(x)

Boris Bonev's avatar
Boris Bonev committed
308

Boris Bonev's avatar
Boris Bonev committed
309
310
class RealFFT2(nn.Module):
    """
    Helper routine to wrap FFT similarly to the SHT.

    This module wraps ``torch.fft.rfft2`` (orthonormal scaling) and truncates
    the spectrum so that its interface mimics a spherical harmonic transform.
    Along the second-to-last dimension the first ``ceil(lmax / 2)`` rows and
    the last ``floor(lmax / 2)`` rows of the spectrum are kept; along the last
    dimension the first ``mmax`` columns are kept.

    Parameters
    -----------
    nlat : int
        Number of latitude points
    nlon : int
        Number of longitude points
    lmax : int, optional
        Number of retained modes along the latitude dimension, by default None (same as nlat)
    mmax : int, optional
        Number of retained modes along the longitude dimension, by default None (nlon//2 + 1)
    """

    def __init__(self, nlat, nlon, lmax=None, mmax=None):
        super().__init__()

        self.nlat = nlat
        self.nlon = nlon
        self.lmax = lmax or self.nlat
        self.mmax = mmax or self.nlon // 2 + 1

    def forward(self, x):
        """Return the truncated 2D real FFT of ``x`` taken over its last two dimensions."""
        y = torch.fft.rfft2(x, dim=(-2, -1), norm="ortho")

        num_head = math.ceil(self.lmax / 2)
        num_tail = math.floor(self.lmax / 2)

        # Compute the tail start explicitly: a slice written as ``-num_tail:``
        # selects *every* row when num_tail == 0 (i.e. lmax == 1) because
        # ``-0 == 0``. Clamp at 0 so an oversized lmax still selects all rows.
        tail_start = max(y.shape[-2] - num_tail, 0)

        head = y[..., :num_head, : self.mmax]
        tail = y[..., tail_start:, : self.mmax]
        return torch.cat((head, tail), dim=-2)

Boris Bonev's avatar
Boris Bonev committed
342

Boris Bonev's avatar
Boris Bonev committed
343
344
class InverseRealFFT2(nn.Module):
    """
    Helper routine to wrap inverse FFT similarly to the SHT.

    Wraps ``torch.fft.irfft2`` (orthonormal scaling) with a fixed output
    size of ``(nlat, nlon)`` so that it can be used interchangeably with an
    inverse spherical harmonic transform.

    Parameters
    -----------
    nlat : int
        Number of latitude points of the output
    nlon : int
        Number of longitude points of the output
    lmax : int, optional
        Stored as ``self.lmax``, by default None (same as nlat)
    mmax : int, optional
        Stored as ``self.mmax``, by default None (nlon//2 + 1)
    """

    def __init__(self, nlat, nlon, lmax=None, mmax=None):
        super().__init__()

        self.nlat, self.nlon = nlat, nlon
        # falsy values (None or 0) select the defaults
        self.lmax = lmax if lmax else nlat
        self.mmax = mmax if mmax else nlon // 2 + 1

    def forward(self, x):
        """Return the inverse 2D real FFT of ``x`` with spatial size ``(nlat, nlon)``."""
        out_size = (self.nlat, self.nlon)
        return torch.fft.irfft2(x, s=out_size, dim=(-2, -1), norm="ortho")
Boris Bonev's avatar
Boris Bonev committed
373

Boris Bonev's avatar
Boris Bonev committed
374
375
376

class LayerNorm(nn.Module):
    """
apaaris's avatar
apaaris committed
377
378
379
380
381
382
383
    Wrapper class that moves the channel dimension to the end.
    
    This module provides a layer normalization that works with channel-first
    tensors by temporarily transposing the channel dimension to the end,
    applying normalization, and then transposing back.
    
    Parameters
384
    ----------
apaaris's avatar
apaaris committed
385
386
387
388
389
390
391
392
393
394
395
396
    in_channels : int
        Number of input channels
    eps : float, optional
        Epsilon for numerical stability, by default 1e-05
    elementwise_affine : bool, optional
        Whether to use learnable affine parameters, by default True
    bias : bool, optional
        Whether to use bias, by default True
    device : torch.device, optional
        Device to place the module on, by default None
    dtype : torch.dtype, optional
        Data type for the module, by default None
Boris Bonev's avatar
Boris Bonev committed
397
    """
apaaris's avatar
apaaris committed
398
    
Boris Bonev's avatar
Boris Bonev committed
399
400
401
402
403
404
405
406
    def __init__(self, in_channels, eps=1e-05, elementwise_affine=True, bias=True, device=None, dtype=None):
        super().__init__()

        self.channel_dim = -3

        self.norm = nn.LayerNorm(normalized_shape=in_channels, eps=1e-6, elementwise_affine=elementwise_affine, bias=bias, device=device, dtype=dtype)

    def forward(self, x):
407

Boris Bonev's avatar
Boris Bonev committed
408
409
410
        return self.norm(x.transpose(self.channel_dim, -1)).transpose(-1, self.channel_dim)


Boris Bonev's avatar
Boris Bonev committed
411
412
class SpectralConvS2(nn.Module):
    """
    Spectral Convolution according to Driscoll & Healy. Designed for convolutions on the two-sphere S2
    using the Spherical Harmonic Transforms in torch-harmonics, but supports convolutions on the periodic
    domain via the RealFFT2 and InverseRealFFT2 wrappers.

    The input is transformed with ``forward_transform``, contracted with a
    learned complex weight via ``torch.einsum`` and transformed back with
    ``inverse_transform``.

    Parameters
    ----------
    forward_transform : nn.Module
        Forward transform (SHT or FFT); must expose ``nlat`` and ``nlon``
    inverse_transform : nn.Module
        Inverse transform (ISHT or IFFT); must expose ``nlat``, ``nlon``, ``lmax`` and ``mmax``
    in_channels : int
        Number of input channels
    out_channels : int
        Number of output channels
    gain : float, optional
        Gain factor for weight initialization, by default 2.0
    operator_type : str, optional
        Type of spectral operator ("driscoll-healy", "diagonal", "block-diagonal"), by default "driscoll-healy"
    lr_scale_exponent : int, optional
        Learning rate scaling exponent, by default 0. Accepted for interface
        compatibility; it is not used by this class.
    bias : bool, optional
        Whether to use bias, by default False

    Raises
    ------
    NotImplementedError
        If ``operator_type`` is not one of the supported values
    """

    def __init__(self, forward_transform, inverse_transform, in_channels, out_channels, gain=2.0, operator_type="driscoll-healy", lr_scale_exponent=0, bias=False):
        super().__init__()

        self.forward_transform = forward_transform
        self.inverse_transform = inverse_transform

        # the number of spectral modes is dictated by the inverse transform
        self.modes_lat = self.inverse_transform.lmax
        self.modes_lon = self.inverse_transform.mmax

        # when input and output grids differ, the residual has to be resampled
        # onto the output grid (see forward)
        self.scale_residual = (self.forward_transform.nlat != self.inverse_transform.nlat) or (self.forward_transform.nlon != self.inverse_transform.nlon)

        # remember factorization details
        self.operator_type = operator_type

        # NOTE(review): these hold trivially because modes_lat/modes_lon were
        # just read from inverse_transform; kept as in the original.
        assert self.inverse_transform.lmax == self.modes_lat
        assert self.inverse_transform.mmax == self.modes_lon

        weight_shape = [out_channels, in_channels]

        if self.operator_type == "diagonal":
            # one weight per (l, m) mode
            weight_shape += [self.modes_lat, self.modes_lon]
            self.contract_func = "...ilm,oilm->...olm"
        elif self.operator_type == "block-diagonal":
            # for every l, a full (m x m) matrix mixing the last spectral axis
            weight_shape += [self.modes_lat, self.modes_lon, self.modes_lon]
            self.contract_func = "...ilm,oilnm->...oln"
        elif self.operator_type == "driscoll-healy":
            # one weight per l, shared across all m
            weight_shape += [self.modes_lat]
            self.contract_func = "...ilm,oil->...olm"
        else:
            raise NotImplementedError(f"Unknown operator type {self.operator_type}")

        # form weight tensors
        scale = math.sqrt(gain / in_channels)
        self.weight = nn.Parameter(scale * torch.randn(*weight_shape, dtype=torch.complex64))
        if bias:
            self.bias = nn.Parameter(torch.zeros(1, out_channels, 1, 1))

    def forward(self, x):
        """
        Apply the spectral convolution.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor accepted by ``forward_transform``

        Returns
        -------
        tuple of torch.Tensor
            ``(y, residual)`` where ``y`` is the convolved signal cast back to
            the dtype of ``x`` and ``residual`` is the float input, passed
            through the forward and inverse transforms when the input and
            output grids differ.
        """
        dtype = x.dtype
        # the transforms are evaluated in float32 with CUDA autocast disabled
        x = x.float()
        residual = x

        with torch.autocast(device_type="cuda", enabled=False):
            x = self.forward_transform(x)
            if self.scale_residual:
                residual = self.inverse_transform(x)

        x = torch.einsum(self.contract_func, x, self.weight)

        with torch.autocast(device_type="cuda", enabled=False):
            x = self.inverse_transform(x)

        # the bias attribute only exists when the module was built with bias=True
        if hasattr(self, "bias"):
            x = x + self.bias
        x = x.type(dtype)

        return x, residual
Boris Bonev's avatar
Boris Bonev committed
495

Boris Bonev's avatar
Boris Bonev committed
496
497
498

class PositionEmbedding(nn.Module, metaclass=abc.ABCMeta):
    """
    Base class for position embeddings.

    Stores the image shape and channel count. ``forward`` adds
    ``self.position_embeddings`` to its input; this base class does not
    create that attribute itself, so subclasses are expected to provide it.

    Parameters
    ----------
    img_shape : tuple, optional
        Image shape (height, width), by default (480, 960)
    grid : str, optional
        Grid type, by default "equiangular" (not used by the base class)
    num_chans : int, optional
        Number of channels, by default 1
    """

    def __init__(self, img_shape=(480, 960), grid="equiangular", num_chans=1):
        super().__init__()

        self.num_chans = num_chans
        self.img_shape = img_shape

    def forward(self, x: torch.Tensor):
        """Return ``x`` with the position embeddings added."""
        embeddings = self.position_embeddings
        return x + embeddings
Boris Bonev's avatar
Boris Bonev committed
523
524
525
526


class SequencePositionEmbedding(PositionEmbedding):
    """
    Standard sequence-based position embedding.

    Every grid point is assigned its flattened (row-major) index ``pos``.
    Channel ``k`` holds ``sin(pos / 10000**(2k / num_chans))`` for even ``k``
    and ``cos(pos / 10000**(2k / num_chans))`` for odd ``k``. The result is
    stored as a float buffer named ``position_embeddings``.

    Parameters
    ----------
    img_shape : tuple, optional
        Image shape (height, width), by default (480, 960)
    grid : str, optional
        Grid type, by default "equiangular"
    num_chans : int, optional
        Number of channels, by default 1
    """

    def __init__(self, img_shape=(480, 960), grid="equiangular", num_chans=1):
        super().__init__(img_shape=img_shape, grid=grid, num_chans=num_chans)

        with torch.no_grad():
            # flattened index of every grid point, shape (1, 1, H, W)
            num_points = self.img_shape[0] * self.img_shape[1]
            pos = torch.arange(num_points).reshape(1, 1, *self.img_shape)
            # view across channels; broadcasting gives the same values as a copy
            pos = pos.expand(-1, self.num_chans, -1, -1)

            # channel index and per-channel wavelength, shape (1, C, 1, 1)
            k = torch.arange(self.num_chans).reshape(1, self.num_chans, 1, 1)
            denom = torch.pow(10000, 2 * k / self.num_chans)

            # alternate sine / cosine between even and odd channels
            phase = pos / denom
            is_even = k % 2 == 0
            pos_embed = torch.where(is_even, torch.sin(phase), torch.cos(phase))

        # register tensor
        self.register_buffer("position_embeddings", pos_embed.float())
Boris Bonev's avatar
Boris Bonev committed
555
556
557


class SpectralPositionEmbedding(PositionEmbedding):
    """
    Spectral position embeddings for spherical transformers.

    For each channel a single spectral coefficient is set to one (or to the
    imaginary unit) and the resulting spectrum is transformed to the spatial
    grid with ``InverseRealSHT``. Each channel is then divided by its maximum
    absolute value and stored in the ``position_embeddings`` buffer.

    Parameters
    -----------
    img_shape : tuple, optional
        Image shape (height, width), by default (480, 960)
    grid : str, optional
        Grid type, by default "equiangular"
    num_chans : int, optional
        Number of channels, by default 1
    """

    def __init__(self, img_shape=(480, 960), grid="equiangular", num_chans=1):
        super().__init__(img_shape=img_shape, grid=grid, num_chans=num_chans)

        # compute maximum required frequency and prepare isht
        lmax = math.floor(math.sqrt(self.num_chans)) + 1
        nlat, nlon = self.img_shape[0], self.img_shape[1]
        isht = InverseRealSHT(nlat=nlat, nlon=nlon, lmax=lmax, mmax=lmax, grid=grid)

        # fill position embedding
        with torch.no_grad():
            coeffs = torch.zeros(1, self.num_chans, isht.lmax, isht.mmax, dtype=torch.complex64)

            for chan in range(self.num_chans):
                # map the channel index to a (degree, order) pair, order in [-degree, degree]
                degree = math.floor(math.sqrt(chan))
                order = chan - degree**2 - degree

                if order >= 0:
                    coeffs[0, chan, degree, order] = 1.0
                else:
                    # negative orders are stored at |order| with an imaginary coefficient
                    coeffs[0, chan, degree, -order] = 1.0j

        # compute spatial position embeddings
        pos_embed = isht(coeffs)

        # normalization: scale every channel by its maximum absolute value
        peak = torch.amax(pos_embed.abs(), dim=(-1, -2), keepdim=True)
        pos_embed = pos_embed / peak

        # register tensor
        self.register_buffer("position_embeddings", pos_embed)
Boris Bonev's avatar
Boris Bonev committed
603
604
605


class LearnablePositionEmbedding(PositionEmbedding):
    """
    Learnable position embeddings for spherical transformers.

    Creates a zero-initialized ``nn.Parameter`` named ``position_embeddings``.
    With ``embed_type="latlon"`` it has one entry per grid point; with
    ``embed_type="lat"`` its last dimension has size 1, so the embedding
    broadcasts along longitude.

    Parameters
    ----------
    img_shape : tuple, optional
        Image shape (height, width), by default (480, 960)
    grid : str, optional
        Grid type, by default "equiangular"
    num_chans : int, optional
        Number of channels, by default 1
    embed_type : str, optional
        Embedding type ("lat" or "latlon"), by default "lat"

    Raises
    ------
    ValueError
        If ``embed_type`` is neither "lat" nor "latlon"
    """

    def __init__(self, img_shape=(480, 960), grid="equiangular", num_chans=1, embed_type="lat"):
        super().__init__(img_shape=img_shape, grid=grid, num_chans=num_chans)

        if embed_type not in ("latlon", "lat"):
            raise ValueError(f"Unknown learnable position embedding type {embed_type}")

        # a longitude extent of 1 lets a latitude-only embedding broadcast over longitude
        lon_extent = self.img_shape[1] if embed_type == "latlon" else 1
        initial = torch.zeros(1, self.num_chans, self.img_shape[0], lon_extent)
        self.position_embeddings = nn.Parameter(initial)
Boris Bonev's avatar
Boris Bonev committed
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655

# class SpiralPositionEmbedding(PositionEmbedding):
#     """
#     Returns position embeddings on the torus
#     """

#     def __init__(self, img_shape=(480, 960), grid="equiangular", num_chans=1):

#         super().__init__(img_shape=img_shape, grid=grid, num_chans=num_chans)

#         with torch.no_grad():

#             # alternating custom position embeddings
#             lats, _ = _precompute_latitudes(img_shape[0], grid=grid)
#             lats = lats.reshape(-1, 1)
#             lons = torch.linspace(0, 2 * math.pi, img_shape[1] + 1)[:-1]
#             lons = lons.reshape(1, -1)

#             # channel index
#             k = torch.arange(self.num_chans).reshape(1, -1, 1, 1)
#             pos_embed = torch.where(k % 2 == 0, torch.sin(k * (lons + lats)), torch.cos(k * (lons - lats)))

#         # register tensor
apaaris's avatar
apaaris committed
656
#         self.register_buffer("position_embeddings", pos_embed.float())