tools.py 3.41 KB
Newer Older
bailuo's avatar
readme  
bailuo committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
from typing import Optional, Tuple

import pandas as pd
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mae, rmse


class ExperimentHandler:
    def __init__(self, file: str, method: str):
        self.file = file
        self.method = method

    @staticmethod
    def get_parameter(parameter: str, df: pd.DataFrame):
        parameter = df[parameter].unique()
        if len(parameter) > 1:
            raise ValueError(f"{parameter} is not unique: {parameter}")
        return parameter[0]

    def read_data(
        self,
        max_insample_length: int = 3_000,
    ) -> Tuple[pd.DataFrame, str, str, int, int]:
        df = pd.read_parquet(self.file)
        Y_df = df[["unique_id", "ds", "y"]].drop_duplicates(["unique_id", "ds"])
        Y_df = Y_df.sort_values(["unique_id", "ds"])
        Y_df = Y_df.groupby("unique_id").tail(
            max_insample_length
        )  # take only last 3_000 rows
        Y_df["ds"] = Y_df["ds"].str.replace(":01$", ":00", regex=True)
        freq = self.get_parameter("frequency", df)
        pandas_freq = self.get_parameter("pandas_frequency", df)
        h = self.get_parameter("horizon", df)
        seasonality = self.get_parameter("seasonality", df)
        return Y_df, freq, pandas_freq, int(h), int(seasonality)

    def evaluate_model(
        self,
        Y_hat_df: pd.DataFrame,
        model_name: str,
        total_time: float,
    ):
        if "cutoff" in Y_hat_df.columns:
            Y_hat_df = Y_hat_df.drop(columns="cutoff")
        eval_df = evaluate(
            df=Y_hat_df,
            metrics=[rmse, mae],
        )
        total_time_df = pd.DataFrame({"model": [model_name], "time": [total_time]})
        return eval_df, total_time_df

    @staticmethod
    def summarize_df(df: pd.DataFrame):
        n_unique_ids = df["unique_id"].nunique()
        mean_y = df["y"].mean()
        std_y = df["y"].std()
        lengths = df.groupby("unique_id").size()
        min_length = lengths.min()
        max_length = lengths.max()
        n_obs = len(df)
        summary = {
            "n_series": n_unique_ids,
            "mean": mean_y,
            "std": std_y,
            "min_length": min_length,
            "max_length": max_length,
            "n_obs": n_obs,
        }
        summary_df = pd.DataFrame.from_dict(summary, orient="index")
        summary_df = summary_df.transpose()
        return summary_df

    def save_results(
        self,
        freq: str,
        eval_df: pd.DataFrame,
        total_time_df: pd.DataFrame,
        df: Optional[pd.DataFrame] = None,
    ):
        eval_df["frequency"] = freq
        eval_df = eval_df.melt(
            id_vars=["frequency", "metric", "unique_id"],
            var_name="model",
            value_name="value",
        )
        total_time_df["frequency"] = freq
        dir = self.file.split("/")[-1].replace(".parquet", "")
        dir = f"./data/results/{dir}"
        os.makedirs(dir, exist_ok=True)
        eval_df.to_parquet(
            f"{dir}/{self.method}_metrics.parquet",
            index=False,
        )
        total_time_df.to_parquet(
            f"{dir}/{self.method}_time.parquet",
            index=False,
        )
        if df is not None:
            summary_df = self.summarize_df(df)
            summary_df["frequency"] = freq
            print(summary_df)
            summary_df.to_parquet(
                f"{dir}/series_summary.parquet",
                index=False,
            )