"""
WildfireProximityScore: normalized wildfire risk for T&D infrastructure.
Wildfire is the fastest-growing cause of major U.S. transmission and
distribution outages. Key risk pathways:
- Direct flame contact with conductors (phase-to-phase faults)
- Smoke particles reducing air insulation strength (flashovers)
- Pole and structure ignition
- Post-fire debris flow and slope instability damaging underground lines
The score combines:
1. Distance to nearest active fire perimeter (from USFS WFIGS)
2. Optionally: NOAA HRRR-derived fire weather index (temperature, humidity, wind)
"""
from __future__ import annotations
import math
import numpy as np
import pandas as pd
from climagrid.sources.usfs_wfigs import compute_proximity
# Distance decay: risk drops to ~5% at this distance (km)
_HALF_DISTANCE_KM = 10.0
[docs]
class WildfireProximityScore:
"""
Computes a normalized wildfire proximity score [0, 1] for asset locations.
Parameters
----------
max_risk_distance_km:
Distance within which risk = 1.0 (fire perimeter adjacent). Default 1 km.
zero_risk_distance_km:
Distance beyond which risk ≈ 0. Default 200 km.
use_fire_weather:
If True and fire weather columns are present, modulate score by
fire weather conditions (temperature, humidity, wind).
temp_col:
Temperature column for fire weather modulation.
rh_col:
Relative humidity column.
wind_col:
Wind speed column.
Example
-------
>>> wfigs = WfigsAdapter()
>>> fires_df = wfigs.fetch(bbox, start_dt, end_dt)
>>> score = WildfireProximityScore()
>>> df = score.compute(asset_df, fires_df)
>>> df["feat_wildfire_proximity"]
"""
def __init__(
self,
max_risk_distance_km: float = 1.0,
zero_risk_distance_km: float = 200.0,
use_fire_weather: bool = True,
temp_col: str = "hrrr_temperature_2m",
rh_col: str = "hrrr_relative_humidity_2m",
wind_col: str = "hrrr_wind_speed_10m",
):
self._max_risk_km = max_risk_distance_km
self._zero_risk_km = zero_risk_distance_km
self._use_fire_weather = use_fire_weather
self._temp_col = temp_col
self._rh_col = rh_col
self._wind_col = wind_col
[docs]
def compute(self, df: pd.DataFrame, fires_df: pd.DataFrame) -> pd.DataFrame:
"""
Add feat_wildfire_proximity column to df.
Parameters
----------
df:
Asset-level environmental DataFrame with lat/lon columns.
fires_df:
DataFrame returned by WfigsAdapter.fetch() with fire perimeter data.
Returns
-------
pd.DataFrame
Input df with feat_wildfire_proximity [0,1] and
wfigs_nearest_fire_km, wfigs_fire_active, wfigs_fire_area_ha added.
"""
df = df.copy()
if fires_df.empty or "lat" not in df.columns:
df["feat_wildfire_proximity"] = 0.0
df["wfigs_nearest_fire_km"] = float("inf")
df["wfigs_fire_active"] = False
df["wfigs_fire_area_ha"] = 0.0
return df
# For each unique asset location, compute proximity
location_cache: dict[tuple[float, float], tuple[float, bool, float]] = {}
def _proximity(row: pd.Series) -> tuple[float, bool, float]:
key = (row["lat"], row["lon"])
if key not in location_cache:
location_cache[key] = compute_proximity(row["lat"], row["lon"], fires_df)
return location_cache[key]
proximities = df.apply(_proximity, axis=1)
df["wfigs_nearest_fire_km"] = [p[0] for p in proximities]
df["wfigs_fire_active"] = [p[1] for p in proximities]
df["wfigs_fire_area_ha"] = [p[2] for p in proximities]
# Distance-based score: exponential decay
dist = df["wfigs_nearest_fire_km"]
base_score = np.where(
dist <= self._max_risk_km,
1.0,
np.exp(
-math.log(20) * (dist - self._max_risk_km)
/ (self._zero_risk_km - self._max_risk_km)
),
)
base_score = np.clip(base_score, 0.0, 1.0)
# Fire weather modulation (optional)
if self._use_fire_weather:
weather_factor = self._fire_weather_factor(df)
base_score = np.clip(base_score * weather_factor, 0.0, 1.0)
df["feat_wildfire_proximity"] = base_score
return df
def _fire_weather_factor(self, df: pd.DataFrame) -> np.ndarray:
"""
Return a multiplicative modulation factor [0.5, 2.0] based on
fire weather conditions. Values > 1 amplify risk under extreme
conditions; values < 1 reduce risk under benign conditions.
"""
factor = np.ones(len(df))
# High temperature increases risk
if self._temp_col in df.columns:
temp = df[self._temp_col].fillna(20.0)
factor *= np.clip(1.0 + (temp - 20.0) / 40.0, 0.5, 2.0)
# Low humidity increases risk
if self._rh_col in df.columns:
rh = df[self._rh_col].fillna(50.0)
factor *= np.clip(2.0 - rh / 50.0, 0.5, 2.0)
# High wind increases risk
if self._wind_col in df.columns:
wind = df[self._wind_col].fillna(5.0)
factor *= np.clip(1.0 + wind / 20.0, 0.5, 2.0)
return np.clip(factor, 0.5, 2.0)