# AUTOGENERATED! DO NOT EDIT! File to edit: ../../nbs/models.deepar.ipynb.
# %% auto 0
__all__ = ['Decoder', 'DeepAR']
# %% ../../nbs/models.deepar.ipynb 4
import numpy as np
import torch
import torch.nn as nn
from typing import Optional
from ..common._base_windows import BaseWindows
from ..losses.pytorch import DistributionLoss, MQLoss
# %% ../../nbs/models.deepar.ipynb 7
class Decoder(nn.Module):
"""Multi-Layer Perceptron Decoder
**Parameters:**
`in_features`: int, dimension of input.
`out_features`: int, dimension of output.
`hidden_size`: int, dimension of hidden layers.
    `hidden_layers`: int, number of hidden layers; 0 yields a single linear output layer.
"""
def __init__(self, in_features, out_features, hidden_size, hidden_layers):
super().__init__()
if hidden_layers == 0:
# Input layer
layers = [nn.Linear(in_features=in_features, out_features=out_features)]
else:
# Input layer
layers = [
nn.Linear(in_features=in_features, out_features=hidden_size),
nn.ReLU(),
]
# Hidden layers
for i in range(hidden_layers - 2):
layers += [
nn.Linear(in_features=hidden_size, out_features=hidden_size),
nn.ReLU(),
]
# Output layer
layers += [nn.Linear(in_features=hidden_size, out_features=out_features)]
        # Store layers in an nn.Sequential container
self.layers = nn.Sequential(*layers)
def forward(self, x):
return self.layers(x)
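# Illustrative shape note (added comment, not from the original notebook): the
# decoder applies its layers over the last dimension only. A hedged sketch,
# assuming a 3-parameter distribution head (e.g. StudentT: df, loc, scale):
#   decoder = Decoder(in_features=128, out_features=3, hidden_size=64, hidden_layers=2)
#   decoder(torch.rand(32, 200, 128)).shape  # -> torch.Size([32, 200, 3])
# With hidden_layers=0 the decoder collapses to a single nn.Linear projection.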
# %% ../../nbs/models.deepar.ipynb 8
class DeepAR(BaseWindows):
"""DeepAR
**Parameters:**
`h`: int, Forecast horizon.
    `input_size`: int, autoregressive input size, y=[1,2,3,4] input_size=2 -> y_[t-2:t]=[1,2].
`lstm_n_layers`: int=2, number of LSTM layers.
`lstm_hidden_size`: int=128, LSTM hidden size.
`lstm_dropout`: float=0.1, LSTM dropout.
`decoder_hidden_layers`: int=0, number of decoder MLP hidden layers. Default: 0 for linear layer.
`decoder_hidden_size`: int=0, decoder MLP hidden size. Default: 0 for linear layer.
`trajectory_samples`: int=100, number of Monte Carlo trajectories during inference.
`stat_exog_list`: str list, static exogenous columns.
`hist_exog_list`: str list, historic exogenous columns.
`futr_exog_list`: str list, future exogenous columns.
`exclude_insample_y`: bool=False, the model skips the autoregressive features y[t-input_size:t] if True.
`loss`: PyTorch module, instantiated train loss class from [losses collection](https://nixtla.github.io/neuralforecast/losses.pytorch.html).
`valid_loss`: PyTorch module=`loss`, instantiated valid loss class from [losses collection](https://nixtla.github.io/neuralforecast/losses.pytorch.html).
`max_steps`: int=1000, maximum number of training steps.
`learning_rate`: float=1e-3, Learning rate between (0, 1).
    `num_lr_decays`: int=3, Number of learning rate decays, evenly distributed across max_steps.
`early_stop_patience_steps`: int=-1, Number of validation iterations before early stopping.
`val_check_steps`: int=100, Number of training steps between every validation loss check.
`batch_size`: int=32, number of different series in each batch.
`valid_batch_size`: int=None, number of different series in each validation and test batch, if None uses batch_size.
    `windows_batch_size`: int=1024, number of windows to sample in each training batch.
`inference_windows_batch_size`: int=-1, number of windows to sample in each inference batch, -1 uses all.
`start_padding_enabled`: bool=False, if True, the model will pad the time series with zeros at the beginning, by input size.
`step_size`: int=1, step size between each window of temporal data.
`scaler_type`: str='identity', type of scaler for temporal inputs normalization see [temporal scalers](https://nixtla.github.io/neuralforecast/common.scalers.html).
`random_seed`: int, random_seed for pytorch initializer and numpy generators.
    `num_workers_loader`: int=0, workers to be used by `TimeSeriesDataLoader`.
`drop_last_loader`: bool=False, if True `TimeSeriesDataLoader` drops last non-full batch.
`alias`: str, optional, Custom name of the model.
`optimizer`: Subclass of 'torch.optim.Optimizer', optional, user specified optimizer instead of the default choice (Adam).
    `optimizer_kwargs`: dict, optional, dictionary of keyword arguments passed to the user specified `optimizer`.
    `**trainer_kwargs`: keyword trainer arguments inherited from [PyTorch Lightning's trainer](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html?highlight=trainer).
**References**
- [David Salinas, Valentin Flunkert, Jan Gasthaus, Tim Januschowski (2020). "DeepAR: Probabilistic forecasting with autoregressive recurrent networks". International Journal of Forecasting.](https://www.sciencedirect.com/science/article/pii/S0169207019301888)
- [Alexander Alexandrov et. al (2020). "GluonTS: Probabilistic and Neural Time Series Modeling in Python". Journal of Machine Learning Research.](https://www.jmlr.org/papers/v21/19-820.html)
"""
# Class attributes
SAMPLING_TYPE = "windows"
def __init__(
self,
h,
input_size: int = -1,
lstm_n_layers: int = 2,
lstm_hidden_size: int = 128,
lstm_dropout: float = 0.1,
decoder_hidden_layers: int = 0,
decoder_hidden_size: int = 0,
trajectory_samples: int = 100,
futr_exog_list=None,
hist_exog_list=None,
stat_exog_list=None,
exclude_insample_y=False,
loss=DistributionLoss(
distribution="StudentT", level=[80, 90], return_params=False
),
valid_loss=MQLoss(level=[80, 90]),
max_steps: int = 1000,
learning_rate: float = 1e-3,
num_lr_decays: int = 3,
early_stop_patience_steps: int = -1,
val_check_steps: int = 100,
batch_size: int = 32,
valid_batch_size: Optional[int] = None,
windows_batch_size: int = 1024,
inference_windows_batch_size: int = -1,
start_padding_enabled=False,
step_size: int = 1,
scaler_type: str = "identity",
random_seed: int = 1,
num_workers_loader=0,
drop_last_loader=False,
optimizer=None,
optimizer_kwargs=None,
**trainer_kwargs
):
# DeepAR does not support historic exogenous variables
if hist_exog_list is not None:
raise Exception("DeepAR does not support historic exogenous variables.")
if exclude_insample_y:
raise Exception("DeepAR has no possibility for excluding y.")
if not loss.is_distribution_output:
raise Exception("DeepAR only supports distributional outputs.")
        if str(type(valid_loss)) not in [
            "<class 'neuralforecast.losses.pytorch.MQLoss'>"
        ]:
raise Exception("DeepAR only supports MQLoss as validation loss.")
if loss.return_params:
raise Exception(
"DeepAR does not return distribution parameters due to Monte Carlo sampling."
)
# Inherit BaseWindows class
super(DeepAR, self).__init__(
h=h,
input_size=input_size,
futr_exog_list=futr_exog_list,
hist_exog_list=hist_exog_list,
stat_exog_list=stat_exog_list,
exclude_insample_y=exclude_insample_y,
loss=loss,
valid_loss=valid_loss,
max_steps=max_steps,
learning_rate=learning_rate,
num_lr_decays=num_lr_decays,
early_stop_patience_steps=early_stop_patience_steps,
val_check_steps=val_check_steps,
batch_size=batch_size,
windows_batch_size=windows_batch_size,
valid_batch_size=valid_batch_size,
inference_windows_batch_size=inference_windows_batch_size,
start_padding_enabled=start_padding_enabled,
step_size=step_size,
scaler_type=scaler_type,
num_workers_loader=num_workers_loader,
drop_last_loader=drop_last_loader,
random_seed=random_seed,
optimizer=optimizer,
optimizer_kwargs=optimizer_kwargs,
**trainer_kwargs
)
self.horizon_backup = self.h # Used because h=0 during training
self.trajectory_samples = trajectory_samples
# LSTM
self.encoder_n_layers = lstm_n_layers
self.encoder_hidden_size = lstm_hidden_size
self.encoder_dropout = lstm_dropout
self.futr_exog_size = len(self.futr_exog_list)
self.hist_exog_size = 0
self.stat_exog_size = len(self.stat_exog_list)
# LSTM input size (1 for target variable y)
input_encoder = 1 + self.futr_exog_size + self.stat_exog_size
# Instantiate model
self.hist_encoder = nn.LSTM(
input_size=input_encoder,
hidden_size=self.encoder_hidden_size,
num_layers=self.encoder_n_layers,
dropout=self.encoder_dropout,
batch_first=True,
)
# Decoder MLP
self.decoder = Decoder(
in_features=lstm_hidden_size,
out_features=self.loss.outputsize_multiplier,
hidden_size=decoder_hidden_size,
hidden_layers=decoder_hidden_layers,
)
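    # Descriptive note (added comment): during `training_step` the horizon is
    # temporarily set to 0 and `train_forward` runs the LSTM in teacher-forcing
    # mode, so the cell at step t parameterizes the distribution of y_{t+1}.
    # At inference, `forward` unrolls the LSTM recursively over the horizon,
    # feeding back `trajectory_samples` Monte Carlo samples per series and
    # summarizing them into a mean plus the quantiles in `loss.quantiles`.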
# Override BaseWindows method
def training_step(self, batch, batch_idx):
# During training h=0
self.h = 0
y_idx = batch["y_idx"]
# Create and normalize windows [Ws, L, C]
windows = self._create_windows(batch, step="train")
original_insample_y = windows["temporal"][
:, :, y_idx
].clone() # windows: [B, L, Feature] -> [B, L]
original_insample_y = original_insample_y[
:, 1:
        ]  # Remove first (shift in DeepAR, cell at t outputs t+1)
windows = self._normalization(windows=windows, y_idx=y_idx)
# Parse windows
insample_y, insample_mask, _, _, _, futr_exog, stat_exog = self._parse_windows(
batch, windows
)
windows_batch = dict(
insample_y=insample_y, # [Ws, L]
insample_mask=insample_mask, # [Ws, L]
futr_exog=futr_exog, # [Ws, L+H]
hist_exog=None, # None
stat_exog=stat_exog,
y_idx=y_idx,
) # [Ws, 1]
# Model Predictions
output = self.train_forward(windows_batch)
if self.loss.is_distribution_output:
_, y_loc, y_scale = self._inv_normalization(
y_hat=original_insample_y,
temporal_cols=batch["temporal_cols"],
y_idx=y_idx,
)
outsample_y = original_insample_y
distr_args = self.loss.scale_decouple(
output=output, loc=y_loc, scale=y_scale
)
mask = insample_mask[
:, 1:
            ].clone()  # Remove first (shift in DeepAR, cell at t outputs t+1)
loss = self.loss(y=outsample_y, distr_args=distr_args, mask=mask)
else:
raise Exception("DeepAR only supports distributional outputs.")
if torch.isnan(loss):
print("Model Parameters", self.hparams)
print("insample_y", torch.isnan(insample_y).sum())
print("outsample_y", torch.isnan(outsample_y).sum())
print("output", torch.isnan(output).sum())
raise Exception("Loss is NaN, training stopped.")
self.log(
"train_loss",
loss.item(),
batch_size=outsample_y.size(0),
prog_bar=True,
on_epoch=True,
)
self.train_trajectories.append((self.global_step, loss.item()))
self.h = self.horizon_backup # Restore horizon
return loss
def validation_step(self, batch, batch_idx):
        self.h = self.horizon_backup  # Restore horizon
if self.val_size == 0:
return np.nan
# TODO: Hack to compute number of windows
windows = self._create_windows(batch, step="val")
n_windows = len(windows["temporal"])
y_idx = batch["y_idx"]
# Number of windows in batch
windows_batch_size = self.inference_windows_batch_size
if windows_batch_size < 0:
windows_batch_size = n_windows
n_batches = int(np.ceil(n_windows / windows_batch_size))
valid_losses = []
batch_sizes = []
for i in range(n_batches):
# Create and normalize windows [Ws, L+H, C]
w_idxs = np.arange(
i * windows_batch_size, min((i + 1) * windows_batch_size, n_windows)
)
windows = self._create_windows(batch, step="val", w_idxs=w_idxs)
original_outsample_y = torch.clone(windows["temporal"][:, -self.h :, 0])
windows = self._normalization(windows=windows, y_idx=y_idx)
# Parse windows
insample_y, insample_mask, _, outsample_mask, _, futr_exog, stat_exog = (
self._parse_windows(batch, windows)
)
windows_batch = dict(
insample_y=insample_y,
insample_mask=insample_mask,
futr_exog=futr_exog,
hist_exog=None,
stat_exog=stat_exog,
temporal_cols=batch["temporal_cols"],
y_idx=y_idx,
)
# Model Predictions
output_batch = self(windows_batch)
# Monte Carlo already returns y_hat with mean and quantiles
output_batch = output_batch[:, :, 1:] # Remove mean
valid_loss_batch = self.valid_loss(
y=original_outsample_y, y_hat=output_batch, mask=outsample_mask
)
valid_losses.append(valid_loss_batch)
batch_sizes.append(len(output_batch))
valid_loss = torch.stack(valid_losses)
batch_sizes = torch.tensor(batch_sizes, device=valid_loss.device)
batch_size = torch.sum(batch_sizes)
valid_loss = torch.sum(valid_loss * batch_sizes) / batch_size
if torch.isnan(valid_loss):
raise Exception("Loss is NaN, training stopped.")
self.log(
"valid_loss",
valid_loss.item(),
batch_size=batch_size,
prog_bar=True,
on_epoch=True,
)
self.validation_step_outputs.append(valid_loss)
return valid_loss
def predict_step(self, batch, batch_idx):
        self.h = self.horizon_backup  # Restore horizon
# TODO: Hack to compute number of windows
windows = self._create_windows(batch, step="predict")
n_windows = len(windows["temporal"])
y_idx = batch["y_idx"]
# Number of windows in batch
windows_batch_size = self.inference_windows_batch_size
if windows_batch_size < 0:
windows_batch_size = n_windows
n_batches = int(np.ceil(n_windows / windows_batch_size))
y_hats = []
for i in range(n_batches):
# Create and normalize windows [Ws, L+H, C]
w_idxs = np.arange(
i * windows_batch_size, min((i + 1) * windows_batch_size, n_windows)
)
windows = self._create_windows(batch, step="predict", w_idxs=w_idxs)
windows = self._normalization(windows=windows, y_idx=y_idx)
# Parse windows
insample_y, insample_mask, _, _, _, futr_exog, stat_exog = (
self._parse_windows(batch, windows)
)
windows_batch = dict(
insample_y=insample_y, # [Ws, L]
insample_mask=insample_mask, # [Ws, L]
futr_exog=futr_exog, # [Ws, L+H]
stat_exog=stat_exog,
temporal_cols=batch["temporal_cols"],
y_idx=y_idx,
)
# Model Predictions
y_hat = self(windows_batch)
# Monte Carlo already returns y_hat with mean and quantiles
y_hats.append(y_hat)
y_hat = torch.cat(y_hats, dim=0)
return y_hat
def train_forward(self, windows_batch):
# Parse windows_batch
encoder_input = windows_batch["insample_y"][:, :, None] # <- [B,T,1]
futr_exog = windows_batch["futr_exog"]
stat_exog = windows_batch["stat_exog"]
# [B, input_size-1, X]
encoder_input = encoder_input[
:, :-1, :
        ]  # Remove last (shift in DeepAR, cell at t outputs t+1)
_, input_size = encoder_input.shape[:2]
if self.futr_exog_size > 0:
# Shift futr_exog (t predicts t+1, last output is outside insample_y)
encoder_input = torch.cat((encoder_input, futr_exog[:, 1:, :]), dim=2)
if self.stat_exog_size > 0:
stat_exog = stat_exog.unsqueeze(1).repeat(
1, input_size, 1
) # [B, S] -> [B, input_size-1, S]
encoder_input = torch.cat((encoder_input, stat_exog), dim=2)
# RNN forward
hidden_state, _ = self.hist_encoder(
encoder_input
) # [B, input_size-1, rnn_hidden_state]
# Decoder forward
output = self.decoder(hidden_state) # [B, input_size-1, output_size]
output = self.loss.domain_map(output)
return output
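    # Worked example of the shift above (illustrative, values are hypothetical):
    # with input_size=4 and insample_y=[y1, y2, y3, y4], the encoder consumes
    # [y1, y2, y3] (aligned with futr_exog[1:]) and `training_step` scores the
    # resulting distributions against [y2, y3, y4] via its own [:, 1:] slice.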
def forward(self, windows_batch):
# Parse windows_batch
encoder_input = windows_batch["insample_y"][:, :, None] # <- [B,L,1]
futr_exog = windows_batch["futr_exog"] # <- [B,L+H, n_f]
stat_exog = windows_batch["stat_exog"]
y_idx = windows_batch["y_idx"]
# [B, seq_len, X]
batch_size, input_size = encoder_input.shape[:2]
if self.futr_exog_size > 0:
futr_exog_input_window = futr_exog[
:, 1 : input_size + 1, :
] # Align y_t with futr_exog_t+1
encoder_input = torch.cat((encoder_input, futr_exog_input_window), dim=2)
if self.stat_exog_size > 0:
stat_exog_input_window = stat_exog.unsqueeze(1).repeat(
1, input_size, 1
) # [B, S] -> [B, input_size, S]
encoder_input = torch.cat((encoder_input, stat_exog_input_window), dim=2)
        # Encode the input_size history; its final hidden state seeds the recursive forecast
_, h_c_tuple = self.hist_encoder(encoder_input)
h_n = h_c_tuple[0] # [n_layers, B, lstm_hidden_state]
c_n = h_c_tuple[1] # [n_layers, B, lstm_hidden_state]
# Vectorizes trajectory samples in batch dimension [1]
h_n = torch.repeat_interleave(
h_n, self.trajectory_samples, 1
) # [n_layers, B*trajectory_samples, rnn_hidden_state]
c_n = torch.repeat_interleave(
c_n, self.trajectory_samples, 1
) # [n_layers, B*trajectory_samples, rnn_hidden_state]
# Scales for inverse normalization
y_scale = (
self.scaler.x_scale[:, 0, [y_idx]].squeeze(-1).to(encoder_input.device)
)
y_loc = self.scaler.x_shift[:, 0, [y_idx]].squeeze(-1).to(encoder_input.device)
y_scale = torch.repeat_interleave(y_scale, self.trajectory_samples, 0)
y_loc = torch.repeat_interleave(y_loc, self.trajectory_samples, 0)
# Recursive strategy prediction
quantiles = self.loss.quantiles.to(encoder_input.device)
y_hat = torch.zeros(
batch_size, self.h, len(quantiles) + 1, device=encoder_input.device
)
for tau in range(self.h):
# Decoder forward
last_layer_h = h_n[-1] # [B*trajectory_samples, lstm_hidden_state]
output = self.decoder(last_layer_h)
output = self.loss.domain_map(output)
# Inverse normalization
distr_args = self.loss.scale_decouple(
output=output, loc=y_loc, scale=y_scale
)
# Add horizon (1) dimension
distr_args = list(distr_args)
for i in range(len(distr_args)):
distr_args[i] = distr_args[i].unsqueeze(-1)
distr_args = tuple(distr_args)
samples_tau, _, _ = self.loss.sample(distr_args=distr_args, num_samples=1)
samples_tau = samples_tau.reshape(batch_size, self.trajectory_samples)
sample_mean = torch.mean(samples_tau, dim=-1).to(encoder_input.device)
quants = torch.quantile(input=samples_tau, q=quantiles, dim=-1).to(
encoder_input.device
)
y_hat[:, tau, 0] = sample_mean
y_hat[:, tau, 1:] = quants.permute((1, 0)) # [Q, B] -> [B, Q]
            # Skip the state update on the last step (no further prediction needed)
if tau + 1 == self.h:
continue
# Normalize to use as input
encoder_input = self.scaler.scaler(
samples_tau.flatten(), y_loc, y_scale
) # [B*n_samples]
encoder_input = encoder_input[:, None, None] # [B*n_samples, 1, 1]
# Update input
if self.futr_exog_size > 0:
futr_exog_tau = futr_exog[:, [input_size + tau + 1], :] # [B, 1, n_f]
futr_exog_tau = torch.repeat_interleave(
futr_exog_tau, self.trajectory_samples, 0
) # [B*n_samples, 1, n_f]
encoder_input = torch.cat(
(encoder_input, futr_exog_tau), dim=2
) # [B*n_samples, 1, 1+n_f]
if self.stat_exog_size > 0:
stat_exog_tau = torch.repeat_interleave(
stat_exog, self.trajectory_samples, 0
) # [B*n_samples, n_s]
encoder_input = torch.cat(
(encoder_input, stat_exog_tau[:, None, :]), dim=2
) # [B*n_samples, 1, 1+n_f+n_s]
_, h_c_tuple = self.hist_encoder(encoder_input, (h_n, c_n))
            h_n = h_c_tuple[0]  # [n_layers, B*trajectory_samples, rnn_hidden_state]
            c_n = h_c_tuple[1]  # [n_layers, B*trajectory_samples, rnn_hidden_state]
return y_hat
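# Minimal usage sketch (added; assumes the public `NeuralForecast` wrapper API
# and the bundled `AirPassengersDF` example dataset, not part of the generated
# notebook). Because this module uses relative imports, run it as
# `python -m neuralforecast.models.deepar`.
if __name__ == "__main__":
    from neuralforecast import NeuralForecast
    from neuralforecast.utils import AirPassengersDF

    # Small DeepAR: 12-month horizon, 24 lags, short training for illustration
    model = DeepAR(h=12, input_size=24, max_steps=100)
    nf = NeuralForecast(models=[model], freq="M")
    nf.fit(df=AirPassengersDF)
    # Forecast columns hold the trajectory mean and the 80/90% quantile bounds
    print(nf.predict().head())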