{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "524620c1", "metadata": {}, "outputs": [], "source": [ "#| default_exp common._base_windows" ] }, { "cell_type": "code", "execution_count": null, "id": "15392f6f", "metadata": {}, "outputs": [], "source": [ "#| hide\n", "%load_ext autoreload\n", "%autoreload 2" ] }, { "cell_type": "markdown", "id": "1e0f9607-d12d-44e5-b2be-91a57a0bca79", "metadata": {}, "source": [ "# BaseWindows\n", "\n", "> The `BaseWindows` class contains standard methods shared across window-based neural networks; in contrast to recurrent neural networks these models commit to a fixed sequence length input. The class is represented by `MLP`, and other more sophisticated architectures like `NBEATS`, and `NHITS`." ] }, { "cell_type": "markdown", "id": "1730a556-1574-40ad-92a2-23b924ceb398", "metadata": {}, "source": [ "The standard methods include data preprocessing `_normalization`, optimization utilities like parameter initialization, `training_step`, `validation_step`, and shared `fit` and `predict` methods.These shared methods enable all the `neuralforecast.models` compatibility with the `core.NeuralForecast` wrapper class. " ] }, { "cell_type": "code", "execution_count": null, "id": "2508f7a9-1433-4ad8-8f2f-0078c6ed6c3c", "metadata": {}, "outputs": [], "source": [ "#| hide\n", "from fastcore.test import test_eq\n", "from nbdev.showdoc import show_doc" ] }, { "cell_type": "code", "execution_count": null, "id": "44065066-e72a-431f-938f-1528adef9fe8", "metadata": {}, "outputs": [], "source": [ "#| export\n", "import numpy as np\n", "import torch\n", "import torch.nn as nn\n", "import pytorch_lightning as pl\n", "\n", "from neuralforecast.common._base_model import BaseModel\n", "from neuralforecast.common._scalers import TemporalNorm\n", "from neuralforecast.tsdataset import TimeSeriesDataModule\n", "from neuralforecast.utils import get_indexer_raise_missing" ] }, { "cell_type": "code", "execution_count": null, "id": "ce70cd14-ecb1-4205-8511-fecbd26c8408", "metadata": {}, "outputs": [], "source": [ "#| export\n", "class BaseWindows(BaseModel):\n", " \"\"\" Base Windows\n", " \n", " Base class for all windows-based models. The forecasts are produced separately \n", " for each window, which are randomly sampled during training.\n", " \n", " This class implements the basic functionality for all windows-based models, including:\n", " - PyTorch Lightning's methods training_step, validation_step, predict_step.
\n", " - fit and predict methods used by NeuralForecast.core class.
\n", " - sampling and wrangling methods to generate windows.\n", " \"\"\"\n", " def __init__(self,\n", " h,\n", " input_size,\n", " loss,\n", " valid_loss,\n", " learning_rate,\n", " max_steps,\n", " val_check_steps,\n", " batch_size,\n", " valid_batch_size,\n", " windows_batch_size,\n", " inference_windows_batch_size,\n", " start_padding_enabled,\n", " step_size=1,\n", " num_lr_decays=0,\n", " early_stop_patience_steps=-1,\n", " scaler_type='identity',\n", " futr_exog_list=None,\n", " hist_exog_list=None,\n", " stat_exog_list=None,\n", " exclude_insample_y=False,\n", " num_workers_loader=0,\n", " drop_last_loader=False,\n", " random_seed=1,\n", " alias=None,\n", " optimizer=None,\n", " optimizer_kwargs=None,\n", " **trainer_kwargs):\n", " super().__init__(\n", " random_seed=random_seed,\n", " loss=loss,\n", " valid_loss=valid_loss,\n", " optimizer=optimizer,\n", " optimizer_kwargs=optimizer_kwargs,\n", " futr_exog_list=futr_exog_list,\n", " hist_exog_list=hist_exog_list,\n", " stat_exog_list=stat_exog_list,\n", " max_steps=max_steps,\n", " early_stop_patience_steps=early_stop_patience_steps, \n", " **trainer_kwargs,\n", " )\n", "\n", " # Padder to complete train windows, \n", " # example y=[1,2,3,4,5] h=3 -> last y_output = [5,0,0]\n", " self.h = h\n", " self.input_size = input_size\n", " self.windows_batch_size = windows_batch_size\n", " self.start_padding_enabled = start_padding_enabled\n", " if start_padding_enabled:\n", " self.padder_train = nn.ConstantPad1d(padding=(self.input_size-1, self.h), value=0)\n", " else:\n", " self.padder_train = nn.ConstantPad1d(padding=(0, self.h), value=0)\n", "\n", " # Batch sizes\n", " self.batch_size = batch_size\n", " if valid_batch_size is None:\n", " self.valid_batch_size = batch_size\n", " else:\n", " self.valid_batch_size = valid_batch_size\n", " if inference_windows_batch_size is None:\n", " self.inference_windows_batch_size = windows_batch_size\n", " else:\n", " self.inference_windows_batch_size = inference_windows_batch_size\n", "\n", " # Optimization \n", " self.learning_rate = learning_rate\n", " self.max_steps = max_steps\n", " self.num_lr_decays = num_lr_decays\n", " self.lr_decay_steps = (\n", " max(max_steps // self.num_lr_decays, 1) if self.num_lr_decays > 0 else 10e7\n", " )\n", " self.early_stop_patience_steps = early_stop_patience_steps\n", " self.val_check_steps = val_check_steps\n", " self.windows_batch_size = windows_batch_size\n", " self.step_size = step_size\n", " \n", " self.exclude_insample_y = exclude_insample_y\n", "\n", " # Scaler\n", " self.scaler = TemporalNorm(\n", " scaler_type=scaler_type,\n", " dim=1, # Time dimension is 1.\n", " num_features=1+len(self.hist_exog_list)+len(self.futr_exog_list)\n", " )\n", "\n", " # Fit arguments\n", " self.val_size = 0\n", " self.test_size = 0\n", "\n", " # Model state\n", " self.decompose_forecast = False\n", "\n", " # DataModule arguments\n", " self.num_workers_loader = num_workers_loader\n", " self.drop_last_loader = drop_last_loader\n", " # used by on_validation_epoch_end hook\n", " self.validation_step_outputs = []\n", " self.alias = alias\n", "\n", " def _create_windows(self, batch, step, w_idxs=None):\n", " # Parse common data\n", " window_size = self.input_size + self.h\n", " temporal_cols = batch['temporal_cols']\n", " temporal = batch['temporal']\n", "\n", " if step == 'train':\n", " if self.val_size + self.test_size > 0:\n", " cutoff = -self.val_size - self.test_size\n", " temporal = temporal[:, :, :cutoff]\n", "\n", " temporal = self.padder_train(temporal)\n", " if 
temporal.shape[-1] < window_size:\n", " raise Exception('Time series is too short for training, consider setting a smaller input size or set start_padding_enabled=True')\n", " windows = temporal.unfold(dimension=-1, \n", " size=window_size, \n", " step=self.step_size)\n", "\n", " # [B, C, Ws, L+H] 0, 1, 2, 3\n", " # -> [B * Ws, L+H, C] 0, 2, 3, 1\n", " windows_per_serie = windows.shape[2]\n", " windows = windows.permute(0, 2, 3, 1).contiguous()\n", " windows = windows.reshape(-1, window_size, len(temporal_cols))\n", "\n", " # Sample and Available conditions\n", " available_idx = temporal_cols.get_loc('available_mask')\n", " available_condition = windows[:, :self.input_size, available_idx]\n", " available_condition = torch.sum(available_condition, axis=1)\n", " final_condition = (available_condition > 0)\n", " if self.h > 0:\n", " sample_condition = windows[:, self.input_size:, available_idx]\n", " sample_condition = torch.sum(sample_condition, axis=1)\n", " final_condition = (sample_condition > 0) & (available_condition > 0)\n", " windows = windows[final_condition]\n", "\n", " # Parse Static data to match windows\n", " # [B, S_in] -> [B, Ws, S_in] -> [B*Ws, S_in]\n", " static = batch.get('static', None)\n", " static_cols=batch.get('static_cols', None)\n", " if static is not None:\n", " static = torch.repeat_interleave(static, \n", " repeats=windows_per_serie, dim=0)\n", " static = static[final_condition]\n", "\n", " # Protection of empty windows\n", " if final_condition.sum() == 0:\n", " raise Exception('No windows available for training')\n", "\n", " # Sample windows\n", " n_windows = len(windows)\n", " if self.windows_batch_size is not None:\n", " w_idxs = np.random.choice(n_windows, \n", " size=self.windows_batch_size,\n", " replace=(n_windows < self.windows_batch_size))\n", " windows = windows[w_idxs]\n", " \n", " if static is not None:\n", " static = static[w_idxs]\n", "\n", " # think about interaction available * sample mask\n", " # [B, C, Ws, L+H]\n", " windows_batch = dict(temporal=windows,\n", " temporal_cols=temporal_cols,\n", " static=static,\n", " static_cols=static_cols)\n", " return windows_batch\n", "\n", " elif step in ['predict', 'val']:\n", "\n", " if step == 'predict':\n", " initial_input = temporal.shape[-1] - self.test_size\n", " if initial_input <= self.input_size: # There is not enough data to predict first timestamp\n", " padder_left = nn.ConstantPad1d(padding=(self.input_size-initial_input, 0), value=0)\n", " temporal = padder_left(temporal)\n", " predict_step_size = self.predict_step_size\n", " cutoff = - self.input_size - self.test_size\n", " temporal = temporal[:, :, cutoff:]\n", "\n", " elif step == 'val':\n", " predict_step_size = self.step_size\n", " cutoff = -self.input_size - self.val_size - self.test_size\n", " if self.test_size > 0:\n", " temporal = batch['temporal'][:, :, cutoff:-self.test_size]\n", " else:\n", " temporal = batch['temporal'][:, :, cutoff:]\n", " if temporal.shape[-1] < window_size:\n", " initial_input = temporal.shape[-1] - self.val_size\n", " padder_left = nn.ConstantPad1d(padding=(self.input_size-initial_input, 0), value=0)\n", " temporal = padder_left(temporal)\n", "\n", " if (step=='predict') and (self.test_size==0) and (len(self.futr_exog_list)==0):\n", " padder_right = nn.ConstantPad1d(padding=(0, self.h), value=0)\n", " temporal = padder_right(temporal)\n", "\n", " windows = temporal.unfold(dimension=-1,\n", " size=window_size,\n", " step=predict_step_size)\n", "\n", " # [batch, channels, windows, window_size] 0, 1, 2, 3\n", " # 
-> [batch * windows, window_size, channels] 0, 2, 3, 1\n", " windows_per_serie = windows.shape[2]\n", " windows = windows.permute(0, 2, 3, 1).contiguous()\n", " windows = windows.reshape(-1, window_size, len(temporal_cols))\n", "\n", " static = batch.get('static', None)\n", " static_cols=batch.get('static_cols', None)\n", " if static is not None:\n", " static = torch.repeat_interleave(static, \n", " repeats=windows_per_serie, dim=0)\n", " \n", " # Sample windows for batched prediction\n", " if w_idxs is not None:\n", " windows = windows[w_idxs]\n", " if static is not None:\n", " static = static[w_idxs]\n", " \n", " windows_batch = dict(temporal=windows,\n", " temporal_cols=temporal_cols,\n", " static=static,\n", " static_cols=static_cols)\n", " return windows_batch\n", " else:\n", " raise ValueError(f'Unknown step {step}')\n", "\n", " def _normalization(self, windows, y_idx):\n", " # windows are already filtered by train/validation/test\n", " # from the `create_windows_method` nor leakage risk\n", " temporal = windows['temporal'] # B, L+H, C\n", " temporal_cols = windows['temporal_cols'].copy() # B, L+H, C\n", "\n", " # To avoid leakage uses only the lags\n", " #temporal_data_cols = temporal_cols.drop('available_mask').tolist()\n", " temporal_data_cols = self._get_temporal_exogenous_cols(temporal_cols=temporal_cols)\n", " temporal_idxs = get_indexer_raise_missing(temporal_cols, temporal_data_cols)\n", " temporal_idxs = np.append(y_idx, temporal_idxs)\n", " temporal_data = temporal[:, :, temporal_idxs]\n", " temporal_mask = temporal[:, :, temporal_cols.get_loc('available_mask')].clone()\n", " if self.h > 0:\n", " temporal_mask[:, -self.h:] = 0.0\n", "\n", " # Normalize. self.scaler stores the shift and scale for inverse transform\n", " temporal_mask = temporal_mask.unsqueeze(-1) # Add channel dimension for scaler.transform.\n", " temporal_data = self.scaler.transform(x=temporal_data, mask=temporal_mask)\n", "\n", " # Replace values in windows dict\n", " temporal[:, :, temporal_idxs] = temporal_data\n", " windows['temporal'] = temporal\n", "\n", " return windows\n", "\n", " def _inv_normalization(self, y_hat, temporal_cols, y_idx):\n", " # Receives window predictions [B, H, output]\n", " # Broadcasts outputs and inverts normalization\n", "\n", " # Add C dimension\n", " if y_hat.ndim == 2:\n", " remove_dimension = True\n", " y_hat = y_hat.unsqueeze(-1)\n", " else:\n", " remove_dimension = False\n", "\n", " y_scale = self.scaler.x_scale[:, :, [y_idx]]\n", " y_loc = self.scaler.x_shift[:, :, [y_idx]]\n", "\n", " y_scale = torch.repeat_interleave(y_scale, repeats=y_hat.shape[-1], dim=-1).to(y_hat.device)\n", " y_loc = torch.repeat_interleave(y_loc, repeats=y_hat.shape[-1], dim=-1).to(y_hat.device)\n", "\n", " y_hat = self.scaler.inverse_transform(z=y_hat, x_scale=y_scale, x_shift=y_loc)\n", " y_loc = y_loc.to(y_hat.device)\n", " y_scale = y_scale.to(y_hat.device)\n", " \n", " if remove_dimension:\n", " y_hat = y_hat.squeeze(-1)\n", " y_loc = y_loc.squeeze(-1)\n", " y_scale = y_scale.squeeze(-1)\n", "\n", " return y_hat, y_loc, y_scale\n", "\n", " def _parse_windows(self, batch, windows):\n", " # Filter insample lags from outsample horizon\n", " y_idx = batch['y_idx']\n", " mask_idx = batch['temporal_cols'].get_loc('available_mask')\n", "\n", " insample_y = windows['temporal'][:, :self.input_size, y_idx]\n", " insample_mask = windows['temporal'][:, :self.input_size, mask_idx]\n", "\n", " # Declare additional information\n", " outsample_y = None\n", " outsample_mask = None\n", " hist_exog = 
None\n", " futr_exog = None\n", " stat_exog = None\n", "\n", " if self.h > 0:\n", " outsample_y = windows['temporal'][:, self.input_size:, y_idx]\n", " outsample_mask = windows['temporal'][:, self.input_size:, mask_idx]\n", "\n", " if len(self.hist_exog_list):\n", " hist_exog_idx = get_indexer_raise_missing(windows['temporal_cols'], self.hist_exog_list)\n", " hist_exog = windows['temporal'][:, :self.input_size, hist_exog_idx]\n", "\n", " if len(self.futr_exog_list):\n", " futr_exog_idx = get_indexer_raise_missing(windows['temporal_cols'], self.futr_exog_list)\n", " futr_exog = windows['temporal'][:, :, futr_exog_idx]\n", "\n", " if len(self.stat_exog_list):\n", " static_idx = get_indexer_raise_missing(windows['static_cols'], self.stat_exog_list)\n", " stat_exog = windows['static'][:, static_idx]\n", "\n", " # TODO: think a better way of removing insample_y features\n", " if self.exclude_insample_y:\n", " insample_y = insample_y * 0\n", "\n", " return insample_y, insample_mask, outsample_y, outsample_mask, \\\n", " hist_exog, futr_exog, stat_exog\n", "\n", " def training_step(self, batch, batch_idx):\n", " # Create and normalize windows [Ws, L+H, C]\n", " windows = self._create_windows(batch, step='train')\n", " y_idx = batch['y_idx']\n", " original_outsample_y = torch.clone(windows['temporal'][:,-self.h:,y_idx])\n", " windows = self._normalization(windows=windows, y_idx=y_idx)\n", "\n", " # Parse windows\n", " insample_y, insample_mask, outsample_y, outsample_mask, \\\n", " hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)\n", "\n", " windows_batch = dict(insample_y=insample_y, # [Ws, L]\n", " insample_mask=insample_mask, # [Ws, L]\n", " futr_exog=futr_exog, # [Ws, L+H]\n", " hist_exog=hist_exog, # [Ws, L]\n", " stat_exog=stat_exog) # [Ws, 1]\n", "\n", " # Model Predictions\n", " output = self(windows_batch)\n", " if self.loss.is_distribution_output:\n", " _, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,\n", " temporal_cols=batch['temporal_cols'],\n", " y_idx=y_idx)\n", " outsample_y = original_outsample_y\n", " distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)\n", " loss = self.loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)\n", " else:\n", " loss = self.loss(y=outsample_y, y_hat=output, mask=outsample_mask)\n", "\n", " if torch.isnan(loss):\n", " print('Model Parameters', self.hparams)\n", " print('insample_y', torch.isnan(insample_y).sum())\n", " print('outsample_y', torch.isnan(outsample_y).sum())\n", " print('output', torch.isnan(output).sum())\n", " raise Exception('Loss is NaN, training stopped.')\n", "\n", " self.log(\n", " 'train_loss',\n", " loss.item(),\n", " batch_size=outsample_y.size(0),\n", " prog_bar=True,\n", " on_epoch=True,\n", " )\n", " self.train_trajectories.append((self.global_step, loss.item()))\n", " return loss\n", "\n", " def _compute_valid_loss(self, outsample_y, output, outsample_mask, temporal_cols, y_idx):\n", " if self.loss.is_distribution_output:\n", " _, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,\n", " temporal_cols=temporal_cols,\n", " y_idx=y_idx)\n", " distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)\n", " _, sample_mean, quants = self.loss.sample(distr_args=distr_args)\n", "\n", " if str(type(self.valid_loss)) in\\\n", " [\"\", \"\"]:\n", " output = quants\n", " elif str(type(self.valid_loss)) in [\"\"]:\n", " output = torch.unsqueeze(sample_mean, dim=-1) # [N,H,1] -> [N,H]\n", "\n", " # Validation Loss evaluation\n", " if 
self.valid_loss.is_distribution_output:\n", " valid_loss = self.valid_loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)\n", " else:\n", " output, _, _ = self._inv_normalization(y_hat=output,\n", " temporal_cols=temporal_cols,\n", " y_idx=y_idx)\n", " valid_loss = self.valid_loss(y=outsample_y, y_hat=output, mask=outsample_mask)\n", " return valid_loss\n", " \n", " def validation_step(self, batch, batch_idx):\n", " if self.val_size == 0:\n", " return np.nan\n", "\n", " # TODO: Hack to compute number of windows\n", " windows = self._create_windows(batch, step='val')\n", " n_windows = len(windows['temporal'])\n", " y_idx = batch['y_idx']\n", "\n", " # Number of windows in batch\n", " windows_batch_size = self.inference_windows_batch_size\n", " if windows_batch_size < 0:\n", " windows_batch_size = n_windows\n", " n_batches = int(np.ceil(n_windows/windows_batch_size))\n", "\n", " valid_losses = []\n", " batch_sizes = []\n", " for i in range(n_batches):\n", " # Create and normalize windows [Ws, L+H, C]\n", " w_idxs = np.arange(i*windows_batch_size, \n", " min((i+1)*windows_batch_size, n_windows))\n", " windows = self._create_windows(batch, step='val', w_idxs=w_idxs)\n", " original_outsample_y = torch.clone(windows['temporal'][:,-self.h:,y_idx])\n", " windows = self._normalization(windows=windows, y_idx=y_idx)\n", "\n", " # Parse windows\n", " insample_y, insample_mask, _, outsample_mask, \\\n", " hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)\n", " windows_batch = dict(insample_y=insample_y, # [Ws, L]\n", " insample_mask=insample_mask, # [Ws, L]\n", " futr_exog=futr_exog, # [Ws, L+H]\n", " hist_exog=hist_exog, # [Ws, L]\n", " stat_exog=stat_exog) # [Ws, 1]\n", " \n", " # Model Predictions\n", " output_batch = self(windows_batch)\n", " valid_loss_batch = self._compute_valid_loss(outsample_y=original_outsample_y,\n", " output=output_batch, outsample_mask=outsample_mask,\n", " temporal_cols=batch['temporal_cols'],\n", " y_idx=batch['y_idx'])\n", " valid_losses.append(valid_loss_batch)\n", " batch_sizes.append(len(output_batch))\n", " \n", " valid_loss = torch.stack(valid_losses)\n", " batch_sizes = torch.tensor(batch_sizes, device=valid_loss.device)\n", " batch_size = torch.sum(batch_sizes)\n", " valid_loss = torch.sum(valid_loss * batch_sizes) / batch_size\n", "\n", " if torch.isnan(valid_loss):\n", " raise Exception('Loss is NaN, training stopped.')\n", "\n", " self.log(\n", " 'valid_loss',\n", " valid_loss.item(),\n", " batch_size=batch_size,\n", " prog_bar=True,\n", " on_epoch=True,\n", " )\n", " self.validation_step_outputs.append(valid_loss)\n", " return valid_loss\n", "\n", " def predict_step(self, batch, batch_idx):\n", "\n", " # TODO: Hack to compute number of windows\n", " windows = self._create_windows(batch, step='predict')\n", " n_windows = len(windows['temporal'])\n", " y_idx = batch['y_idx']\n", "\n", " # Number of windows in batch\n", " windows_batch_size = self.inference_windows_batch_size\n", " if windows_batch_size < 0:\n", " windows_batch_size = n_windows\n", " n_batches = int(np.ceil(n_windows/windows_batch_size))\n", "\n", " y_hats = []\n", " for i in range(n_batches):\n", " # Create and normalize windows [Ws, L+H, C]\n", " w_idxs = np.arange(i*windows_batch_size, \n", " min((i+1)*windows_batch_size, n_windows))\n", " windows = self._create_windows(batch, step='predict', w_idxs=w_idxs)\n", " windows = self._normalization(windows=windows, y_idx=y_idx)\n", "\n", " # Parse windows\n", " insample_y, insample_mask, _, _, \\\n", " hist_exog, 
futr_exog, stat_exog = self._parse_windows(batch, windows)\n", " windows_batch = dict(insample_y=insample_y, # [Ws, L]\n", " insample_mask=insample_mask, # [Ws, L]\n", " futr_exog=futr_exog, # [Ws, L+H]\n", " hist_exog=hist_exog, # [Ws, L]\n", " stat_exog=stat_exog) # [Ws, 1]\n", " \n", " # Model Predictions\n", " output_batch = self(windows_batch)\n", " # Inverse normalization and sampling\n", " if self.loss.is_distribution_output:\n", " _, y_loc, y_scale = self._inv_normalization(y_hat=output_batch[0],\n", " temporal_cols=batch['temporal_cols'],\n", " y_idx=y_idx)\n", " distr_args = self.loss.scale_decouple(output=output_batch, loc=y_loc, scale=y_scale)\n", " _, sample_mean, quants = self.loss.sample(distr_args=distr_args)\n", " y_hat = torch.concat((sample_mean, quants), axis=2)\n", "\n", " if self.loss.return_params:\n", " distr_args = torch.stack(distr_args, dim=-1)\n", " distr_args = torch.reshape(distr_args, (len(windows[\"temporal\"]), self.h, -1))\n", " y_hat = torch.concat((y_hat, distr_args), axis=2)\n", " else:\n", " y_hat, _, _ = self._inv_normalization(y_hat=output_batch,\n", " temporal_cols=batch['temporal_cols'],\n", " y_idx=y_idx)\n", " y_hats.append(y_hat)\n", " y_hat = torch.cat(y_hats, dim=0)\n", " return y_hat\n", " \n", " def fit(self, dataset, val_size=0, test_size=0, random_seed=None, distributed_config=None):\n", " \"\"\" Fit.\n", "\n", " The `fit` method optimizes the neural network's weights using the\n", " initialization parameters (`learning_rate`, `windows_batch_size`, ...)\n", " and the `loss` function as defined during the initialization. \n", " Within `fit` we use a PyTorch Lightning `Trainer` that\n", " inherits the initialization's `self.trainer_kwargs`; to customize\n", " its inputs, see [PL's trainer arguments](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html?highlight=trainer).\n", "\n", " The method is designed to be compatible with scikit-learn-like classes,\n", " and in particular with the StatsForecast library.\n", "\n", " By default the `model` does not save training checkpoints, to protect \n", " disk memory; to store them, set `enable_checkpointing=True` in `__init__`.\n", "\n", " **Parameters:**
\n", " `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).
\n", " `val_size`: int, validation size for temporal cross-validation.
\n", " `random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.
\n", " `test_size`: int, test size for temporal cross-validation.
\n", " \"\"\"\n", " return self._fit(\n", " dataset=dataset,\n", " batch_size=self.batch_size,\n", " valid_batch_size=self.valid_batch_size,\n", " val_size=val_size,\n", " test_size=test_size,\n", " random_seed=random_seed,\n", " distributed_config=distributed_config,\n", " )\n", "\n", " def predict(self, dataset, test_size=None, step_size=1,\n", " random_seed=None, **data_module_kwargs):\n", " \"\"\" Predict.\n", "\n", " Neural network prediction with PL's `Trainer` execution of `predict_step`.\n", "\n", " **Parameters:**
\n", " `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).
\n", " `test_size`: int=None, test size for temporal cross-validation.
\n", " `step_size`: int=1, Step size between each window.
\n", " `random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.
\n", " `**data_module_kwargs`: PL's TimeSeriesDataModule args, see [documentation](https://pytorch-lightning.readthedocs.io/en/1.6.1/extensions/datamodules.html#using-a-datamodule).\n", " \"\"\"\n", " self._check_exog(dataset)\n", " self._restart_seed(random_seed)\n", "\n", " self.predict_step_size = step_size\n", " self.decompose_forecast = False\n", " datamodule = TimeSeriesDataModule(dataset=dataset,\n", " valid_batch_size=self.valid_batch_size,\n", " **data_module_kwargs)\n", "\n", " # Protect when case of multiple gpu. PL does not support return preds with multiple gpu.\n", " pred_trainer_kwargs = self.trainer_kwargs.copy()\n", " if (pred_trainer_kwargs.get('accelerator', None) == \"gpu\") and (torch.cuda.device_count() > 1):\n", " pred_trainer_kwargs['devices'] = [0]\n", "\n", " trainer = pl.Trainer(**pred_trainer_kwargs)\n", " fcsts = trainer.predict(self, datamodule=datamodule) \n", " fcsts = torch.vstack(fcsts).numpy().flatten()\n", " fcsts = fcsts.reshape(-1, len(self.loss.output_names))\n", " return fcsts\n", "\n", " def decompose(self, dataset, step_size=1, random_seed=None, **data_module_kwargs):\n", " \"\"\" Decompose Predictions.\n", "\n", " Decompose the predictions through the network's layers.\n", " Available methods are `ESRNN`, `NHITS`, `NBEATS`, and `NBEATSx`.\n", "\n", " **Parameters:**
\n", " `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation here](https://nixtla.github.io/neuralforecast/tsdataset.html).
\n", " `step_size`: int=1, step size between each window of temporal data.
\n", " `**data_module_kwargs`: PL's TimeSeriesDataModule args, see [documentation](https://pytorch-lightning.readthedocs.io/en/1.6.1/extensions/datamodules.html#using-a-datamodule).\n", " \"\"\"\n", " # Restart random seed\n", " if random_seed is None:\n", " random_seed = self.random_seed\n", " torch.manual_seed(random_seed)\n", "\n", " self.predict_step_size = step_size\n", " self.decompose_forecast = True\n", " datamodule = TimeSeriesDataModule(dataset=dataset,\n", " valid_batch_size=self.valid_batch_size,\n", " **data_module_kwargs)\n", " trainer = pl.Trainer(**self.trainer_kwargs)\n", " fcsts = trainer.predict(self, datamodule=datamodule)\n", " self.decompose_forecast = False # Default decomposition back to false\n", " return torch.vstack(fcsts).numpy()" ] }, { "cell_type": "code", "execution_count": null, "id": "1712ea15", "metadata": {}, "outputs": [], "source": [ "show_doc(BaseWindows, title_level=3)" ] }, { "cell_type": "code", "execution_count": null, "id": "48063f70", "metadata": {}, "outputs": [], "source": [ "show_doc(BaseWindows.fit, title_level=3)" ] }, { "cell_type": "code", "execution_count": null, "id": "75529be6", "metadata": {}, "outputs": [], "source": [ "show_doc(BaseWindows.predict, title_level=3)" ] }, { "cell_type": "code", "execution_count": null, "id": "a1f8315d", "metadata": {}, "outputs": [], "source": [ "show_doc(BaseWindows.decompose, title_level=3)" ] }, { "cell_type": "code", "execution_count": null, "id": "8927f2e5-f376-4c99-bb8f-8cbb73efe01e", "metadata": {}, "outputs": [], "source": [ "#| hide\n", "from neuralforecast.losses.pytorch import MAE\n", "from neuralforecast.utils import AirPassengersDF\n", "from neuralforecast.tsdataset import TimeSeriesDataset, TimeSeriesDataModule" ] }, { "cell_type": "code", "execution_count": null, "id": "61490e69-f014-4087-83c5-540d5bd7d458", "metadata": {}, "outputs": [], "source": [ "#| hide\n", "# add h=0,1 unit test for _parse_windows \n", "# Declare batch\n", "AirPassengersDF['x'] = np.array(len(AirPassengersDF))\n", "AirPassengersDF['x2'] = np.array(len(AirPassengersDF)) * 2\n", "dataset, indices, dates, ds = TimeSeriesDataset.from_df(df=AirPassengersDF)\n", "data = TimeSeriesDataModule(dataset=dataset, batch_size=1, drop_last=True)\n", "\n", "train_loader = data.train_dataloader()\n", "batch = next(iter(train_loader))\n", "\n", "# Instantiate BaseWindows to test _parse_windows method h in [0,1]\n", "for h in [0, 1]:\n", " basewindows = BaseWindows(h=h,\n", " input_size=len(AirPassengersDF)-h,\n", " hist_exog_list=['x'],\n", " loss=MAE(),\n", " valid_loss=MAE(),\n", " learning_rate=0.001,\n", " max_steps=1,\n", " val_check_steps=0,\n", " batch_size=1,\n", " valid_batch_size=1,\n", " windows_batch_size=1,\n", " inference_windows_batch_size=1,\n", " start_padding_enabled=False)\n", "\n", " windows = basewindows._create_windows(batch, step='train')\n", " original_outsample_y = torch.clone(windows['temporal'][:,-basewindows.h:,0])\n", " windows = basewindows._normalization(windows=windows, y_idx=0)\n", "\n", " insample_y, insample_mask, outsample_y, outsample_mask, \\\n", " hist_exog, futr_exog, stat_exog = basewindows._parse_windows(batch, windows)\n", "\n", " # Check equality of parsed and original insample_y\n", " parsed_insample_y = insample_y.numpy().flatten()\n", " original_insample_y = AirPassengersDF.y.values\n", " test_eq(parsed_insample_y, original_insample_y[:basewindows.input_size])\n", "\n", " # Check equality of parsed and original hist_exog\n", " parsed_hist_exog = hist_exog.numpy().flatten()\n", " 
original_hist_exog = AirPassengersDF.x.values\n", " test_eq(parsed_hist_exog, original_hist_exog[:basewindows.input_size])" ] }, { "cell_type": "code", "execution_count": null, "id": "86ab58a9", "metadata": {}, "outputs": [], "source": [ "#| hide\n", "# Test that start_padding_enabled=True solves the problem of short series\n", "h = 12\n", "basewindows = BaseWindows(h=h,\n", " input_size=500,\n", " hist_exog_list=['x'],\n", " loss=MAE(),\n", " valid_loss=MAE(),\n", " learning_rate=0.001,\n", " max_steps=1,\n", " val_check_steps=0,\n", " batch_size=1,\n", " valid_batch_size=1,\n", " windows_batch_size=10,\n", " inference_windows_batch_size=2,\n", " start_padding_enabled=True)\n", "\n", "windows = basewindows._create_windows(batch, step='train')\n", "windows = basewindows._normalization(windows=windows, y_idx=0)\n", "insample_y, insample_mask, outsample_y, outsample_mask, \\\n", " hist_exog, futr_exog, stat_exog = basewindows._parse_windows(batch, windows)\n", "\n", "basewindows.val_size = 12\n", "windows = basewindows._create_windows(batch, step='val')\n", "windows = basewindows._normalization(windows=windows, y_idx=0)\n", "insample_y, insample_mask, outsample_y, outsample_mask, \\\n", " hist_exog, futr_exog, stat_exog = basewindows._parse_windows(batch, windows)\n", "\n", "basewindows.test_size = 12\n", "basewindows.predict_step_size = 1\n", "windows = basewindows._create_windows(batch, step='predict')\n", "windows = basewindows._normalization(windows=windows, y_idx=0)\n", "insample_y, insample_mask, outsample_y, outsample_mask, \\\n", " hist_exog, futr_exog, stat_exog = basewindows._parse_windows(batch, windows)" ] }, { "cell_type": "code", "execution_count": null, "id": "54d2e850", "metadata": {}, "outputs": [], "source": [ "#| hide\n", "\n", "# Test that hist_exog_list and futr_exog_list correctly filter data.\n", "# that is sent to scaler.\n", "basewindows = BaseWindows(h=12,\n", " input_size=500,\n", " hist_exog_list=['x', 'x2'],\n", " futr_exog_list=['x'],\n", " loss=MAE(),\n", " valid_loss=MAE(),\n", " learning_rate=0.001,\n", " max_steps=1,\n", " val_check_steps=0,\n", " batch_size=1,\n", " valid_batch_size=1,\n", " windows_batch_size=10,\n", " inference_windows_batch_size=2,\n", " start_padding_enabled=True)\n", "\n", "windows = basewindows._create_windows(batch, step='train')\n", "\n", "temporal_cols = windows['temporal_cols'].copy() # B, L+H, C\n", "temporal_data_cols = basewindows._get_temporal_exogenous_cols(temporal_cols=temporal_cols)\n", "\n", "test_eq(set(temporal_data_cols), set(['x', 'x2']))\n", "test_eq(windows['temporal'].shape, torch.Size([10,500+12,len(['y', 'x', 'x2', 'available_mask'])]))" ] }, { "cell_type": "code", "execution_count": null, "id": "bf493ff9", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "python3", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }