# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for load predictor classes in dynamo.planner.utils.load_predictor."""

import math
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest

from dynamo.planner.utils.load_predictor import (
    ConstantPredictor,
    KalmanPredictor,
    ProphetPredictor,
)

pytestmark = [
    pytest.mark.gpu_0,
    pytest.mark.pre_merge,
    pytest.mark.unit,
    pytest.mark.planner,
]

# Dotted path handed to ``unittest.mock.patch`` by the Prophet tests below.
# NOTE(review): assumes the predictor looks up ``Prophet`` through this
# module-level name when predict_next() runs - confirm against load_predictor.
_PROPHET_PATCH_TARGET = "dynamo.planner.utils.load_predictor.Prophet"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_config(
    load_predictor_log1p: bool = False,
    step_size: int = 60,
    prophet_window_size: int = 50,
    kalman_q_level: float = 1e-4,
    kalman_q_trend: float = 1e-5,
    kalman_r: float = 0.1,
    kalman_min_points: int = 3,
):
    """Create a minimal mock PlannerConfig for predictor instantiation.

    Each keyword is copied onto the attribute of the same name on a MagicMock,
    except ``step_size``, which is stored as ``throughput_adjustment_interval``.
    """
    cfg = MagicMock()
    cfg.load_predictor_log1p = load_predictor_log1p
    cfg.throughput_adjustment_interval = step_size
    cfg.prophet_window_size = prophet_window_size
    cfg.kalman_q_level = kalman_q_level
    cfg.kalman_q_trend = kalman_q_trend
    cfg.kalman_r = kalman_r
    cfg.kalman_min_points = kalman_min_points
    return cfg


def _forecast_frame(yhat: float) -> pd.DataFrame:
    """Return a one-row forecast DataFrame with yhat, yhat_lower and yhat_upper."""
    return pd.DataFrame({"yhat": [yhat], "yhat_lower": [yhat], "yhat_upper": [yhat]})


def _run_predict_with_mock_prophet(predictor, yhat: float):
    """Call ``predictor.predict_next()`` with ``Prophet`` replaced by a mock.

    While the patch is active, instantiating ``Prophet`` yields a MagicMock
    whose ``predict`` stores a copy of every frame it is given and answers
    with a one-row forecast of value *yhat*.

    Returns:
        A ``(result, captured)`` tuple: the value returned by
        ``predict_next()`` and the list of frames passed to ``predict``.
    """
    captured: list[pd.DataFrame] = []

    def fake_predict(df):
        # Copy so later mutation by the predictor cannot alter what we inspect.
        captured.append(df.copy())
        return _forecast_frame(yhat)

    mock_model = MagicMock()
    mock_model.predict.side_effect = fake_predict

    with patch(_PROPHET_PATCH_TARGET, return_value=mock_model):
        result = predictor.predict_next()
    return result, captured


def _capture_next_timestamp(predictor, yhat: float):
    """Return the first ``ds`` value that ``predict_next()`` hands to Prophet.

    Fails with an explicit assertion (rather than an IndexError) when the
    mocked ``predict`` was not called exactly once.
    """
    _, captured = _run_predict_with_mock_prophet(predictor, yhat)
    assert (
        len(captured) == 1
    ), f"expected exactly one Prophet.predict() call, got {len(captured)}"
    return captured[0]["ds"].iloc[0]


# ---------------------------------------------------------------------------
# ConstantPredictor tests
# ---------------------------------------------------------------------------


class TestConstantPredictor:
    """Tests for ConstantPredictor behaviour."""

    def test_returns_zero_with_no_data(self):
        """predict_next() returns 0 when no data points have been added."""
        predictor = ConstantPredictor(_make_config())
        assert predictor.predict_next() == 0

    def test_returns_last_added_value(self):
        """predict_next() always returns the most recently added value."""
        predictor = ConstantPredictor(_make_config())
        for v in [10.0, 20.0, 30.0]:
            predictor.add_data_point(v)
        assert predictor.predict_next() == 30.0

    def test_skips_leading_zeros(self):
        """Leading zeros (idle period) are excluded from the buffer."""
        predictor = ConstantPredictor(_make_config())
        predictor.add_data_point(0)
        predictor.add_data_point(0)
        assert predictor.predict_next() == 0
        assert len(predictor.data_buffer) == 0

    def test_does_not_skip_zeros_after_nonzero(self):
        """Once a nonzero value has been seen, zeros are retained."""
        predictor = ConstantPredictor(_make_config())
        predictor.add_data_point(5.0)
        predictor.add_data_point(0)
        assert predictor.predict_next() == 0
        assert len(predictor.data_buffer) == 2

    def test_nan_treated_as_zero(self):
        """NaN values are coerced to 0 before being stored."""
        predictor = ConstantPredictor(_make_config())
        predictor.add_data_point(10.0)
        predictor.add_data_point(float("nan"))
        assert predictor.predict_next() == 0


# ---------------------------------------------------------------------------
# ProphetPredictor timestamp bug regression tests
# ---------------------------------------------------------------------------


class TestProphetPredictorTimestamp:
    """Regression tests for the Prophet predictor next-step timestamp bug.

    Bug: predict_next() used
        ``timedelta(seconds=self.curr_step)``
    instead of
        ``timedelta(seconds=self.curr_step * self.step_size)``

    When step_size > 1 (which is always the case in production, default=180s)
    the predicted timestamp fell far below the last training sample, causing
    Prophet to extrapolate into the past rather than one step into the future.
    """

    def _make_prophet(self, step_size=180, log1p=False):
        """Instantiate a ProphetPredictor with a given step_size."""
        return ProphetPredictor(
            _make_config(step_size=step_size, load_predictor_log1p=log1p)
        )

    def _mock_forecast_df(self, yhat: float):
        """Return a minimal forecast DataFrame like Prophet would produce."""
        return _forecast_frame(yhat)

    # ------------------------------------------------------------------
    # Core timestamp correctness test (the actual bug)
    # ------------------------------------------------------------------

    def test_next_timestamp_uses_step_size(self):
        """The timestamp passed to Prophet.predict() must equal
        start_date + curr_step * step_size, NOT start_date + curr_step.

        With step_size=180, after 6 data points curr_step==6, so the expected
        next timestamp is start_date + 1080 seconds, not start_date + 6 seconds.
        """
        step_size = 180
        predictor = self._make_prophet(step_size=step_size)
        for v in [10.0, 12.0, 14.0, 13.0, 15.0, 11.0]:
            predictor.add_data_point(v)

        assert predictor.curr_step == 6

        expected_next_ts = predictor.start_date + timedelta(
            seconds=predictor.curr_step * step_size
        )
        buggy_next_ts = predictor.start_date + timedelta(seconds=predictor.curr_step)
        assert (
            expected_next_ts != buggy_next_ts
        ), "Sanity check: the two timestamps must differ"

        actual_ts = _capture_next_timestamp(predictor, 10.0)
        assert actual_ts == expected_next_ts, (
            "predict_next() passed wrong timestamp to Prophet.\n"
            f" Expected (correct): {expected_next_ts}\n"
            f" Got: {actual_ts}\n"
            f" Buggy value would be: {buggy_next_ts}"
        )

    def test_next_timestamp_consistent_with_add_data_point(self):
        """The timestamp used for the next prediction must be exactly one
        step_size ahead of the last data-point timestamp in the buffer."""
        step_size = 60
        predictor = self._make_prophet(step_size=step_size)
        for v in [5.0, 6.0, 7.0, 8.0, 9.0, 10.0]:
            predictor.add_data_point(v)

        last_data_ts: datetime = predictor.data_buffer[-1]["ds"]
        expected_next_ts = last_data_ts + timedelta(seconds=step_size)

        actual_ts = _capture_next_timestamp(predictor, 5.0)
        assert actual_ts == expected_next_ts, (
            "Next-step timestamp must be one step ahead of the last training point.\n"
            f" Last training ts: {last_data_ts}\n"
            f" Expected next ts: {expected_next_ts}\n"
            f" Actual next ts: {actual_ts}"
        )

    def test_next_timestamp_step_size_1_unchanged(self):
        """Edge-case: step_size=1 was accidentally correct before the fix.
        Verify the fix does not regress this case."""
        step_size = 1
        predictor = self._make_prophet(step_size=step_size)
        for v in [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]:
            predictor.add_data_point(v)

        expected_next_ts = predictor.start_date + timedelta(
            seconds=predictor.curr_step * step_size
        )

        actual_ts = _capture_next_timestamp(predictor, 3.0)
        assert actual_ts == expected_next_ts

    # ------------------------------------------------------------------
    # Functional / correctness tests
    # ------------------------------------------------------------------

    def test_returns_last_value_when_insufficient_data(self):
        """predict_next() falls back to last value when buffer < 5 points."""
        predictor = self._make_prophet()
        for v in [10.0, 20.0, 30.0]:
            predictor.add_data_point(v)
        assert predictor.predict_next() == 30.0

    def test_returns_zero_when_empty(self):
        """predict_next() returns 0 when no data points exist."""
        predictor = self._make_prophet()
        assert predictor.predict_next() == 0

    def test_predict_next_returns_non_negative_raw_mode(self):
        """In raw mode, predict_next() must never return a negative value."""
        predictor = self._make_prophet(log1p=False)
        for v in [10.0, 12.0, 11.0, 13.0, 10.0, 9.0]:
            predictor.add_data_point(v)

        # The mocked model forecasts a negative yhat; the result must not be.
        result, _ = _run_predict_with_mock_prophet(predictor, -5.0)
        assert result >= 0.0, f"predict_next() returned negative value {result}"

    def test_predict_next_returns_non_negative_log1p_mode(self):
        """In log1p mode, predict_next() applies expm1 and clamps to >= 0."""
        predictor = self._make_prophet(log1p=True)
        for v in [10.0, 12.0, 11.0, 13.0, 10.0, 9.0]:
            predictor.add_data_point(v)

        result, _ = _run_predict_with_mock_prophet(predictor, -100.0)
        assert result >= 0.0, f"predict_next() returned negative value {result}"

    def test_log1p_mode_transforms_data_correctly(self):
        """In log1p mode, values stored in buffer are log1p-transformed,
        and get_last_value() returns the original scale."""
        predictor = self._make_prophet(log1p=True)
        raw_value = 99.0
        predictor.add_data_point(raw_value)

        buffered_y = predictor.data_buffer[-1]["y"]
        assert abs(buffered_y - math.log1p(raw_value)) < 1e-9

        recovered = predictor.get_last_value()
        assert abs(recovered - raw_value) < 1e-6

    def test_window_size_limits_buffer(self):
        """Buffer never exceeds prophet_window_size entries."""
        window = 5
        predictor = ProphetPredictor(
            _make_config(step_size=60, prophet_window_size=window)
        )
        for v in range(1, 20):
            predictor.add_data_point(float(v))
        assert len(predictor.data_buffer) <= window

    def test_curr_step_advances_on_each_non_skipped_point(self):
        """curr_step increments for every data point not idle-skipped."""
        predictor = self._make_prophet()
        predictor.add_data_point(0)
        predictor.add_data_point(0)
        assert predictor.curr_step == 0

        predictor.add_data_point(5.0)
        assert predictor.curr_step == 1

        predictor.add_data_point(6.0)
        assert predictor.curr_step == 2

    def test_skips_leading_zeros_idle_period(self):
        """Leading zeros (idle period) are excluded from data_buffer."""
        predictor = self._make_prophet()
        predictor.add_data_point(0)
        predictor.add_data_point(0)
        assert len(predictor.data_buffer) == 0

        predictor.add_data_point(10.0)
        assert len(predictor.data_buffer) == 1

    def test_does_not_skip_zero_after_nonzero(self):
        """Once a nonzero has been seen, zeros are retained in the buffer."""
        predictor = self._make_prophet()
        predictor.add_data_point(10.0)
        predictor.add_data_point(0)
        assert len(predictor.data_buffer) == 2


# ---------------------------------------------------------------------------
# ProphetPredictor: parameterized step_size regression
# ---------------------------------------------------------------------------


class TestProphetPredictorMultipleStepSizes:
    """Parameterized check that the predict_next() timestamp is always
    exactly one step ahead of the last training timestamp."""

    @pytest.mark.parametrize("step_size", [30, 60, 120, 180, 300])
    def test_next_timestamp_is_one_step_ahead(self, step_size):
        """Verify the next-step timestamp for each parametrized step_size."""
        predictor = ProphetPredictor(_make_config(step_size=step_size))
        for v in [5.0, 10.0, 8.0, 12.0, 9.0, 11.0]:
            predictor.add_data_point(v)

        last_ts: datetime = predictor.data_buffer[-1]["ds"]
        expected_next_ts = last_ts + timedelta(seconds=step_size)

        actual_ts = _capture_next_timestamp(predictor, 5.0)
        assert (
            actual_ts == expected_next_ts
        ), f"step_size={step_size}: expected {expected_next_ts}, got {actual_ts}"


# ---------------------------------------------------------------------------
# KalmanPredictor sanity tests
# ---------------------------------------------------------------------------


class TestKalmanPredictor:
    """Basic sanity checks for KalmanPredictor."""

    def test_returns_zero_before_first_observation(self):
        """predict_next() returns 0 when uninitialised."""
        predictor = KalmanPredictor(_make_config())
        assert predictor.predict_next() == 0

    def test_predict_returns_non_negative(self):
        """Predictions are non-negative after feeding positive data."""
        predictor = KalmanPredictor(_make_config())
        for v in [10.0, 15.0, 20.0, 15.0, 10.0]:
            predictor.add_data_point(v)
        result = predictor.predict_next()
        assert result >= 0.0

    def test_caches_prediction_between_observations(self):
        """Calling predict_next() twice without new data returns the same value."""
        predictor = KalmanPredictor(_make_config())
        for v in [10.0, 12.0, 11.0]:
            predictor.add_data_point(v)
        first = predictor.predict_next()
        second = predictor.predict_next()
        assert first == second

    def test_log1p_mode_returns_non_negative(self):
        """Log1p mode still returns non-negative predictions."""
        predictor = KalmanPredictor(_make_config(load_predictor_log1p=True))
        for v in [10.0, 15.0, 20.0, 15.0, 10.0]:
            predictor.add_data_point(v)
        result = predictor.predict_next()
        assert result >= 0.0