# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/tsdataset.ipynb.

# %% auto 0
__all__ = ['TimeSeriesLoader', 'TimeSeriesDataset', 'TimeSeriesDataModule']

# %% ../nbs/tsdataset.ipynb 4
import warnings
from collections.abc import Mapping
from typing import List, Optional, Union

import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import utilsforecast.processing as ufp
from torch.utils.data import Dataset, DataLoader
from utilsforecast.compat import DataFrame, pl_Series

# %% ../nbs/tsdataset.ipynb 5
class TimeSeriesLoader(DataLoader):
    """TimeSeriesLoader DataLoader.
    [Source code](https://github.com/Nixtla/neuralforecast/blob/main/neuralforecast/tsdataset.py).

    A small change to PyTorch's `DataLoader`: it combines a dataset and a sampler,
    and provides an iterable over the given dataset.

    The class `~torch.utils.data.DataLoader` supports both map-style and
    iterable-style datasets with single- or multi-process loading, customizable
    loading order, and optional automatic batching (collation) and memory pinning.

    **Parameters:**
    `batch_size`: (int, optional): how many samples per batch to load (default: 1).
    `shuffle`: (bool, optional): set to `True` to have the data reshuffled at every epoch (default: `False`).
    `sampler`: (Sampler or Iterable, optional): defines the strategy to draw samples from the dataset.
                Can be any `Iterable` with `__len__` implemented. If specified, `shuffle` must not be specified.
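
    **Example:**
    A minimal sketch; it assumes `dataset` is a `TimeSeriesDataset` (e.g. built with `TimeSeriesDataset.from_df`).

        loader = TimeSeriesLoader(dataset, batch_size=32, shuffle=True)
        batch = next(iter(loader))
        # `batch` is a dict with keys 'temporal', 'temporal_cols' and 'y_idx',
        # plus 'static' and 'static_cols' when static features are present.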
""" def __init__(self, dataset, **kwargs): if "collate_fn" in kwargs: kwargs.pop("collate_fn") kwargs_ = {**kwargs, **dict(collate_fn=self._collate_fn)} DataLoader.__init__(self, dataset=dataset, **kwargs_) def _collate_fn(self, batch): elem = batch[0] elem_type = type(elem) if isinstance(elem, torch.Tensor): out = None if torch.utils.data.get_worker_info() is not None: # If we're in a background process, concatenate directly into a # shared memory tensor to avoid an extra copy numel = sum(x.numel() for x in batch) storage = elem.storage()._new_shared(numel, device=elem.device) out = elem.new(storage).resize_(len(batch), *list(elem.size())) return torch.stack(batch, 0, out=out) elif isinstance(elem, Mapping): if elem["static"] is None: return dict( temporal=self.collate_fn([d["temporal"] for d in batch]), temporal_cols=elem["temporal_cols"], y_idx=elem["y_idx"], ) return dict( static=self.collate_fn([d["static"] for d in batch]), static_cols=elem["static_cols"], temporal=self.collate_fn([d["temporal"] for d in batch]), temporal_cols=elem["temporal_cols"], y_idx=elem["y_idx"], ) raise TypeError(f"Unknown {elem_type}") # %% ../nbs/tsdataset.ipynb 7 class TimeSeriesDataset(Dataset): def __init__( self, temporal, temporal_cols, indptr, max_size: int, min_size: int, y_idx: int, static=None, static_cols=None, sorted=False, ): super().__init__() self.temporal = self._as_torch_copy(temporal) self.temporal_cols = pd.Index(list(temporal_cols)) if static is not None: self.static = self._as_torch_copy(static) self.static_cols = static_cols else: self.static = static self.static_cols = static_cols self.indptr = indptr self.n_groups = self.indptr.size - 1 self.max_size = max_size self.min_size = min_size self.y_idx = y_idx # Upadated flag. To protect consistency, dataset can only be updated once self.updated = False self.sorted = sorted def __getitem__(self, idx): if isinstance(idx, int): # Parse temporal data and pad its left temporal = torch.zeros( size=(len(self.temporal_cols), self.max_size), dtype=torch.float32 ) ts = self.temporal[self.indptr[idx] : self.indptr[idx + 1], :] temporal[: len(self.temporal_cols), -len(ts) :] = ts.permute(1, 0) # Add static data if available static = None if self.static is None else self.static[idx, :] item = dict( temporal=temporal, temporal_cols=self.temporal_cols, static=static, static_cols=self.static_cols, y_idx=self.y_idx, ) return item raise ValueError(f"idx must be int, got {type(idx)}") def __len__(self): return self.n_groups def __repr__(self): return f"TimeSeriesDataset(n_data={self.temporal.shape[0]:,}, n_groups={self.n_groups:,})" def __eq__(self, other): if not hasattr(other, "data") or not hasattr(other, "indptr"): return False return np.allclose(self.data, other.data) and np.array_equal( self.indptr, other.indptr ) def _as_torch_copy( self, x: Union[np.ndarray, torch.Tensor], dtype: torch.dtype = torch.float32, ) -> torch.Tensor: if isinstance(x, np.ndarray): x = torch.from_numpy(x) return x.to(dtype, copy=False).clone() def align( self, df: DataFrame, id_col: str, time_col: str, target_col: str ) -> "TimeSeriesDataset": # Protect consistency df = ufp.copy_if_pandas(df, deep=False) # Add Nones to missing columns (without available_mask) temporal_cols = self.temporal_cols.copy() for col in temporal_cols: if col not in df.columns: df = ufp.assign_columns(df, col, np.nan) if col == "available_mask": df = ufp.assign_columns(df, col, 1.0) # Sort columns to match self.temporal_cols (without available_mask) df = df[[id_col, time_col] + temporal_cols.tolist()] 
        # Process future_df
        dataset, *_ = TimeSeriesDataset.from_df(
            df=df,
            sort_df=self.sorted,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        return dataset

    def append(self, futr_dataset: "TimeSeriesDataset") -> "TimeSeriesDataset":
        """Add future observations to the dataset. Returns a copy."""
        if self.indptr.size != futr_dataset.indptr.size:
            raise ValueError(
                "Cannot append `futr_dataset` with a different number of groups."
            )

        # Define and fill new temporal with updated information
        len_temporal, col_temporal = self.temporal.shape
        len_futr = futr_dataset.temporal.shape[0]
        new_temporal = torch.empty(size=(len_temporal + len_futr, col_temporal))
        new_sizes = np.diff(self.indptr) + np.diff(futr_dataset.indptr)
        new_indptr = np.append(0, new_sizes.cumsum()).astype(np.int32)
        new_max_size = np.max(new_sizes)

        for i in range(self.n_groups):
            curr_slice = slice(self.indptr[i], self.indptr[i + 1])
            curr_size = curr_slice.stop - curr_slice.start
            futr_slice = slice(futr_dataset.indptr[i], futr_dataset.indptr[i + 1])
            new_temporal[new_indptr[i] : new_indptr[i] + curr_size] = self.temporal[
                curr_slice
            ]
            new_temporal[new_indptr[i] + curr_size : new_indptr[i + 1]] = (
                futr_dataset.temporal[futr_slice]
            )

        # Define new dataset
        updated_dataset = TimeSeriesDataset(
            temporal=new_temporal,
            temporal_cols=self.temporal_cols.copy(),
            indptr=new_indptr,
            max_size=new_max_size,
            min_size=self.min_size,
            static=self.static,
            y_idx=self.y_idx,
            static_cols=self.static_cols,
            sorted=self.sorted,
        )
        return updated_dataset

    @staticmethod
    def update_dataset(
        dataset, futr_df, id_col="unique_id", time_col="ds", target_col="y"
    ):
        futr_dataset = dataset.align(
            futr_df, id_col=id_col, time_col=time_col, target_col=target_col
        )
        return dataset.append(futr_dataset)

    @staticmethod
    def trim_dataset(dataset, left_trim: int = 0, right_trim: int = 0):
        """
        Trim temporal information from a dataset.
        Returns temporal indexes [t+left:t-right] for all series.
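
        For example, with `left_trim=2` and `right_trim=1`, a series of length 10
        keeps the 7 observations at positions 2 through 8; `dataset.min_size` must
        be strictly greater than `left_trim + right_trim`.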
""" if dataset.min_size <= left_trim + right_trim: raise Exception( f"left_trim + right_trim ({left_trim} + {right_trim}) \ must be lower than the shorter time series ({dataset.min_size})" ) # Define and fill new temporal with trimmed information len_temporal, col_temporal = dataset.temporal.shape total_trim = (left_trim + right_trim) * dataset.n_groups new_temporal = torch.zeros(size=(len_temporal - total_trim, col_temporal)) new_indptr = [0] acum = 0 for i in range(dataset.n_groups): series_length = dataset.indptr[i + 1] - dataset.indptr[i] new_length = series_length - left_trim - right_trim new_temporal[acum : (acum + new_length), :] = dataset.temporal[ dataset.indptr[i] + left_trim : dataset.indptr[i + 1] - right_trim, : ] acum += new_length new_indptr.append(acum) new_max_size = dataset.max_size - left_trim - right_trim new_min_size = dataset.min_size - left_trim - right_trim # Define new dataset updated_dataset = TimeSeriesDataset( temporal=new_temporal, temporal_cols=dataset.temporal_cols.copy(), indptr=np.array(new_indptr, dtype=np.int32), max_size=new_max_size, min_size=new_min_size, y_idx=dataset.y_idx, static=dataset.static, static_cols=dataset.static_cols, sorted=dataset.sorted, ) return updated_dataset @staticmethod def from_df( df, static_df=None, sort_df=False, id_col="unique_id", time_col="ds", target_col="y", ): # TODO: protect on equality of static_df + df indexes if isinstance(df, pd.DataFrame) and df.index.name == id_col: warnings.warn( "Passing the id as index is deprecated, please provide it as a column instead.", FutureWarning, ) df = df.reset_index(id_col) # Define indexes if not given if static_df is not None: if isinstance(static_df, pd.DataFrame) and static_df.index.name == id_col: warnings.warn( "Passing the id as index is deprecated, please provide it as a column instead.", FutureWarning, ) if sort_df: static_df = ufp.sort(static_df, by=id_col) ids, times, data, indptr, sort_idxs = ufp.process_df( df, id_col, time_col, target_col ) # processor sets y as the first column temporal_cols = pd.Index( [target_col] + [c for c in df.columns if c not in (id_col, time_col, target_col)] ) temporal = data.astype(np.float32, copy=False) indices = ids if isinstance(df, pd.DataFrame): dates = pd.Index(times, name=time_col) else: dates = pl_Series(time_col, times) sizes = np.diff(indptr) max_size = max(sizes) min_size = min(sizes) # Add Available mask efficiently (without adding column to df) if "available_mask" not in df.columns: available_mask = np.ones((len(temporal), 1), dtype=np.float32) temporal = np.append(temporal, available_mask, axis=1) temporal_cols = temporal_cols.append(pd.Index(["available_mask"])) # Static features if static_df is not None: static_cols = [col for col in static_df.columns if col != id_col] static = ufp.to_numpy(static_df[static_cols]) static_cols = pd.Index(static_cols) else: static = None static_cols = None dataset = TimeSeriesDataset( temporal=temporal, temporal_cols=temporal_cols, static=static, static_cols=static_cols, indptr=indptr, max_size=max_size, min_size=min_size, sorted=sort_df, y_idx=0, ) ds = df[time_col].to_numpy() if sort_idxs is not None: ds = ds[sort_idxs] return dataset, indices, dates, ds # %% ../nbs/tsdataset.ipynb 10 class _FilesDataset: def __init__( self, files: List[str], temporal_cols: List[str], static_cols: Optional[List[str]], id_col: str, time_col: str, target_col: str, min_size: int, ): self.files = files self.temporal_cols = pd.Index(temporal_cols) self.static_cols = pd.Index(static_cols) if static_cols is not None 
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col
        self.min_size = min_size

# %% ../nbs/tsdataset.ipynb 11
class TimeSeriesDataModule(pl.LightningDataModule):
    def __init__(
        self,
        dataset: TimeSeriesDataset,
        batch_size=32,
        valid_batch_size=1024,
        num_workers=0,
        drop_last=False,
        shuffle_train=True,
    ):
        super().__init__()
        self.dataset = dataset
        self.batch_size = batch_size
        self.valid_batch_size = valid_batch_size
        self.num_workers = num_workers
        self.drop_last = drop_last
        self.shuffle_train = shuffle_train

    def train_dataloader(self):
        loader = TimeSeriesLoader(
            self.dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=self.shuffle_train,
            drop_last=self.drop_last,
        )
        return loader

    def val_dataloader(self):
        loader = TimeSeriesLoader(
            self.dataset,
            batch_size=self.valid_batch_size,
            num_workers=self.num_workers,
            shuffle=False,
            drop_last=self.drop_last,
        )
        return loader

    def predict_dataloader(self):
        loader = TimeSeriesLoader(
            self.dataset,
            batch_size=self.valid_batch_size,
            num_workers=self.num_workers,
            shuffle=False,
        )
        return loader

# %% ../nbs/tsdataset.ipynb 25
class _DistributedTimeSeriesDataModule(TimeSeriesDataModule):
    def __init__(
        self,
        dataset: _FilesDataset,
        batch_size=32,
        valid_batch_size=1024,
        num_workers=0,
        drop_last=False,
        shuffle_train=True,
    ):
        super(TimeSeriesDataModule, self).__init__()
        self.files_ds = dataset
        self.batch_size = batch_size
        self.valid_batch_size = valid_batch_size
        self.num_workers = num_workers
        self.drop_last = drop_last
        self.shuffle_train = shuffle_train

    def setup(self, stage):
        import torch.distributed as dist

        # Each rank loads only its own parquet partition
        df = pd.read_parquet(self.files_ds.files[dist.get_rank()])
        if self.files_ds.static_cols is not None:
            static_df = (
                df[[self.files_ds.id_col] + self.files_ds.static_cols.tolist()]
                .groupby(self.files_ds.id_col, observed=True)
                .head(1)
            )
            df = df.drop(columns=self.files_ds.static_cols)
        else:
            static_df = None
        self.dataset, *_ = TimeSeriesDataset.from_df(
            df=df,
            static_df=static_df,
            sort_df=True,
            id_col=self.files_ds.id_col,
            time_col=self.files_ds.time_col,
            target_col=self.files_ds.target_col,
        )
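
# Usage sketch (illustrative only, not executed): build a dataset from a
# long-format dataframe and wrap it in the datamodule. Column names below follow
# the defaults (`unique_id`, `ds`, `y`); any frame with those roles works the same.
#
#     df = pd.DataFrame(
#         {
#             "unique_id": ["A"] * 4 + ["B"] * 4,
#             "ds": pd.date_range("2024-01-01", periods=4).tolist() * 2,
#             "y": [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0, 40.0],
#         }
#     )
#     dataset, indices, dates, ds = TimeSeriesDataset.from_df(df, sort_df=True)
#     datamodule = TimeSeriesDataModule(dataset, batch_size=2)
#     batch = next(iter(datamodule.train_dataloader()))
#     # batch["temporal"] has shape (batch, len(temporal_cols), max_size),
#     # left-padded with zeros for series shorter than max_size.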