"""
Rolling-origin backtest and skill scoring.
The honest core of the forecasting module: a trained model is only worth
shipping if it beats persistence and climatology on out-of-sample data. This
module provides:
- ``rolling_origin_splits``: expanding-window train/test splits over the origin
timeline with an embargo gap so a training target window never overlaps a
test predictor window.
- ``evaluate``: fits the LightGBM model and both baselines on each fold and
reports MAE, RMSE, skill scores, interval coverage and pinball loss per
(fold, target, horizon).
- ``history_ablation``: reruns ``evaluate`` over several history lengths so the
default history window is chosen on measured skill, not assumed.
"""
from __future__ import annotations
import logging
import numpy as np
import pandas as pd
from climagrid.forecasting.baselines import ClimatologyForecaster, PersistenceForecaster
from climagrid.forecasting.config import ForecastConfig
from climagrid.forecasting.dataset import build_supervised_frame
from climagrid.forecasting.models import LightGBMForecaster, quantile_column_names
logger = logging.getLogger(__name__)
[docs]
def rolling_origin_splits(
dates: list[pd.Timestamp],
config: ForecastConfig,
*,
n_splits: int = 3,
test_size_days: int = 90,
) -> list[tuple[set[pd.Timestamp], set[pd.Timestamp]]]:
"""
Build expanding-window (train, test) origin-date splits.
Test windows are placed back-to-back at the end of the timeline. Each
fold's training set expands to include everything up to ``embargo`` days
before its test window starts, where ``embargo`` defaults to the max
horizon (so a training target at ``t + H`` never reaches a test origin).
"""
unique_dates = sorted({pd.Timestamp(d) for d in dates})
n = len(unique_dates)
embargo = config.effective_embargo_days
splits: list[tuple[set[pd.Timestamp], set[pd.Timestamp]]] = []
for k in range(n_splits):
test_end_idx = n - (n_splits - 1 - k) * test_size_days
test_start_idx = test_end_idx - test_size_days
train_end_idx = test_start_idx - embargo
if test_start_idx <= 0 or train_end_idx <= 0:
logger.warning(
"Skipping fold %d: not enough history for the requested split.", k
)
continue
train_dates = set(unique_dates[:train_end_idx])
test_dates = set(unique_dates[test_start_idx:test_end_idx])
if train_dates and test_dates:
splits.append((train_dates, test_dates))
return splits
def _pinball_loss(y_true: np.ndarray, y_pred: np.ndarray, quantile: float) -> float:
"""Mean pinball (quantile) loss for one quantile."""
diff = y_true - y_pred
return float(np.mean(np.maximum(quantile * diff, (quantile - 1.0) * diff)))
def _skill(mse_model: float, mse_baseline: float) -> float:
"""Skill score 1 - MSE_model / MSE_baseline (NaN if baseline MSE is zero)."""
if mse_baseline <= 0.0:
return float("nan")
return 1.0 - mse_model / mse_baseline
[docs]
def evaluate(
panel: pd.DataFrame,
config: ForecastConfig,
*,
n_splits: int = 3,
test_size_days: int = 90,
event_threshold: float = 0.0,
) -> pd.DataFrame:
"""
Backtest the LightGBM model against baselines on a daily panel.
Returns one row per (fold, target, horizon) with point-accuracy metrics,
skill scores versus both baselines, and interval calibration metrics.
``skill_vs_persistence_events`` is the skill computed on only the active
rows (where the actual value exceeds ``event_threshold``), and
``event_fraction`` is their share. For mostly-zero, sparse targets (e.g.
ice loading) the all-rows skill is inflated by the easy zero stretches; the
event-conditioned skill reflects how well the model predicts the rare
events that matter, and is the honest basis for a deploy decision.
"""
quantiles = config.quantiles
q_cols = quantile_column_names(config)
low_col, high_col = q_cols[0], q_cols[-1]
rows: list[dict[str, float | int | str]] = []
for target in config.targets:
sup = build_supervised_frame(panel, target, config)
if sup.empty:
continue
dates = [pd.Timestamp(d) for d in sup["date"].unique()]
splits = rolling_origin_splits(
dates, config, n_splits=n_splits, test_size_days=test_size_days
)
for fold, (train_dates, test_dates) in enumerate(splits):
train = sup[sup["date"].isin(train_dates)]
test = sup[sup["date"].isin(test_dates)]
if train.empty or test.empty:
continue
model = LightGBMForecaster(config).fit(train, target)
persistence = PersistenceForecaster(config).fit(train, target)
climatology = ClimatologyForecaster(config).fit(train, target)
model_pred = model.predict(test, target)
for h in range(1, config.horizon_days + 1):
actual = test[f"y_h{h}"]
valid = actual.notna().to_numpy()
if valid.sum() == 0:
continue
test_h = test[valid]
y_true = test_h[f"y_h{h}"].to_numpy(dtype=float)
y_pers = persistence.predict(test_h, h)
y_clim = climatology.predict(test_h, h)
pred_h = model_pred[model_pred["horizon_day"] == h]
merged = test_h[["asset_id", "date"]].merge(
pred_h,
left_on=["asset_id", "date"],
right_on=["asset_id", "origin_date"],
how="left",
)
p50 = merged["p50"].to_numpy(dtype=float)
p_low = merged[low_col].to_numpy(dtype=float)
p_high = merged[high_col].to_numpy(dtype=float)
mse_model = float(np.mean((p50 - y_true) ** 2))
mse_pers = float(np.mean((y_pers - y_true) ** 2))
mse_clim = float(np.mean((y_clim - y_true) ** 2))
# Event-conditioned skill: only the active (above-threshold) rows.
event = y_true > event_threshold
if event.any():
skill_events = _skill(
float(np.mean((p50[event] - y_true[event]) ** 2)),
float(np.mean((y_pers[event] - y_true[event]) ** 2)),
)
else:
skill_events = float("nan")
coverage = float(np.mean((y_true >= p_low) & (y_true <= p_high)))
pinball = float(
np.mean(
[
_pinball_loss(y_true, merged[col].to_numpy(dtype=float), q)
for col, q in zip(q_cols, quantiles, strict=True)
]
)
)
rows.append(
{
"fold": fold,
"target": target,
"horizon_day": h,
"n": int(len(y_true)),
"mae": float(np.mean(np.abs(p50 - y_true))),
"rmse": float(np.sqrt(mse_model)),
"mae_persistence": float(np.mean(np.abs(y_pers - y_true))),
"mae_climatology": float(np.mean(np.abs(y_clim - y_true))),
"skill_vs_persistence": _skill(mse_model, mse_pers),
"skill_vs_persistence_events": skill_events,
"event_fraction": float(np.mean(event)),
"skill_vs_climatology": _skill(mse_model, mse_clim),
"interval_coverage": coverage,
"pinball": pinball,
}
)
return pd.DataFrame(rows)
[docs]
def history_ablation(
panel: pd.DataFrame,
config: ForecastConfig,
*,
windows_years: list[int] | None = None,
n_splits: int = 3,
test_size_days: int = 90,
) -> pd.DataFrame:
"""
Rerun ``evaluate`` across several history-window lengths.
Lets the default history length be chosen on measured skill and calibration
rather than assumed. Returns the concatenated per-fold metrics with a
``history_years`` column.
"""
windows = windows_years or [10, 15, 25]
if panel.empty:
return pd.DataFrame()
max_date = panel["date"].max()
frames: list[pd.DataFrame] = []
for window in windows:
cutoff = max_date - pd.DateOffset(years=window)
subset = panel[panel["date"] >= cutoff]
result = evaluate(
subset, config, n_splits=n_splits, test_size_days=test_size_days
)
if not result.empty:
result = result.assign(history_years=window)
frames.append(result)
if not frames:
return pd.DataFrame()
return pd.concat(frames, ignore_index=True)