download_data.py

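"""Download the test split of the time series catalogue.

Reads catalogue_splits.parquet and catalogue_datasets.parquet from ./data,
downloads every test-split series in parallel, and writes one parquet file
per (frequency, pandas_frequency) group back to ./data.
"""
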
import logging
from concurrent.futures import ProcessPoolExecutor

import pandas as pd

logging.basicConfig(level=logging.INFO)
main_logger = logging.getLogger(__name__)


def read_parquet_and_assign(uid, url):
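    """Read a single series from `url`, label it with `uid`, and return it."""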
    df = pd.read_parquet(url)
    df["unique_id"] = uid
    df["ds"] = df["ds"].astype(str)
    return df[["unique_id", "ds", "y"]]


def download_data():
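    """Download all test-split series, one parquet file per frequency group."""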
    catalogue_splits = pd.read_parquet("./data/catalogue_splits.parquet")
    catalogue_datasets = pd.read_parquet("./data/catalogue_datasets.parquet")
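    # Join each split row with its dataset-level metadata
    # (pandas frequency, seasonality, horizon, ...).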
    catalogue_df = catalogue_splits.merge(
        catalogue_datasets,
        on=["dataset", "subdataset", "frequency"],
    )
    del catalogue_splits
    del catalogue_datasets
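    # Keep only the test split and the columns needed downstream.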
    catalogue_df = catalogue_df.query("split == 'test'")[
        [
            "unique_id",
            "frequency",
            "url",
            "pandas_frequency",
            "seasonality",
            "horizon",
        ]
    ]
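    # Download and save each (frequency, pandas_frequency) group separately.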
    grouped_df = catalogue_df.groupby(["frequency", "pandas_frequency"])
    for (frequency, pandas_frequency), df in grouped_df:
        uids, urls = df["unique_id"].values, df["url"].values
        main_logger.info(
            f"frequency: {frequency}, pandas_frequency: {pandas_frequency}"
        )
        n_uids = len(uids)
        main_logger.info(f"number of uids: {n_uids}")
        max_workers = min(10, n_uids)
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(read_parquet_and_assign, uid, url)
                for uid, url in zip(uids, urls)
            ]
            results = [future.result() for future in futures]
        main_logger.info("dataset read")
        Y_df = pd.concat(results)
        Y_df = Y_df.merge(
            df.drop(columns="url"),
            on="unique_id",
            how="left",
        )
        Y_df.to_parquet(f"./data/{frequency}_{pandas_frequency}.parquet")
        del Y_df
        main_logger.info("dataset saved")


if __name__ == "__main__":
    download_data()
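
# Usage: place catalogue_splits.parquet and catalogue_datasets.parquet under
# ./data, then run `python download_data.py`.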