"src/diffusers/pipelines/lumina/pipeline_lumina.py" did not exist on "3393c01c9d6e2982f712980b86e5942078caf192"
forecaster.py 4.01 KB
Newer Older
bailuo's avatar
readme  
bailuo committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import logging
from typing import Iterable, List

import numpy as np
import pandas as pd
import torch
from chronos import ChronosPipeline
from utilsforecast.processing import make_future_dataframe

logging.basicConfig(level=logging.INFO)
main_logger = logging.getLogger(__name__)


class TimeSeriesDataset:
    def __init__(
        self,
        data: torch.Tensor,
        uids: Iterable,
        last_times: Iterable,
        batch_size: int,
    ):
        self.data = data
        self.uids = uids
        self.last_times = last_times
        self.batch_size = batch_size
        self.n_batches = len(data) // self.batch_size + (
            0 if len(data) % self.batch_size == 0 else 1
        )
        self.current_batch = 0

    @classmethod
    def from_df(cls, df: pd.DataFrame, batch_size: int):
        num_unique_ids = df["unique_id"].nunique()
        max_series_length = df["unique_id"].value_counts().max()
        padded_tensor = torch.full(
            size=(num_unique_ids, max_series_length),
            fill_value=torch.nan,
            dtype=torch.bfloat16,
        )  # type: ignore
        df_sorted = df.sort_values(by=["unique_id", "ds"])
        for idx, (_, group) in enumerate(df_sorted.groupby("unique_id")):
            series_length = len(group)
            padded_tensor[idx, -series_length:] = torch.tensor(
                group["y"].values,
                dtype=torch.bfloat16,
            )
        uids = df_sorted["unique_id"].unique()
        last_times = df_sorted.groupby("unique_id")["ds"].tail(1)
        return cls(padded_tensor, uids, last_times, batch_size)

    def __len__(self):
        return len(self.data)

    def make_future_dataframe(self, h: int, freq: str) -> pd.DataFrame:
        return make_future_dataframe(
            uids=self.uids,
            last_times=pd.to_datetime(self.last_times),
            h=h,
            freq=freq,
        )  # type: ignore

    def __iter__(self):
        self.current_batch = 0  # Reset for new iteration
        return self

    def __next__(self):
        if self.current_batch < self.n_batches:
            start_idx = self.current_batch * self.batch_size
            end_idx = start_idx + self.batch_size
            self.current_batch += 1
            return self.data[start_idx:end_idx]
        else:
            raise StopIteration


class AmazonChronos:
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.model = ChronosPipeline.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=torch.bfloat16,
        )

    def forecast(
        self,
        df: pd.DataFrame,
        h: int,
        freq: str,
        batch_size: int = 32,
        quantiles: List[float] | None = None,
        **predict_kwargs,
    ) -> pd.DataFrame:
        main_logger.info("transforming dataframe to tensor")
        dataset = TimeSeriesDataset.from_df(df, batch_size=batch_size)
        main_logger.info("forecasting")
        fcsts = [self.model.predict(batch, prediction_length=h, **predict_kwargs) for batch in dataset]
        fcst = torch.cat(fcsts)
        main_logger.info("transforming forecast to dataframe")
        fcst = fcst.numpy()
        fcst_df = dataset.make_future_dataframe(h=h, freq=freq)
        fcst_df[self.model_name] = np.median(fcst, axis=1).reshape(-1, 1)
        if quantiles is not None:
            for q in quantiles:
                q_col = f"{self.model_name}-q-{q}"
                fcst_df[q_col] = np.quantile(fcst, q, axis=1).reshape(-1, 1)
        return fcst_df


if __name__ == "__main__":
    import pandas as pd

    df = pd.read_csv(
        "https://raw.githubusercontent.com/AileenNielsen/TimeSeriesAnalysisWithPython/master/data/AirPassengers.csv"
    )
    df = df.rename(columns={"#Passengers": "y", "Month": "ds"})
    df["ds"] = pd.to_datetime(df["ds"])
    df.insert(0, "unique_id", "AirPassengers")
    df = pd.concat([df, df.assign(unique_id="AirPassengers2")])
    model = AmazonChronos("amazon/chronos-t5-small")
    fcst_df = model.forecast(df, h=12, freq="MS")
    print(fcst_df)