{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "524620c1",
"metadata": {},
"outputs": [],
"source": [
"#| default_exp common._base_windows"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "15392f6f",
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"cell_type": "markdown",
"id": "1e0f9607-d12d-44e5-b2be-91a57a0bca79",
"metadata": {},
"source": [
"# BaseWindows\n",
"\n",
"> The `BaseWindows` class contains standard methods shared across window-based neural networks; in contrast to recurrent neural networks these models commit to a fixed sequence length input. The class is represented by `MLP`, and other more sophisticated architectures like `NBEATS`, and `NHITS`."
]
},
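{
"cell_type": "markdown",
"id": "b3f1a2c4",
"metadata": {},
"source": [
"The sketch below is purely illustrative and not part of the exported module: it uses `torch.unfold` on a toy series to show the fixed-length slicing that the class performs internally, with `input_size`, `h`, and `step_size` mirroring the constructor arguments."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c9d7e5f1",
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"\n",
"# Toy example of fixed-length window slicing:\n",
"# one series of length T=10 is cut into windows of size input_size + h\n",
"# with stride step_size along the time dimension.\n",
"input_size, h, step_size = 4, 2, 1\n",
"y = torch.arange(10, dtype=torch.float32).reshape(1, 1, -1) # [Batch, Channel, Time]\n",
"windows = y.unfold(dimension=-1, size=input_size + h, step=step_size)\n",
"windows.shape # [1, 1, n_windows, input_size + h]"
]
},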
{
"cell_type": "markdown",
"id": "1730a556-1574-40ad-92a2-23b924ceb398",
"metadata": {},
"source": [
"The standard methods include data preprocessing `_normalization`, optimization utilities like parameter initialization, `training_step`, `validation_step`, and shared `fit` and `predict` methods.These shared methods enable all the `neuralforecast.models` compatibility with the `core.NeuralForecast` wrapper class. "
]
},
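{
"cell_type": "markdown",
"id": "d4e8a7b2",
"metadata": {},
"source": [
"As a minimal sketch (assuming the `NHITS` model and the `AirPassengersDF` toy dataset shipped with the library), any model inheriting from `BaseWindows` can be trained and queried through the `core.NeuralForecast` wrapper:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e5f9b8c3",
"metadata": {},
"outputs": [],
"source": [
"from neuralforecast import NeuralForecast\n",
"from neuralforecast.models import NHITS\n",
"from neuralforecast.utils import AirPassengersDF\n",
"\n",
"# Quick usage sketch: fit a window-based model through the core wrapper\n",
"# and forecast h=12 steps ahead (max_steps kept tiny for speed).\n",
"nf = NeuralForecast(models=[NHITS(h=12, input_size=24, max_steps=5)], freq='M')\n",
"nf.fit(df=AirPassengersDF)\n",
"forecasts = nf.predict()\n",
"forecasts.head()"
]
},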
{
"cell_type": "code",
"execution_count": null,
"id": "2508f7a9-1433-4ad8-8f2f-0078c6ed6c3c",
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"from fastcore.test import test_eq\n",
"from nbdev.showdoc import show_doc"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "44065066-e72a-431f-938f-1528adef9fe8",
"metadata": {},
"outputs": [],
"source": [
"#| export\n",
"import numpy as np\n",
"import torch\n",
"import torch.nn as nn\n",
"import pytorch_lightning as pl\n",
"\n",
"from neuralforecast.common._base_model import BaseModel\n",
"from neuralforecast.common._scalers import TemporalNorm\n",
"from neuralforecast.tsdataset import TimeSeriesDataModule\n",
"from neuralforecast.utils import get_indexer_raise_missing"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ce70cd14-ecb1-4205-8511-fecbd26c8408",
"metadata": {},
"outputs": [],
"source": [
"#| export\n",
"class BaseWindows(BaseModel):\n",
" \"\"\" Base Windows\n",
" \n",
" Base class for all windows-based models. The forecasts are produced separately \n",
" for each window, which are randomly sampled during training.\n",
" \n",
" This class implements the basic functionality for all windows-based models, including:\n",
" - PyTorch Lightning's methods training_step, validation_step, predict_step.
\n",
" - fit and predict methods used by NeuralForecast.core class.
\n",
" - sampling and wrangling methods to generate windows.\n",
" \"\"\"\n",
" def __init__(self,\n",
" h,\n",
" input_size,\n",
" loss,\n",
" valid_loss,\n",
" learning_rate,\n",
" max_steps,\n",
" val_check_steps,\n",
" batch_size,\n",
" valid_batch_size,\n",
" windows_batch_size,\n",
" inference_windows_batch_size,\n",
" start_padding_enabled,\n",
" step_size=1,\n",
" num_lr_decays=0,\n",
" early_stop_patience_steps=-1,\n",
" scaler_type='identity',\n",
" futr_exog_list=None,\n",
" hist_exog_list=None,\n",
" stat_exog_list=None,\n",
" exclude_insample_y=False,\n",
" num_workers_loader=0,\n",
" drop_last_loader=False,\n",
" random_seed=1,\n",
" alias=None,\n",
" optimizer=None,\n",
" optimizer_kwargs=None,\n",
" **trainer_kwargs):\n",
" super().__init__(\n",
" random_seed=random_seed,\n",
" loss=loss,\n",
" valid_loss=valid_loss,\n",
" optimizer=optimizer,\n",
" optimizer_kwargs=optimizer_kwargs,\n",
" futr_exog_list=futr_exog_list,\n",
" hist_exog_list=hist_exog_list,\n",
" stat_exog_list=stat_exog_list,\n",
" max_steps=max_steps,\n",
" early_stop_patience_steps=early_stop_patience_steps, \n",
" **trainer_kwargs,\n",
" )\n",
"\n",
" # Padder to complete train windows, \n",
" # example y=[1,2,3,4,5] h=3 -> last y_output = [5,0,0]\n",
" self.h = h\n",
" self.input_size = input_size\n",
" self.windows_batch_size = windows_batch_size\n",
" self.start_padding_enabled = start_padding_enabled\n",
" if start_padding_enabled:\n",
" self.padder_train = nn.ConstantPad1d(padding=(self.input_size-1, self.h), value=0)\n",
" else:\n",
" self.padder_train = nn.ConstantPad1d(padding=(0, self.h), value=0)\n",
"\n",
" # Batch sizes\n",
" self.batch_size = batch_size\n",
" if valid_batch_size is None:\n",
" self.valid_batch_size = batch_size\n",
" else:\n",
" self.valid_batch_size = valid_batch_size\n",
" if inference_windows_batch_size is None:\n",
" self.inference_windows_batch_size = windows_batch_size\n",
" else:\n",
" self.inference_windows_batch_size = inference_windows_batch_size\n",
"\n",
" # Optimization \n",
" self.learning_rate = learning_rate\n",
" self.max_steps = max_steps\n",
" self.num_lr_decays = num_lr_decays\n",
" self.lr_decay_steps = (\n",
" max(max_steps // self.num_lr_decays, 1) if self.num_lr_decays > 0 else 10e7\n",
" )\n",
" self.early_stop_patience_steps = early_stop_patience_steps\n",
" self.val_check_steps = val_check_steps\n",
" self.windows_batch_size = windows_batch_size\n",
" self.step_size = step_size\n",
" \n",
" self.exclude_insample_y = exclude_insample_y\n",
"\n",
" # Scaler\n",
" self.scaler = TemporalNorm(\n",
" scaler_type=scaler_type,\n",
" dim=1, # Time dimension is 1.\n",
" num_features=1+len(self.hist_exog_list)+len(self.futr_exog_list)\n",
" )\n",
"\n",
" # Fit arguments\n",
" self.val_size = 0\n",
" self.test_size = 0\n",
"\n",
" # Model state\n",
" self.decompose_forecast = False\n",
"\n",
" # DataModule arguments\n",
" self.num_workers_loader = num_workers_loader\n",
" self.drop_last_loader = drop_last_loader\n",
" # used by on_validation_epoch_end hook\n",
" self.validation_step_outputs = []\n",
" self.alias = alias\n",
"\n",
" def _create_windows(self, batch, step, w_idxs=None):\n",
" # Parse common data\n",
" window_size = self.input_size + self.h\n",
" temporal_cols = batch['temporal_cols']\n",
" temporal = batch['temporal']\n",
"\n",
" if step == 'train':\n",
" if self.val_size + self.test_size > 0:\n",
" cutoff = -self.val_size - self.test_size\n",
" temporal = temporal[:, :, :cutoff]\n",
"\n",
" temporal = self.padder_train(temporal)\n",
" if temporal.shape[-1] < window_size:\n",
" raise Exception('Time series is too short for training, consider setting a smaller input size or set start_padding_enabled=True')\n",
" windows = temporal.unfold(dimension=-1, \n",
" size=window_size, \n",
" step=self.step_size)\n",
"\n",
" # [B, C, Ws, L+H] 0, 1, 2, 3\n",
" # -> [B * Ws, L+H, C] 0, 2, 3, 1\n",
" windows_per_serie = windows.shape[2]\n",
" windows = windows.permute(0, 2, 3, 1).contiguous()\n",
" windows = windows.reshape(-1, window_size, len(temporal_cols))\n",
"\n",
" # Sample and Available conditions\n",
" available_idx = temporal_cols.get_loc('available_mask')\n",
" available_condition = windows[:, :self.input_size, available_idx]\n",
" available_condition = torch.sum(available_condition, axis=1)\n",
" final_condition = (available_condition > 0)\n",
" if self.h > 0:\n",
" sample_condition = windows[:, self.input_size:, available_idx]\n",
" sample_condition = torch.sum(sample_condition, axis=1)\n",
" final_condition = (sample_condition > 0) & (available_condition > 0)\n",
" windows = windows[final_condition]\n",
"\n",
" # Parse Static data to match windows\n",
" # [B, S_in] -> [B, Ws, S_in] -> [B*Ws, S_in]\n",
" static = batch.get('static', None)\n",
" static_cols=batch.get('static_cols', None)\n",
" if static is not None:\n",
" static = torch.repeat_interleave(static, \n",
" repeats=windows_per_serie, dim=0)\n",
" static = static[final_condition]\n",
"\n",
" # Protection of empty windows\n",
" if final_condition.sum() == 0:\n",
" raise Exception('No windows available for training')\n",
"\n",
" # Sample windows\n",
" n_windows = len(windows)\n",
" if self.windows_batch_size is not None:\n",
" w_idxs = np.random.choice(n_windows, \n",
" size=self.windows_batch_size,\n",
" replace=(n_windows < self.windows_batch_size))\n",
" windows = windows[w_idxs]\n",
" \n",
" if static is not None:\n",
" static = static[w_idxs]\n",
"\n",
" # think about interaction available * sample mask\n",
" # [B, C, Ws, L+H]\n",
" windows_batch = dict(temporal=windows,\n",
" temporal_cols=temporal_cols,\n",
" static=static,\n",
" static_cols=static_cols)\n",
" return windows_batch\n",
"\n",
" elif step in ['predict', 'val']:\n",
"\n",
" if step == 'predict':\n",
" initial_input = temporal.shape[-1] - self.test_size\n",
" if initial_input <= self.input_size: # There is not enough data to predict first timestamp\n",
" padder_left = nn.ConstantPad1d(padding=(self.input_size-initial_input, 0), value=0)\n",
" temporal = padder_left(temporal)\n",
" predict_step_size = self.predict_step_size\n",
" cutoff = - self.input_size - self.test_size\n",
" temporal = temporal[:, :, cutoff:]\n",
"\n",
" elif step == 'val':\n",
" predict_step_size = self.step_size\n",
" cutoff = -self.input_size - self.val_size - self.test_size\n",
" if self.test_size > 0:\n",
" temporal = batch['temporal'][:, :, cutoff:-self.test_size]\n",
" else:\n",
" temporal = batch['temporal'][:, :, cutoff:]\n",
" if temporal.shape[-1] < window_size:\n",
" initial_input = temporal.shape[-1] - self.val_size\n",
" padder_left = nn.ConstantPad1d(padding=(self.input_size-initial_input, 0), value=0)\n",
" temporal = padder_left(temporal)\n",
"\n",
" if (step=='predict') and (self.test_size==0) and (len(self.futr_exog_list)==0):\n",
" padder_right = nn.ConstantPad1d(padding=(0, self.h), value=0)\n",
" temporal = padder_right(temporal)\n",
"\n",
" windows = temporal.unfold(dimension=-1,\n",
" size=window_size,\n",
" step=predict_step_size)\n",
"\n",
" # [batch, channels, windows, window_size] 0, 1, 2, 3\n",
" # -> [batch * windows, window_size, channels] 0, 2, 3, 1\n",
" windows_per_serie = windows.shape[2]\n",
" windows = windows.permute(0, 2, 3, 1).contiguous()\n",
" windows = windows.reshape(-1, window_size, len(temporal_cols))\n",
"\n",
" static = batch.get('static', None)\n",
" static_cols=batch.get('static_cols', None)\n",
" if static is not None:\n",
" static = torch.repeat_interleave(static, \n",
" repeats=windows_per_serie, dim=0)\n",
" \n",
" # Sample windows for batched prediction\n",
" if w_idxs is not None:\n",
" windows = windows[w_idxs]\n",
" if static is not None:\n",
" static = static[w_idxs]\n",
" \n",
" windows_batch = dict(temporal=windows,\n",
" temporal_cols=temporal_cols,\n",
" static=static,\n",
" static_cols=static_cols)\n",
" return windows_batch\n",
" else:\n",
" raise ValueError(f'Unknown step {step}')\n",
"\n",
" def _normalization(self, windows, y_idx):\n",
" # windows are already filtered by train/validation/test\n",
" # from the `create_windows_method` nor leakage risk\n",
" temporal = windows['temporal'] # B, L+H, C\n",
" temporal_cols = windows['temporal_cols'].copy() # B, L+H, C\n",
"\n",
" # To avoid leakage uses only the lags\n",
" #temporal_data_cols = temporal_cols.drop('available_mask').tolist()\n",
" temporal_data_cols = self._get_temporal_exogenous_cols(temporal_cols=temporal_cols)\n",
" temporal_idxs = get_indexer_raise_missing(temporal_cols, temporal_data_cols)\n",
" temporal_idxs = np.append(y_idx, temporal_idxs)\n",
" temporal_data = temporal[:, :, temporal_idxs]\n",
" temporal_mask = temporal[:, :, temporal_cols.get_loc('available_mask')].clone()\n",
" if self.h > 0:\n",
" temporal_mask[:, -self.h:] = 0.0\n",
"\n",
" # Normalize. self.scaler stores the shift and scale for inverse transform\n",
" temporal_mask = temporal_mask.unsqueeze(-1) # Add channel dimension for scaler.transform.\n",
" temporal_data = self.scaler.transform(x=temporal_data, mask=temporal_mask)\n",
"\n",
" # Replace values in windows dict\n",
" temporal[:, :, temporal_idxs] = temporal_data\n",
" windows['temporal'] = temporal\n",
"\n",
" return windows\n",
"\n",
" def _inv_normalization(self, y_hat, temporal_cols, y_idx):\n",
" # Receives window predictions [B, H, output]\n",
" # Broadcasts outputs and inverts normalization\n",
"\n",
" # Add C dimension\n",
" if y_hat.ndim == 2:\n",
" remove_dimension = True\n",
" y_hat = y_hat.unsqueeze(-1)\n",
" else:\n",
" remove_dimension = False\n",
"\n",
" y_scale = self.scaler.x_scale[:, :, [y_idx]]\n",
" y_loc = self.scaler.x_shift[:, :, [y_idx]]\n",
"\n",
" y_scale = torch.repeat_interleave(y_scale, repeats=y_hat.shape[-1], dim=-1).to(y_hat.device)\n",
" y_loc = torch.repeat_interleave(y_loc, repeats=y_hat.shape[-1], dim=-1).to(y_hat.device)\n",
"\n",
" y_hat = self.scaler.inverse_transform(z=y_hat, x_scale=y_scale, x_shift=y_loc)\n",
" y_loc = y_loc.to(y_hat.device)\n",
" y_scale = y_scale.to(y_hat.device)\n",
" \n",
" if remove_dimension:\n",
" y_hat = y_hat.squeeze(-1)\n",
" y_loc = y_loc.squeeze(-1)\n",
" y_scale = y_scale.squeeze(-1)\n",
"\n",
" return y_hat, y_loc, y_scale\n",
"\n",
" def _parse_windows(self, batch, windows):\n",
" # Filter insample lags from outsample horizon\n",
" y_idx = batch['y_idx']\n",
" mask_idx = batch['temporal_cols'].get_loc('available_mask')\n",
"\n",
" insample_y = windows['temporal'][:, :self.input_size, y_idx]\n",
" insample_mask = windows['temporal'][:, :self.input_size, mask_idx]\n",
"\n",
" # Declare additional information\n",
" outsample_y = None\n",
" outsample_mask = None\n",
" hist_exog = None\n",
" futr_exog = None\n",
" stat_exog = None\n",
"\n",
" if self.h > 0:\n",
" outsample_y = windows['temporal'][:, self.input_size:, y_idx]\n",
" outsample_mask = windows['temporal'][:, self.input_size:, mask_idx]\n",
"\n",
" if len(self.hist_exog_list):\n",
" hist_exog_idx = get_indexer_raise_missing(windows['temporal_cols'], self.hist_exog_list)\n",
" hist_exog = windows['temporal'][:, :self.input_size, hist_exog_idx]\n",
"\n",
" if len(self.futr_exog_list):\n",
" futr_exog_idx = get_indexer_raise_missing(windows['temporal_cols'], self.futr_exog_list)\n",
" futr_exog = windows['temporal'][:, :, futr_exog_idx]\n",
"\n",
" if len(self.stat_exog_list):\n",
" static_idx = get_indexer_raise_missing(windows['static_cols'], self.stat_exog_list)\n",
" stat_exog = windows['static'][:, static_idx]\n",
"\n",
" # TODO: think a better way of removing insample_y features\n",
" if self.exclude_insample_y:\n",
" insample_y = insample_y * 0\n",
"\n",
" return insample_y, insample_mask, outsample_y, outsample_mask, \\\n",
" hist_exog, futr_exog, stat_exog\n",
"\n",
" def training_step(self, batch, batch_idx):\n",
" # Create and normalize windows [Ws, L+H, C]\n",
" windows = self._create_windows(batch, step='train')\n",
" y_idx = batch['y_idx']\n",
" original_outsample_y = torch.clone(windows['temporal'][:,-self.h:,y_idx])\n",
" windows = self._normalization(windows=windows, y_idx=y_idx)\n",
"\n",
" # Parse windows\n",
" insample_y, insample_mask, outsample_y, outsample_mask, \\\n",
" hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)\n",
"\n",
" windows_batch = dict(insample_y=insample_y, # [Ws, L]\n",
" insample_mask=insample_mask, # [Ws, L]\n",
" futr_exog=futr_exog, # [Ws, L+H]\n",
" hist_exog=hist_exog, # [Ws, L]\n",
" stat_exog=stat_exog) # [Ws, 1]\n",
"\n",
" # Model Predictions\n",
" output = self(windows_batch)\n",
" if self.loss.is_distribution_output:\n",
" _, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,\n",
" temporal_cols=batch['temporal_cols'],\n",
" y_idx=y_idx)\n",
" outsample_y = original_outsample_y\n",
" distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)\n",
" loss = self.loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)\n",
" else:\n",
" loss = self.loss(y=outsample_y, y_hat=output, mask=outsample_mask)\n",
"\n",
" if torch.isnan(loss):\n",
" print('Model Parameters', self.hparams)\n",
" print('insample_y', torch.isnan(insample_y).sum())\n",
" print('outsample_y', torch.isnan(outsample_y).sum())\n",
" print('output', torch.isnan(output).sum())\n",
" raise Exception('Loss is NaN, training stopped.')\n",
"\n",
" self.log(\n",
" 'train_loss',\n",
" loss.item(),\n",
" batch_size=outsample_y.size(0),\n",
" prog_bar=True,\n",
" on_epoch=True,\n",
" )\n",
" self.train_trajectories.append((self.global_step, loss.item()))\n",
" return loss\n",
"\n",
" def _compute_valid_loss(self, outsample_y, output, outsample_mask, temporal_cols, y_idx):\n",
" if self.loss.is_distribution_output:\n",
" _, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,\n",
" temporal_cols=temporal_cols,\n",
" y_idx=y_idx)\n",
" distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)\n",
" _, sample_mean, quants = self.loss.sample(distr_args=distr_args)\n",
"\n",
" if str(type(self.valid_loss)) in\\\n",
" [\"\", \"\"]:\n",
" output = quants\n",
" elif str(type(self.valid_loss)) in [\"\"]:\n",
" output = torch.unsqueeze(sample_mean, dim=-1) # [N,H,1] -> [N,H]\n",
"\n",
" # Validation Loss evaluation\n",
" if self.valid_loss.is_distribution_output:\n",
" valid_loss = self.valid_loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)\n",
" else:\n",
" output, _, _ = self._inv_normalization(y_hat=output,\n",
" temporal_cols=temporal_cols,\n",
" y_idx=y_idx)\n",
" valid_loss = self.valid_loss(y=outsample_y, y_hat=output, mask=outsample_mask)\n",
" return valid_loss\n",
" \n",
" def validation_step(self, batch, batch_idx):\n",
" if self.val_size == 0:\n",
" return np.nan\n",
"\n",
" # TODO: Hack to compute number of windows\n",
" windows = self._create_windows(batch, step='val')\n",
" n_windows = len(windows['temporal'])\n",
" y_idx = batch['y_idx']\n",
"\n",
" # Number of windows in batch\n",
" windows_batch_size = self.inference_windows_batch_size\n",
" if windows_batch_size < 0:\n",
" windows_batch_size = n_windows\n",
" n_batches = int(np.ceil(n_windows/windows_batch_size))\n",
"\n",
" valid_losses = []\n",
" batch_sizes = []\n",
" for i in range(n_batches):\n",
" # Create and normalize windows [Ws, L+H, C]\n",
" w_idxs = np.arange(i*windows_batch_size, \n",
" min((i+1)*windows_batch_size, n_windows))\n",
" windows = self._create_windows(batch, step='val', w_idxs=w_idxs)\n",
" original_outsample_y = torch.clone(windows['temporal'][:,-self.h:,y_idx])\n",
" windows = self._normalization(windows=windows, y_idx=y_idx)\n",
"\n",
" # Parse windows\n",
" insample_y, insample_mask, _, outsample_mask, \\\n",
" hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)\n",
" windows_batch = dict(insample_y=insample_y, # [Ws, L]\n",
" insample_mask=insample_mask, # [Ws, L]\n",
" futr_exog=futr_exog, # [Ws, L+H]\n",
" hist_exog=hist_exog, # [Ws, L]\n",
" stat_exog=stat_exog) # [Ws, 1]\n",
" \n",
" # Model Predictions\n",
" output_batch = self(windows_batch)\n",
" valid_loss_batch = self._compute_valid_loss(outsample_y=original_outsample_y,\n",
" output=output_batch, outsample_mask=outsample_mask,\n",
" temporal_cols=batch['temporal_cols'],\n",
" y_idx=batch['y_idx'])\n",
" valid_losses.append(valid_loss_batch)\n",
" batch_sizes.append(len(output_batch))\n",
" \n",
" valid_loss = torch.stack(valid_losses)\n",
" batch_sizes = torch.tensor(batch_sizes, device=valid_loss.device)\n",
" batch_size = torch.sum(batch_sizes)\n",
" valid_loss = torch.sum(valid_loss * batch_sizes) / batch_size\n",
"\n",
" if torch.isnan(valid_loss):\n",
" raise Exception('Loss is NaN, training stopped.')\n",
"\n",
" self.log(\n",
" 'valid_loss',\n",
" valid_loss.item(),\n",
" batch_size=batch_size,\n",
" prog_bar=True,\n",
" on_epoch=True,\n",
" )\n",
" self.validation_step_outputs.append(valid_loss)\n",
" return valid_loss\n",
"\n",
" def predict_step(self, batch, batch_idx):\n",
"\n",
" # TODO: Hack to compute number of windows\n",
" windows = self._create_windows(batch, step='predict')\n",
" n_windows = len(windows['temporal'])\n",
" y_idx = batch['y_idx']\n",
"\n",
" # Number of windows in batch\n",
" windows_batch_size = self.inference_windows_batch_size\n",
" if windows_batch_size < 0:\n",
" windows_batch_size = n_windows\n",
" n_batches = int(np.ceil(n_windows/windows_batch_size))\n",
"\n",
" y_hats = []\n",
" for i in range(n_batches):\n",
" # Create and normalize windows [Ws, L+H, C]\n",
" w_idxs = np.arange(i*windows_batch_size, \n",
" min((i+1)*windows_batch_size, n_windows))\n",
" windows = self._create_windows(batch, step='predict', w_idxs=w_idxs)\n",
" windows = self._normalization(windows=windows, y_idx=y_idx)\n",
"\n",
" # Parse windows\n",
" insample_y, insample_mask, _, _, \\\n",
" hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)\n",
" windows_batch = dict(insample_y=insample_y, # [Ws, L]\n",
" insample_mask=insample_mask, # [Ws, L]\n",
" futr_exog=futr_exog, # [Ws, L+H]\n",
" hist_exog=hist_exog, # [Ws, L]\n",
" stat_exog=stat_exog) # [Ws, 1]\n",
" \n",
" # Model Predictions\n",
" output_batch = self(windows_batch)\n",
" # Inverse normalization and sampling\n",
" if self.loss.is_distribution_output:\n",
" _, y_loc, y_scale = self._inv_normalization(y_hat=output_batch[0],\n",
" temporal_cols=batch['temporal_cols'],\n",
" y_idx=y_idx)\n",
" distr_args = self.loss.scale_decouple(output=output_batch, loc=y_loc, scale=y_scale)\n",
" _, sample_mean, quants = self.loss.sample(distr_args=distr_args)\n",
" y_hat = torch.concat((sample_mean, quants), axis=2)\n",
"\n",
" if self.loss.return_params:\n",
" distr_args = torch.stack(distr_args, dim=-1)\n",
" distr_args = torch.reshape(distr_args, (len(windows[\"temporal\"]), self.h, -1))\n",
" y_hat = torch.concat((y_hat, distr_args), axis=2)\n",
" else:\n",
" y_hat, _, _ = self._inv_normalization(y_hat=output_batch,\n",
" temporal_cols=batch['temporal_cols'],\n",
" y_idx=y_idx)\n",
" y_hats.append(y_hat)\n",
" y_hat = torch.cat(y_hats, dim=0)\n",
" return y_hat\n",
" \n",
" def fit(self, dataset, val_size=0, test_size=0, random_seed=None, distributed_config=None):\n",
" \"\"\" Fit.\n",
"\n",
" The `fit` method, optimizes the neural network's weights using the\n",
" initialization parameters (`learning_rate`, `windows_batch_size`, ...)\n",
" and the `loss` function as defined during the initialization. \n",
" Within `fit` we use a PyTorch Lightning `Trainer` that\n",
" inherits the initialization's `self.trainer_kwargs`, to customize\n",
" its inputs, see [PL's trainer arguments](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html?highlight=trainer).\n",
"\n",
" The method is designed to be compatible with SKLearn-like classes\n",
" and in particular to be compatible with the StatsForecast library.\n",
"\n",
" By default the `model` is not saving training checkpoints to protect \n",
" disk memory, to get them change `enable_checkpointing=True` in `__init__`.\n",
"\n",
" **Parameters:**
\n",
" `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).
\n",
" `val_size`: int, validation size for temporal cross-validation.
\n",
" `random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.
\n",
" `test_size`: int, test size for temporal cross-validation.
\n",
" \"\"\"\n",
" return self._fit(\n",
" dataset=dataset,\n",
" batch_size=self.batch_size,\n",
" valid_batch_size=self.valid_batch_size,\n",
" val_size=val_size,\n",
" test_size=test_size,\n",
" random_seed=random_seed,\n",
" distributed_config=distributed_config,\n",
" )\n",
"\n",
" def predict(self, dataset, test_size=None, step_size=1,\n",
" random_seed=None, **data_module_kwargs):\n",
" \"\"\" Predict.\n",
"\n",
" Neural network prediction with PL's `Trainer` execution of `predict_step`.\n",
"\n",
" **Parameters:**
\n",
" `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).
\n",
" `test_size`: int=None, test size for temporal cross-validation.
\n",
" `step_size`: int=1, Step size between each window.
\n",
" `random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.
\n",
" `**data_module_kwargs`: PL's TimeSeriesDataModule args, see [documentation](https://pytorch-lightning.readthedocs.io/en/1.6.1/extensions/datamodules.html#using-a-datamodule).\n",
" \"\"\"\n",
" self._check_exog(dataset)\n",
" self._restart_seed(random_seed)\n",
"\n",
" self.predict_step_size = step_size\n",
" self.decompose_forecast = False\n",
" datamodule = TimeSeriesDataModule(dataset=dataset,\n",
" valid_batch_size=self.valid_batch_size,\n",
" **data_module_kwargs)\n",
"\n",
" # Protect when case of multiple gpu. PL does not support return preds with multiple gpu.\n",
" pred_trainer_kwargs = self.trainer_kwargs.copy()\n",
" if (pred_trainer_kwargs.get('accelerator', None) == \"gpu\") and (torch.cuda.device_count() > 1):\n",
" pred_trainer_kwargs['devices'] = [0]\n",
"\n",
" trainer = pl.Trainer(**pred_trainer_kwargs)\n",
" fcsts = trainer.predict(self, datamodule=datamodule) \n",
" fcsts = torch.vstack(fcsts).numpy().flatten()\n",
" fcsts = fcsts.reshape(-1, len(self.loss.output_names))\n",
" return fcsts\n",
"\n",
" def decompose(self, dataset, step_size=1, random_seed=None, **data_module_kwargs):\n",
" \"\"\" Decompose Predictions.\n",
"\n",
" Decompose the predictions through the network's layers.\n",
" Available methods are `ESRNN`, `NHITS`, `NBEATS`, and `NBEATSx`.\n",
"\n",
" **Parameters:**
\n",
" `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation here](https://nixtla.github.io/neuralforecast/tsdataset.html).
\n",
" `step_size`: int=1, step size between each window of temporal data.
\n",
" `**data_module_kwargs`: PL's TimeSeriesDataModule args, see [documentation](https://pytorch-lightning.readthedocs.io/en/1.6.1/extensions/datamodules.html#using-a-datamodule).\n",
" \"\"\"\n",
" # Restart random seed\n",
" if random_seed is None:\n",
" random_seed = self.random_seed\n",
" torch.manual_seed(random_seed)\n",
"\n",
" self.predict_step_size = step_size\n",
" self.decompose_forecast = True\n",
" datamodule = TimeSeriesDataModule(dataset=dataset,\n",
" valid_batch_size=self.valid_batch_size,\n",
" **data_module_kwargs)\n",
" trainer = pl.Trainer(**self.trainer_kwargs)\n",
" fcsts = trainer.predict(self, datamodule=datamodule)\n",
" self.decompose_forecast = False # Default decomposition back to false\n",
" return torch.vstack(fcsts).numpy()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1712ea15",
"metadata": {},
"outputs": [],
"source": [
"show_doc(BaseWindows, title_level=3)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "48063f70",
"metadata": {},
"outputs": [],
"source": [
"show_doc(BaseWindows.fit, title_level=3)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "75529be6",
"metadata": {},
"outputs": [],
"source": [
"show_doc(BaseWindows.predict, title_level=3)"
]
},
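{
"cell_type": "markdown",
"id": "f6a0c9d4",
"metadata": {},
"source": [
"The `fit` and `predict` methods can also be used directly at the dataset level, which is what `core.NeuralForecast` does internally. A small sketch (assuming the concrete `MLP` model and the `AirPassengersDF` utility; not part of the exported module):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a7b1d0e5",
"metadata": {},
"outputs": [],
"source": [
"from neuralforecast.models import MLP\n",
"from neuralforecast.utils import AirPassengersDF\n",
"from neuralforecast.tsdataset import TimeSeriesDataset\n",
"\n",
"# Build the low-level dataset, then fit and predict a concrete window-based model.\n",
"dataset, *_ = TimeSeriesDataset.from_df(AirPassengersDF)\n",
"model = MLP(h=12, input_size=24, max_steps=5)\n",
"model.fit(dataset=dataset)\n",
"preds = model.predict(dataset=dataset)\n",
"preds.shape # (n_series * h, len(loss.output_names))"
]
},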
{
"cell_type": "code",
"execution_count": null,
"id": "a1f8315d",
"metadata": {},
"outputs": [],
"source": [
"show_doc(BaseWindows.decompose, title_level=3)"
]
},
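{
"cell_type": "markdown",
"id": "b8c2e1f6",
"metadata": {},
"source": [
"A quick sketch of `decompose` (assuming the `NBEATS` model and the `dataset` built in the previous example); the returned array stacks the forecast components produced by the network's blocks:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c9d3f2a7",
"metadata": {},
"outputs": [],
"source": [
"from neuralforecast.models import NBEATS\n",
"\n",
"# Fit briefly, then recover the per-block forecast components.\n",
"nbeats = NBEATS(h=12, input_size=24, max_steps=5)\n",
"nbeats.fit(dataset=dataset)\n",
"components = nbeats.decompose(dataset=dataset)\n",
"components.shape"
]
},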
{
"cell_type": "code",
"execution_count": null,
"id": "8927f2e5-f376-4c99-bb8f-8cbb73efe01e",
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"from neuralforecast.losses.pytorch import MAE\n",
"from neuralforecast.utils import AirPassengersDF\n",
"from neuralforecast.tsdataset import TimeSeriesDataset, TimeSeriesDataModule"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "61490e69-f014-4087-83c5-540d5bd7d458",
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"# add h=0,1 unit test for _parse_windows \n",
"# Declare batch\n",
"AirPassengersDF['x'] = np.array(len(AirPassengersDF))\n",
"AirPassengersDF['x2'] = np.array(len(AirPassengersDF)) * 2\n",
"dataset, indices, dates, ds = TimeSeriesDataset.from_df(df=AirPassengersDF)\n",
"data = TimeSeriesDataModule(dataset=dataset, batch_size=1, drop_last=True)\n",
"\n",
"train_loader = data.train_dataloader()\n",
"batch = next(iter(train_loader))\n",
"\n",
"# Instantiate BaseWindows to test _parse_windows method h in [0,1]\n",
"for h in [0, 1]:\n",
" basewindows = BaseWindows(h=h,\n",
" input_size=len(AirPassengersDF)-h,\n",
" hist_exog_list=['x'],\n",
" loss=MAE(),\n",
" valid_loss=MAE(),\n",
" learning_rate=0.001,\n",
" max_steps=1,\n",
" val_check_steps=0,\n",
" batch_size=1,\n",
" valid_batch_size=1,\n",
" windows_batch_size=1,\n",
" inference_windows_batch_size=1,\n",
" start_padding_enabled=False)\n",
"\n",
" windows = basewindows._create_windows(batch, step='train')\n",
" original_outsample_y = torch.clone(windows['temporal'][:,-basewindows.h:,0])\n",
" windows = basewindows._normalization(windows=windows, y_idx=0)\n",
"\n",
" insample_y, insample_mask, outsample_y, outsample_mask, \\\n",
" hist_exog, futr_exog, stat_exog = basewindows._parse_windows(batch, windows)\n",
"\n",
" # Check equality of parsed and original insample_y\n",
" parsed_insample_y = insample_y.numpy().flatten()\n",
" original_insample_y = AirPassengersDF.y.values\n",
" test_eq(parsed_insample_y, original_insample_y[:basewindows.input_size])\n",
"\n",
" # Check equality of parsed and original hist_exog\n",
" parsed_hist_exog = hist_exog.numpy().flatten()\n",
" original_hist_exog = AirPassengersDF.x.values\n",
" test_eq(parsed_hist_exog, original_hist_exog[:basewindows.input_size])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "86ab58a9",
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"# Test that start_padding_enabled=True solves the problem of short series\n",
"h = 12\n",
"basewindows = BaseWindows(h=h,\n",
" input_size=500,\n",
" hist_exog_list=['x'],\n",
" loss=MAE(),\n",
" valid_loss=MAE(),\n",
" learning_rate=0.001,\n",
" max_steps=1,\n",
" val_check_steps=0,\n",
" batch_size=1,\n",
" valid_batch_size=1,\n",
" windows_batch_size=10,\n",
" inference_windows_batch_size=2,\n",
" start_padding_enabled=True)\n",
"\n",
"windows = basewindows._create_windows(batch, step='train')\n",
"windows = basewindows._normalization(windows=windows, y_idx=0)\n",
"insample_y, insample_mask, outsample_y, outsample_mask, \\\n",
" hist_exog, futr_exog, stat_exog = basewindows._parse_windows(batch, windows)\n",
"\n",
"basewindows.val_size = 12\n",
"windows = basewindows._create_windows(batch, step='val')\n",
"windows = basewindows._normalization(windows=windows, y_idx=0)\n",
"insample_y, insample_mask, outsample_y, outsample_mask, \\\n",
" hist_exog, futr_exog, stat_exog = basewindows._parse_windows(batch, windows)\n",
"\n",
"basewindows.test_size = 12\n",
"basewindows.predict_step_size = 1\n",
"windows = basewindows._create_windows(batch, step='predict')\n",
"windows = basewindows._normalization(windows=windows, y_idx=0)\n",
"insample_y, insample_mask, outsample_y, outsample_mask, \\\n",
" hist_exog, futr_exog, stat_exog = basewindows._parse_windows(batch, windows)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "54d2e850",
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"\n",
"# Test that hist_exog_list and futr_exog_list correctly filter data.\n",
"# that is sent to scaler.\n",
"basewindows = BaseWindows(h=12,\n",
" input_size=500,\n",
" hist_exog_list=['x', 'x2'],\n",
" futr_exog_list=['x'],\n",
" loss=MAE(),\n",
" valid_loss=MAE(),\n",
" learning_rate=0.001,\n",
" max_steps=1,\n",
" val_check_steps=0,\n",
" batch_size=1,\n",
" valid_batch_size=1,\n",
" windows_batch_size=10,\n",
" inference_windows_batch_size=2,\n",
" start_padding_enabled=True)\n",
"\n",
"windows = basewindows._create_windows(batch, step='train')\n",
"\n",
"temporal_cols = windows['temporal_cols'].copy() # B, L+H, C\n",
"temporal_data_cols = basewindows._get_temporal_exogenous_cols(temporal_cols=temporal_cols)\n",
"\n",
"test_eq(set(temporal_data_cols), set(['x', 'x2']))\n",
"test_eq(windows['temporal'].shape, torch.Size([10,500+12,len(['y', 'x', 'x2', 'available_mask'])]))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bf493ff9",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "python3",
"language": "python",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}