{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| default_exp common._base_recurrent"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# BaseRecurrent"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> The `BaseRecurrent` class contains standard methods shared across recurrent neural networks; these models can process variable-length input sequences through their internal memory states. The class is the common base of `LSTM`, `GRU`, and `RNN`, along with other more sophisticated architectures like `MQCNN`."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The standard methods include `TemporalNorm` preprocessing, optimization utilities like parameter initialization, `training_step`, `validation_step`, and shared `fit` and `predict` methods. These shared methods make all the `neuralforecast.models` compatible with the `core.NeuralForecast` wrapper class."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"from fastcore.test import test_eq\n",
"from nbdev.showdoc import show_doc"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| export\n",
"import numpy as np\n",
"import torch\n",
"import torch.nn as nn\n",
"import pytorch_lightning as pl\n",
"\n",
"from neuralforecast.common._base_model import BaseModel\n",
"from neuralforecast.common._scalers import TemporalNorm\n",
"from neuralforecast.tsdataset import TimeSeriesDataModule\n",
"from neuralforecast.utils import get_indexer_raise_missing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| export\n",
"class BaseRecurrent(BaseModel):\n",
" \"\"\" Base Recurrent\n",
" \n",
" Base class for all recurrent-based models. The forecasts are produced sequentially between \n",
" windows.\n",
" \n",
"    This class implements the basic functionality for all recurrent-based models, including:\n",
"    - PyTorch Lightning's methods training_step, validation_step, predict_step.<br>\n",
"    - fit and predict methods used by NeuralForecast.core class.<br>\n",
"    - sampling and wrangling methods to sequential windows.<br>\n",
" \"\"\"\n",
"    def __init__(self,\n",
"                 h,\n",
"                 input_size,\n",
"                 inference_input_size,\n",
"                 loss,\n",
"                 valid_loss,\n",
"                 learning_rate,\n",
"                 max_steps,\n",
"                 val_check_steps,\n",
"                 batch_size,\n",
"                 valid_batch_size,\n",
"                 scaler_type='robust',\n",
"                 num_lr_decays=0,\n",
"                 early_stop_patience_steps=-1,\n",
"                 futr_exog_list=None,\n",
"                 hist_exog_list=None,\n",
"                 stat_exog_list=None,\n",
"                 num_workers_loader=0,\n",
"                 drop_last_loader=False,\n",
"                 random_seed=1,\n",
"                 alias=None,\n",
"                 optimizer=None,\n",
"                 optimizer_kwargs=None,\n",
"                 **trainer_kwargs):\n",
"        \"\"\"Store the hyperparameters shared by recurrent models.\n",
"\n",
"        `loss`, `valid_loss`, `optimizer`, `optimizer_kwargs`, the exogenous lists,\n",
"        `max_steps`, `early_stop_patience_steps`, `random_seed` and `**trainer_kwargs`\n",
"        are forwarded to `BaseModel.__init__`; the remaining arguments are stored on\n",
"        the instance.\n",
"\n",
"        - `h`: forecast horizon, also the right-padding length of `self.padder`.\n",
"        - `input_size`: windows kept per batch when training (`<= 0` keeps all).\n",
"        - `inference_input_size`: extra trailing windows kept at validation and\n",
"          prediction time (`<= 0` keeps all).\n",
"        - `valid_batch_size`: falls back to `batch_size` when `None`.\n",
"        - `scaler_type`: passed to `TemporalNorm`.\n",
"        - `num_lr_decays`: when positive, `lr_decay_steps = max(max_steps // num_lr_decays, 1)`.\n",
"\n",
"        Raises `Exception` when `loss` is a `DistributionLoss` with the\n",
"        `'Bernoulli'` distribution.\n",
"        \"\"\"\n",
"        super().__init__(\n",
"            random_seed=random_seed,\n",
"            loss=loss,\n",
"            valid_loss=valid_loss,\n",
"            optimizer=optimizer,\n",
"            optimizer_kwargs=optimizer_kwargs,\n",
"            futr_exog_list=futr_exog_list,\n",
"            hist_exog_list=hist_exog_list,\n",
"            stat_exog_list=stat_exog_list,\n",
"            max_steps=max_steps,\n",
"            early_stop_patience_steps=early_stop_patience_steps,\n",
"            **trainer_kwargs,\n",
"        )\n",
"\n",
"        # Padder to complete train windows,\n",
"        # example y=[1,2,3,4,5] h=3 -> last y_output = [5,0,0]\n",
"        self.h = h\n",
"        self.input_size = input_size\n",
"        self.inference_input_size = inference_input_size\n",
"        self.padder = nn.ConstantPad1d(padding=(0, self.h), value=0)\n",
"\n",
"        # Temporal classification guard. The previous check compared\n",
"        # `str(type(self.loss))` against an empty string, which can never match,\n",
"        # so the guard was unreachable. Matching on the class name restores it.\n",
"        # NOTE(review): `self.loss` is presumably set by `BaseModel.__init__`.\n",
"        is_distribution_loss = type(self.loss).__name__ == 'DistributionLoss'\n",
"        if is_distribution_loss and getattr(self.loss, 'distribution', None) == 'Bernoulli':\n",
"            raise Exception('Temporal Classification not yet available for Recurrent-based models')\n",
"\n",
"        # Valid batch_size\n",
"        self.batch_size = batch_size\n",
"        self.valid_batch_size = batch_size if valid_batch_size is None else valid_batch_size\n",
"\n",
"        # Optimization\n",
"        self.learning_rate = learning_rate\n",
"        self.max_steps = max_steps\n",
"        self.num_lr_decays = num_lr_decays\n",
"        self.lr_decay_steps = max(max_steps // self.num_lr_decays, 1) if self.num_lr_decays > 0 else 10e7\n",
"        self.early_stop_patience_steps = early_stop_patience_steps\n",
"        self.val_check_steps = val_check_steps\n",
"\n",
"        # Scaler: one feature for the target plus one per temporal exogenous variable.\n",
"        # NOTE(review): assumes `BaseModel.__init__` leaves the exogenous lists as\n",
"        # sized containers even when `None` was passed -- confirm.\n",
"        self.scaler = TemporalNorm(\n",
"            scaler_type=scaler_type,\n",
"            dim=-1,  # Time dimension is -1.\n",
"            num_features=1+len(self.hist_exog_list)+len(self.futr_exog_list)\n",
"        )\n",
"\n",
"        # Fit arguments\n",
"        self.val_size = 0\n",
"        self.test_size = 0\n",
"\n",
"        # DataModule arguments\n",
"        self.num_workers_loader = num_workers_loader\n",
"        self.drop_last_loader = drop_last_loader\n",
"        # used by on_validation_epoch_end hook\n",
"        self.validation_step_outputs = []\n",
"        self.alias = alias\n",
"\n",
"    def _normalization(self, batch, val_size=0, test_size=0):\n",
"        \"\"\"Scale the target and temporal exogenous channels of `batch['temporal']`.\n",
"\n",
"        The last `val_size + test_size` timestamps are masked out before the\n",
"        scaler is applied, so they are excluded from the scaler's mask. The\n",
"        scaled channels are written back into `batch['temporal']` and the same\n",
"        `batch` dict is returned.\n",
"        \"\"\"\n",
"        series = batch['temporal']  # B, C, T\n",
"        cols = batch['temporal_cols'].copy()\n",
"        target_idx = batch['y_idx']\n",
"\n",
"        # Channels to scale: the target first, then the temporal exogenous columns.\n",
"        exog_cols = self._get_temporal_exogenous_cols(temporal_cols=cols)\n",
"        channel_idxs = np.append(target_idx, get_indexer_raise_missing(cols, exog_cols))\n",
"        scaler_input = series[:, channel_idxs, :]\n",
"\n",
"        # Work on a clone so the stored availability mask is left untouched.\n",
"        mask = series[:, cols.get_loc('available_mask'), :].clone()\n",
"\n",
"        # Hide validation and test timestamps from the scaler to prevent leakage.\n",
"        holdout = val_size + test_size\n",
"        if holdout > 0:\n",
"            mask[:, -holdout:] = 0\n",
"\n",
"        # self.scaler stores the shift and scale for the inverse transform.\n",
"        # The mask gets a channel dimension for scaler.transform.\n",
"        scaled = self.scaler.transform(x=scaler_input, mask=mask.unsqueeze(1))\n",
"\n",
"        # Write the scaled channels back into the batch.\n",
"        series[:, channel_idxs, :] = scaled\n",
"        batch['temporal'] = series\n",
"        return batch\n",
"\n",
"    def _inv_normalization(self, y_hat, temporal_cols, y_idx):\n",
"        \"\"\"Undo the scaling of `y_hat` using the target's stored shift and scale.\n",
"\n",
"        Receives window predictions [B, seq_len, H, output]. Returns the tuple\n",
"        `(y_hat, y_loc, y_scale)`, where `y_loc` and `y_scale` have one leading\n",
"        entry per series followed by singleton dimensions so they broadcast\n",
"        against `y_hat`. `temporal_cols` is accepted but not used here.\n",
"        \"\"\"\n",
"        # One singleton axis for every non-batch dimension of y_hat.\n",
"        singleton_dims = (1,) * (y_hat.ndim - 1)\n",
"\n",
"        # Pick the target channel at the first time position: [B,C,T] -> [B]\n",
"        shift = self.scaler.x_shift[:, [y_idx], 0].flatten()\n",
"        scale = self.scaler.x_scale[:, [y_idx], 0].flatten()\n",
"\n",
"        # [B] -> [B, 1, ..., 1]\n",
"        y_loc = shift.view(shift.shape[0], *singleton_dims)\n",
"        y_scale = scale.view(scale.shape[0], *singleton_dims)\n",
"\n",
"        restored = self.scaler.inverse_transform(z=y_hat, x_scale=y_scale, x_shift=y_loc)\n",
"        return restored, y_loc, y_scale\n",
"\n",
"    def _create_windows(self, batch, step):\n",
"        \"\"\"Unfold `batch['temporal']` into rolling windows of length `1 + h`.\n",
"\n",
"        `step` is one of `'train'`, `'val'` or `'predict'` and controls which\n",
"        tail of the series is dropped, whether the series is right-padded with\n",
"        `h` zeros, and how many windows are kept. Returns a dict with keys\n",
"        `temporal` ([B, C, n_windows, 1+H]), `temporal_cols`, `static` and\n",
"        `static_cols`.\n",
"\n",
"        Raises `Exception` in the train step when fewer than `1 + h` timestamps\n",
"        remain after truncation.\n",
"        \"\"\"\n",
"        temporal_cols = batch['temporal_cols']\n",
"        series = batch['temporal']\n",
"\n",
"        if step == 'train':\n",
"            # Drop the validation and test tails, then pad the right side.\n",
"            holdout = self.val_size + self.test_size\n",
"            if holdout > 0:\n",
"                series = series[:, :, :-holdout]\n",
"            series = self.padder(series)\n",
"\n",
"            # Start at the first timestamp whose mask is non-zero for every series.\n",
"            mask_channel = series[:, temporal_cols.get_loc('available_mask')]\n",
"            shared_mask = torch.min(mask_channel, axis=0).values\n",
"            min_time_stamp = int(torch.nonzero(shared_mask).min())\n",
"\n",
"            available_ts = series.shape[-1] - min_time_stamp\n",
"            if available_ts < 1 + self.h:\n",
"                raise Exception(\n",
"                    'Time series too short for given input and output size. \\n'\n",
"                    f'Available timestamps: {available_ts}'\n",
"                )\n",
"\n",
"            series = series[:, :, min_time_stamp:]\n",
"\n",
"        elif step == 'val':\n",
"            if self.test_size > 0:\n",
"                series = series[:, :, :-self.test_size]\n",
"            series = self.padder(series)\n",
"\n",
"        elif step == 'predict':\n",
"            # Only pad when no future values are expected inside the series.\n",
"            if (self.test_size == 0) and (len(self.futr_exog_list) == 0):\n",
"                series = self.padder(series)\n",
"\n",
"            # Test size covers all data, pad left one timestep with zeros\n",
"            if series.shape[-1] == self.test_size:\n",
"                series = nn.ConstantPad1d(padding=(1, 0), value=0)(series)\n",
"\n",
"        # 1 for current t and h for future\n",
"        windows = series.unfold(dimension=-1, size=1 + self.h, step=1)\n",
"\n",
"        # Truncated backpropagation/inference (shorten sequence where RNNs unroll)\n",
"        n_windows = windows.shape[2]\n",
"        if step == 'train':\n",
"            if (self.input_size > 0) and (n_windows > self.input_size):\n",
"                # Sample a random contiguous run of `input_size` windows.\n",
"                start = np.random.choice(n_windows - self.input_size + 1)\n",
"                windows = windows[:, :, start:(start + self.input_size), :]\n",
"        elif step == 'val':\n",
"            if self.inference_input_size > 0:\n",
"                keep = self.inference_input_size + self.val_size\n",
"                windows = windows[:, :, -keep:, :]\n",
"        elif step == 'predict':\n",
"            if self.inference_input_size > 0:\n",
"                keep = self.inference_input_size + self.test_size\n",
"                windows = windows[:, :, -keep:, :]\n",
"\n",
"        # [B, C, input_size, 1+H]\n",
"        return {\n",
"            'temporal': windows,\n",
"            'temporal_cols': temporal_cols,\n",
"            'static': batch.get('static', None),\n",
"            'static_cols': batch.get('static_cols', None),\n",
"        }\n",
"\n",
"    def _parse_windows(self, batch, windows):\n",
"        \"\"\"Split windows into model inputs and targets.\n",
"\n",
"        Returns `(insample_y, insample_mask, outsample_y, outsample_mask,\n",
"        hist_exog, futr_exog, stat_exog)`. The last `h` positions of each window\n",
"        form the outsample part; each exogenous entry is `None` when the\n",
"        matching `*_exog_list` is empty.\n",
"        \"\"\"\n",
"        horizon = self.h\n",
"        temporal = windows['temporal']  # [B, C, seq_len, 1+H]\n",
"        mask_idx = batch['temporal_cols'].get_loc('available_mask')\n",
"        y_idx = batch['y_idx']\n",
"\n",
"        # Filter insample lags from outsample horizon\n",
"        insample_y = temporal[:, y_idx, :, :-horizon]\n",
"        insample_mask = temporal[:, mask_idx, :, :-horizon]\n",
"        outsample_y = temporal[:, y_idx, :, -horizon:].contiguous()\n",
"        outsample_mask = temporal[:, mask_idx, :, -horizon:].contiguous()\n",
"\n",
"        hist_exog = None\n",
"        futr_exog = None\n",
"        stat_exog = None\n",
"\n",
"        # Historic exogenous variables only cover the insample part.\n",
"        if len(self.hist_exog_list):\n",
"            hist_idx = get_indexer_raise_missing(windows['temporal_cols'], self.hist_exog_list)\n",
"            hist_exog = temporal[:, hist_idx, :, :-horizon]\n",
"\n",
"        # Future exogenous variables keep the full window.\n",
"        if len(self.futr_exog_list):\n",
"            futr_idx = get_indexer_raise_missing(windows['temporal_cols'], self.futr_exog_list)\n",
"            futr_exog = temporal[:, futr_idx, :, :]\n",
"\n",
"        # Static variables\n",
"        if len(self.stat_exog_list):\n",
"            stat_idx = get_indexer_raise_missing(windows['static_cols'], self.stat_exog_list)\n",
"            stat_exog = windows['static'][:, stat_idx]\n",
"\n",
"        return (insample_y, insample_mask, outsample_y, outsample_mask,\n",
"                hist_exog, futr_exog, stat_exog)\n",
"\n",
"    def training_step(self, batch, batch_idx):\n",
"        \"\"\"Run one optimization step and return the training loss.\n",
"\n",
"        Normalizes the batch, builds the training windows, calls the model and\n",
"        evaluates `self.loss`. For distribution losses the targets are mapped\n",
"        back to the original scale and the window dimension is folded into the\n",
"        batch dimension before the loss is computed. The loss is logged as\n",
"        `train_loss` and appended to `self.train_trajectories`.\n",
"\n",
"        Raises `Exception` when the loss is NaN.\n",
"        \"\"\"\n",
"        # Create and normalize windows [Ws, L+H, C]\n",
"        batch = self._normalization(batch, val_size=self.val_size, test_size=self.test_size)\n",
"        windows = self._create_windows(batch, step='train')\n",
"\n",
"        # Parse windows\n",
"        insample_y, insample_mask, outsample_y, outsample_mask, \\\n",
"            hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)\n",
"\n",
"        windows_batch = dict(insample_y=insample_y,  # [B, seq_len, 1]\n",
"                             insample_mask=insample_mask,  # [B, seq_len, 1]\n",
"                             futr_exog=futr_exog,  # [B, F, seq_len, 1+H]\n",
"                             hist_exog=hist_exog,  # [B, C, seq_len]\n",
"                             stat_exog=stat_exog)  # [B, S]\n",
"\n",
"        # Model predictions\n",
"        output = self(windows_batch)  # tuple([B, seq_len, H, output])\n",
"        if self.loss.is_distribution_output:\n",
"            outsample_y, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,\n",
"                                                                  temporal_cols=batch['temporal_cols'],\n",
"                                                                  y_idx=batch['y_idx'])\n",
"            B = output[0].size()[0]\n",
"            T = output[0].size()[1]\n",
"            H = output[0].size()[2]\n",
"            # Fold the window dimension into the batch dimension. `reshape` is used\n",
"            # (as in validation_step and predict_step) so non-contiguous tensors work too.\n",
"            output = [arg.reshape(-1, *(arg.size()[2:])) for arg in output]\n",
"            outsample_y = outsample_y.reshape(B*T, H)\n",
"            outsample_mask = outsample_mask.reshape(B*T, H)\n",
"            y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)\n",
"            y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)\n",
"            distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)\n",
"            loss = self.loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)\n",
"        else:\n",
"            loss = self.loss(y=outsample_y, y_hat=output, mask=outsample_mask)\n",
"\n",
"        if torch.isnan(loss):\n",
"            print('Model Parameters', self.hparams)\n",
"            print('insample_y', torch.isnan(insample_y).sum())\n",
"            print('outsample_y', torch.isnan(outsample_y).sum())\n",
"            # `output` is a list of tensors for distribution losses. Calling\n",
"            # torch.isnan on the list would raise a TypeError and hide the\n",
"            # exception below, so count NaNs tensor by tensor.\n",
"            output_tensors = output if isinstance(output, (list, tuple)) else [output]\n",
"            print('output', sum(torch.isnan(arg).sum() for arg in output_tensors))\n",
"            raise Exception('Loss is NaN, training stopped.')\n",
"\n",
"        self.log(\n",
"            'train_loss',\n",
"            loss.item(),\n",
"            batch_size=outsample_y.size(0),\n",
"            prog_bar=True,\n",
"            on_epoch=True,\n",
"        )\n",
"        self.train_trajectories.append((self.global_step, loss.item()))\n",
"        return loss\n",
"\n",
"    def validation_step(self, batch, batch_idx):\n",
"        \"\"\"Evaluate `self.valid_loss` on the validation windows of a batch.\n",
"\n",
"        Returns `np.nan` when `self.val_size` is 0. Otherwise the loss is logged\n",
"        as `valid_loss`, appended to `self.validation_step_outputs` and returned.\n",
"\n",
"        Raises `Exception` when the validation loss is NaN.\n",
"        \"\"\"\n",
"        if self.val_size == 0:\n",
"            return np.nan\n",
"\n",
"        # Create and normalize windows [Ws, L+H, C]\n",
"        batch = self._normalization(batch, val_size=self.val_size, test_size=self.test_size)\n",
"        windows = self._create_windows(batch, step='val')\n",
"        y_idx = batch['y_idx']\n",
"        temporal_cols = batch['temporal_cols']\n",
"\n",
"        # Parse windows\n",
"        (insample_y, insample_mask, outsample_y, outsample_mask,\n",
"         hist_exog, futr_exog, stat_exog) = self._parse_windows(batch, windows)\n",
"\n",
"        windows_batch = {\n",
"            'insample_y': insample_y,  # [B, seq_len, 1]\n",
"            'insample_mask': insample_mask,  # [B, seq_len, 1]\n",
"            'futr_exog': futr_exog,  # [B, F, seq_len, 1+H]\n",
"            'hist_exog': hist_exog,  # [B, C, seq_len]\n",
"            'stat_exog': stat_exog,  # [B, S]\n",
"        }\n",
"\n",
"        # Keep only the validation windows; the trailing window comes from the\n",
"        # zero padding and is dropped (hence the +1 and the -1).\n",
"        # tuple([B, seq_len, H, output]) -> tuple([B, validation_size, H, output])\n",
"        val_windows = self.val_size + 1\n",
"        val_slice = slice(-val_windows, -1)\n",
"        outsample_y = outsample_y[:, val_slice, :]\n",
"        outsample_mask = outsample_mask[:, val_slice, :]\n",
"\n",
"        # Model predictions\n",
"        output = self(windows_batch)  # tuple([B, seq_len, H, output])\n",
"        if self.loss.is_distribution_output:\n",
"            output = [arg[:, val_slice] for arg in output]\n",
"            outsample_y, y_loc, y_scale = self._inv_normalization(\n",
"                y_hat=outsample_y, temporal_cols=temporal_cols, y_idx=y_idx)\n",
"            B, T, H = output[0].size()[:3]\n",
"            # Fold the window dimension into the batch dimension.\n",
"            output = [arg.reshape(-1, *(arg.size()[2:])) for arg in output]\n",
"            outsample_y = outsample_y.reshape(B*T, H)\n",
"            outsample_mask = outsample_mask.reshape(B*T, H)\n",
"            y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)\n",
"            y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)\n",
"            distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)\n",
"            _, sample_mean, quants = self.loss.sample(distr_args=distr_args)\n",
"\n",
"            # NOTE(review): both membership tests compare against empty strings,\n",
"            # which `str(type(...))` never equals, so neither branch can run and\n",
"            # `output` stays the list of reshaped distribution parameters. The\n",
"            # literals look truncated (presumably they once held loss class\n",
"            # names) -- confirm against the project history.\n",
"            valid_loss_type = str(type(self.valid_loss))\n",
"            if valid_loss_type in [\"\", \"\"]:\n",
"                output = quants\n",
"            elif valid_loss_type in [\"\"]:\n",
"                output = torch.unsqueeze(sample_mean, dim=-1)  # [N,H,1] -> [N,H]\n",
"        else:\n",
"            output = output[:, val_slice, :]\n",
"\n",
"        # Validation Loss evaluation\n",
"        if self.valid_loss.is_distribution_output:\n",
"            valid_loss = self.valid_loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)\n",
"        else:\n",
"            outsample_y, _, _ = self._inv_normalization(\n",
"                y_hat=outsample_y, temporal_cols=temporal_cols, y_idx=y_idx)\n",
"            output, _, _ = self._inv_normalization(\n",
"                y_hat=output, temporal_cols=temporal_cols, y_idx=y_idx)\n",
"            valid_loss = self.valid_loss(y=outsample_y, y_hat=output, mask=outsample_mask)\n",
"\n",
"        if torch.isnan(valid_loss):\n",
"            raise Exception('Loss is NaN, training stopped.')\n",
"\n",
"        self.log(\n",
"            'valid_loss',\n",
"            valid_loss.item(),\n",
"            batch_size=outsample_y.size(0),\n",
"            prog_bar=True,\n",
"            on_epoch=True,\n",
"        )\n",
"        self.validation_step_outputs.append(valid_loss)\n",
"        return valid_loss\n",
"\n",
"    def predict_step(self, batch, batch_idx):\n",
"        \"\"\"Produce forecasts on the original scale for one batch.\n",
"\n",
"        For point losses the model output is inverse-normalized and returned.\n",
"        For distribution losses the returned tensor is reshaped to\n",
"        [B, T, H, -1] and holds the sample mean followed by the quantiles, plus\n",
"        the distribution parameters when `self.loss.return_params` is set.\n",
"        \"\"\"\n",
"        # Create and normalize windows [Ws, L+H, C]\n",
"        batch = self._normalization(batch, val_size=0, test_size=self.test_size)\n",
"        windows = self._create_windows(batch, step='predict')\n",
"        y_idx = batch['y_idx']\n",
"        temporal_cols = batch['temporal_cols']\n",
"\n",
"        # Parse windows (the outsample parts are not needed for prediction)\n",
"        (insample_y, insample_mask, _, _,\n",
"         hist_exog, futr_exog, stat_exog) = self._parse_windows(batch, windows)\n",
"\n",
"        windows_batch = {\n",
"            'insample_y': insample_y,  # [B, seq_len, 1]\n",
"            'insample_mask': insample_mask,  # [B, seq_len, 1]\n",
"            'futr_exog': futr_exog,  # [B, F, seq_len, 1+H]\n",
"            'hist_exog': hist_exog,  # [B, C, seq_len]\n",
"            'stat_exog': stat_exog,  # [B, S]\n",
"        }\n",
"\n",
"        # Model Predictions\n",
"        output = self(windows_batch)  # tuple([B, seq_len, H], ...)\n",
"\n",
"        if not self.loss.is_distribution_output:\n",
"            y_hat, _, _ = self._inv_normalization(\n",
"                y_hat=output, temporal_cols=temporal_cols, y_idx=y_idx)\n",
"            return y_hat\n",
"\n",
"        # Only the location and scale are needed; the first output fixes their rank.\n",
"        _, y_loc, y_scale = self._inv_normalization(\n",
"            y_hat=output[0], temporal_cols=temporal_cols, y_idx=y_idx)\n",
"        B, T, H = output[0].size()[:3]\n",
"\n",
"        # Fold the window dimension into the batch dimension.\n",
"        output = [arg.reshape(-1, *(arg.size()[2:])) for arg in output]\n",
"        y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)\n",
"        y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)\n",
"        distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)\n",
"        _, sample_mean, quants = self.loss.sample(distr_args=distr_args)\n",
"\n",
"        y_hat = torch.concat((sample_mean, quants), axis=2)\n",
"        y_hat = y_hat.view(B, T, H, -1)\n",
"\n",
"        if self.loss.return_params:\n",
"            # Append the distribution parameters after the mean and quantiles.\n",
"            stacked_args = torch.stack(distr_args, dim=-1)\n",
"            stacked_args = torch.reshape(stacked_args, (B, T, H, -1))\n",
"            y_hat = torch.concat((y_hat, stacked_args), axis=3)\n",
"        return y_hat\n",
"\n",
"    def fit(self, dataset, val_size=0, test_size=0, random_seed=None, distributed_config=None):\n",
"        \"\"\" Fit.\n",
"\n",
"        The `fit` method optimizes the neural network's weights using the\n",
"        initialization parameters (`learning_rate`, `batch_size`, ...)\n",
"        and the `loss` function as defined during the initialization.\n",
"        Within `fit` we use a PyTorch Lightning `Trainer` that\n",
"        inherits the initialization's `self.trainer_kwargs`, to customize\n",
"        its inputs, see [PL's trainer arguments](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html?highlight=trainer).\n",
"\n",
"        The method is designed to be compatible with SKLearn-like classes\n",
"        and in particular to be compatible with the StatsForecast library.\n",
"\n",
"        By default the `model` is not saving training checkpoints to protect\n",
"        disk memory, to get them change `enable_checkpointing=True` in `__init__`.\n",
"\n",
"        **Parameters:**<br>\n",
"        `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).<br>\n",
"        `val_size`: int, validation size for temporal cross-validation.<br>\n",
"        `test_size`: int, test size for temporal cross-validation.<br>\n",
"        `random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.<br>\n",
"        `distributed_config`: default None, forwarded unchanged to `self._fit`.<br>\n",
"        \"\"\"\n",
"        # The batch sizes come from the instance; everything else is passed through.\n",
"        fit_arguments = dict(\n",
"            dataset=dataset,\n",
"            batch_size=self.batch_size,\n",
"            valid_batch_size=self.valid_batch_size,\n",
"            val_size=val_size,\n",
"            test_size=test_size,\n",
"            random_seed=random_seed,\n",
"            distributed_config=distributed_config,\n",
"        )\n",
"        return self._fit(**fit_arguments)\n",
"\n",
"    def predict(self, dataset, step_size=1,\n",
"                random_seed=None, **data_module_kwargs):\n",
"        \"\"\" Predict.\n",
"\n",
"        Neural network prediction with PL's `Trainer` execution of `predict_step`.\n",
"\n",
"        **Parameters:**<br>\n",
"        `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).<br>\n",
"        `step_size`: int=1, Step size between each window.<br>\n",
"        `random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.<br>\n",
"        `**data_module_kwargs`: PL's TimeSeriesDataModule args, see [documentation](https://pytorch-lightning.readthedocs.io/en/1.6.1/extensions/datamodules.html#using-a-datamodule).\n",
"        \"\"\"\n",
"        self._check_exog(dataset)\n",
"        self._restart_seed(random_seed)\n",
"\n",
"        if step_size > 1:\n",
"            raise Exception('Recurrent models do not support step_size > 1')\n",
"\n",
"        # fcsts (window, batch, h)\n",
"        # Protect when case of multiple gpu. PL does not support return preds with multiple gpu.\n",
"        pred_trainer_kwargs = self.trainer_kwargs.copy()\n",
"        uses_gpu = pred_trainer_kwargs.get('accelerator', None) == \"gpu\"\n",
"        if uses_gpu and (torch.cuda.device_count() > 1):\n",
"            pred_trainer_kwargs['devices'] = [0]\n",
"\n",
"        trainer = pl.Trainer(**pred_trainer_kwargs)\n",
"\n",
"        datamodule = TimeSeriesDataModule(\n",
"            dataset=dataset,\n",
"            valid_batch_size=self.valid_batch_size,\n",
"            num_workers=self.num_workers_loader,\n",
"            **data_module_kwargs\n",
"        )\n",
"        batch_fcsts = trainer.predict(self, datamodule=datamodule)\n",
"\n",
"        if self.test_size > 0:\n",
"            # Remove warmup windows (from train and validation)\n",
"            n_last_windows = 1 + self.test_size - self.h\n",
"        else:\n",
"            # Without a test set only the final window is a forecast.\n",
"            n_last_windows = 1\n",
"\n",
"        # [N,T,H,output], avoid indexing last dim for univariate output compatibility\n",
"        kept = [fcst[:, -n_last_windows:, :] for fcst in batch_fcsts]\n",
"        fcsts = torch.vstack(kept).numpy().flatten()\n",
"        # One column per loss output name.\n",
"        fcsts = fcsts.reshape(-1, len(self.loss.output_names))\n",
"        return fcsts"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"show_doc(BaseRecurrent, title_level=3)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"show_doc(BaseRecurrent.fit, title_level=3)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"show_doc(BaseRecurrent.predict, title_level=3)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"from neuralforecast.losses.pytorch import MAE\n",
"from neuralforecast.utils import AirPassengersDF\n",
"from neuralforecast.tsdataset import TimeSeriesDataset, TimeSeriesDataModule"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"# add h=0,1 unit test for _parse_windows\n",
"# Declare batch. The exogenous columns are added with `assign`, which returns a new\n",
"# frame, so the module-level `AirPassengersDF` is not mutated for other cells.\n",
"n_rows = len(AirPassengersDF)\n",
"air_passengers_exog = AirPassengersDF.assign(x=n_rows, x2=n_rows * 2)\n",
"dataset, indices, dates, ds = TimeSeriesDataset.from_df(df=air_passengers_exog)\n",
"data = TimeSeriesDataModule(dataset=dataset, batch_size=1, drop_last=True)\n",
"\n",
"train_loader = data.train_dataloader()\n",
"batch = next(iter(train_loader))\n",
"\n",
"# Test that hist_exog_list and futr_exog_list correctly filter data that is sent to scaler.\n",
"baserecurrent = BaseRecurrent(h=12,\n",
"                              input_size=117,\n",
"                              hist_exog_list=['x', 'x2'],\n",
"                              futr_exog_list=['x'],\n",
"                              loss=MAE(),\n",
"                              valid_loss=MAE(),\n",
"                              learning_rate=0.001,\n",
"                              max_steps=1,\n",
"                              val_check_steps=0,\n",
"                              batch_size=1,\n",
"                              valid_batch_size=1,\n",
"                              windows_batch_size=10,\n",
"                              inference_input_size=2,\n",
"                              start_padding_enabled=True)\n",
"\n",
"windows = baserecurrent._create_windows(batch, step='train')\n",
"\n",
"# Column labels of the temporal tensor (copied so the batch's own index is untouched).\n",
"temporal_cols = windows['temporal_cols'].copy()\n",
"temporal_data_cols = baserecurrent._get_temporal_exogenous_cols(temporal_cols=temporal_cols)\n",
"\n",
"test_eq(set(temporal_data_cols), set(['x', 'x2']))\n",
"# [B, C, input_size, 1+H]\n",
"test_eq(windows['temporal'].shape, torch.Size([1,len(['y', 'x', 'x2', 'available_mask']),117,12+1]))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "python3",
"language": "python",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}