data_handler.py 4.34 KB
Newer Older
bailuo's avatar
readme  
bailuo committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import logging
import warnings
from dataclasses import dataclass, asdict
from functools import partial
from pathlib import Path

import pandas as pd
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import rmse, mae, mase

from src.utils.filter_data import DatasetParams

# Suppress every FutureWarning for the rest of the process (global filter).
warnings.simplefilter(action="ignore", category=FutureWarning)
# Set up root logging at INFO level as an import-time side effect of this module.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by the code in this file.
main_logger = logging.getLogger(__name__)


@dataclass
class ExperimentDataset:
    """Train/test split of a long-format time-series frame plus its parameters.

    The split frames carry the columns ``unique_id``, ``ds`` and ``y`` when the
    instance is built through :meth:`from_df` / :meth:`from_parquet`.
    """

    # Every row except the last `horizon` rows of each `unique_id`.
    Y_df_train: pd.DataFrame
    # The last `horizon` rows of each `unique_id`.
    Y_df_test: pd.DataFrame
    # Number of held-out steps per series; also the fallback forecast length.
    horizon: int
    # Season length handed to SeasonalNaive and to the MASE metric.
    seasonality: int
    # Stored from the dataset parameters; not read by the methods of this class.
    frequency: str
    # Passed as `freq` to StatsForecast when a fallback forecast is needed.
    pandas_frequency: str

    @staticmethod
    def _split_train_test(
        df: pd.DataFrame, horizon: int
    ) -> "tuple[pd.DataFrame, pd.DataFrame]":
        """Split *df* into (train, test), holding out the last rows per series.

        The split is positional: each row is ranked by its distance from the
        end of its ``unique_id`` group, so the result does not depend on the
        frame's index labels. Dropping the test rows by label instead would
        also remove training rows whenever the index contains duplicate labels
        (for example after concatenating frames without resetting the index).
        Row order and index labels of *df* are preserved in both outputs.
        """
        # 0 for the last row of a series, 1 for the one before it, and so on.
        rows_from_end = df.groupby("unique_id").cumcount(ascending=False)
        # Plain boolean array so the selection is by position, not by label.
        is_test = (rows_from_end < horizon).to_numpy()
        return df[~is_test], df[is_test]

    @classmethod
    def from_df(cls, df: pd.DataFrame) -> "ExperimentDataset":
        """
        Build the dataset by holding out the last ``horizon`` rows per series.

        Parameters
        ----------
        df : pd.DataFrame
            df should have columns: unique_id, ds, y, frequency, pandas_frequency, horizon, seasonality

        Returns
        -------
        ExperimentDataset
            Instance whose remaining fields come from ``DatasetParams.from_df(df)``.
        """
        ds_params = DatasetParams.from_df(df)
        Y_df_train, Y_df_test = cls._split_train_test(
            df[["unique_id", "ds", "y"]],
            ds_params.horizon,
        )
        return cls(
            Y_df_train=Y_df_train,
            Y_df_test=Y_df_test,
            **asdict(ds_params),
        )

    @classmethod
    def from_parquet(
        cls,
        parquet_path: str,
    ) -> "ExperimentDataset":
        """Read *parquet_path* with pandas and delegate to :meth:`from_df`."""
        df = pd.read_parquet(parquet_path)
        return cls.from_df(df=df)

    def evaluate_forecast_df(
        self,
        forecast_df: pd.DataFrame,
        model: str,
        total_time: float,
    ) -> pd.DataFrame:
        """
        Score one model's forecasts against the held-out test rows.

        Parameters
        ----------
        forecast_df : pd.DataFrame
            Must contain the columns unique_id, ds and ``model``.
        model : str
            Name of the forecast column to evaluate.
        total_time : float
            Value reported as-is in the ``total_time`` row of the result.

        Returns
        -------
        pd.DataFrame
            Indexed by ``metric`` with a single column named ``model``: the
            mean over series of each metric returned by ``evaluate`` (called
            with rmse, mae and mase), followed by a ``total_time`` row.
        """
        df_ = self.Y_df_test.copy(deep=True)
        # Align the join key dtype with the forecast frame so the merge matches.
        if forecast_df.dtypes["ds"] != df_.dtypes["ds"]:
            df_["ds"] = df_["ds"].astype(forecast_df.dtypes["ds"])
        df = df_.merge(
            forecast_df[["unique_id", "ds", model]],
            on=["unique_id", "ds"],
            how="left",
        )
        missing = df[model].isna()
        if missing.any():
            na_uids = df.loc[missing, "unique_id"].unique()
            main_logger.warning(
                "%s contains NaN for %d series: %s; "
                "filling with SeasonalNaive forecasts",
                model,
                len(na_uids),
                na_uids,
            )
            # Imported lazily: only needed on this fallback path.
            from statsforecast import StatsForecast
            from statsforecast.models import SeasonalNaive

            sf = StatsForecast(
                models=[SeasonalNaive(season_length=self.seasonality)],
                freq=self.pandas_frequency,
            )
            sn_df = sf.forecast(
                df=self.Y_df_train,
                h=self.horizon,
            )
            df = df.merge(sn_df, on=["unique_id", "ds"], how="left")  # type: ignore
            # Replace the whole forecast of every affected series, not only
            # the individual NaN rows.
            needs_fallback = df["unique_id"].isin(na_uids)
            df.loc[needs_fallback, model] = df.loc[needs_fallback, "SeasonalNaive"]
            df = df.drop(columns=["SeasonalNaive"])
        partial_mase = partial(mase, seasonality=self.seasonality)
        eval_df = evaluate(
            df=df,
            metrics=[rmse, mae, partial_mase],
            train_df=self.Y_df_train,
            models=[model],
        )
        # Collapse the per-series scores into one mean value per metric.
        eval_df = eval_df.groupby("metric").mean(numeric_only=True).reset_index()  # type: ignore
        eval_time_df = pd.DataFrame(
            {
                "metric": ["total_time"],
                model: [total_time],
            }
        )
        eval_df = pd.concat(
            [eval_df, eval_time_df],
            ignore_index=True,
        )  # type: ignore
        return eval_df.set_index("metric")


@dataclass
class ForecastDataset:
    forecast_df: pd.DataFrame
    total_time: float

    @classmethod
    def from_dir(cls, dir: str | Path):
        dir_ = Path(dir)
        forecast_df = pd.read_parquet(dir_ / "forecast_df.parquet")
        with open(dir_ / "total_time.txt", "r") as file:
            total_time = float(file.read())
        return cls(forecast_df=forecast_df, total_time=total_time)

    @staticmethod
    def is_forecast_ready(dir: str | Path):
        dir_ = Path(dir)
        forecast_path = dir_ / "forecast_df.parquet"
        time_path = dir_ / "total_time.txt"
        return forecast_path.exists() and time_path.exists()

    def save_to_dir(self, dir: str | Path):
        dir_ = Path(dir)
        dir_.mkdir(parents=True, exist_ok=True)
        self.forecast_df.to_parquet(dir_ / "forecast_df.parquet")
        with open(dir_ / "total_time.txt", "w") as file:
            file.write(str(self.total_time))