Source code for climagrid.sources.noaa_ncei

"""
NOAA NCEI adapter: GHCND daily station summaries via Climate Data Online.

Fetches daily summary values from surface stations (temperature, wind,
precipitation, humidity) through NOAA's Climate Data Online API. Free API
key required: https://www.ncdc.noaa.gov/cdo-web/token

Docs: https://www.ncdc.noaa.gov/cdo-web/webservices/v2
"""

from __future__ import annotations

import logging
import os
from datetime import datetime

import pandas as pd
import requests

from climagrid.sources.base import BaseEnvironmentalSource, BoundingBox

# Root of the NOAA Climate Data Online (CDO) v2 REST API; the adapter appends
# the endpoint name ("/stations", "/data") to this prefix.
_BASE_URL = "https://www.ncdc.noaa.gov/cdo-web/api/v2"

# GHCND data type IDs available from official airport/ASOS stations → climagrid column names
# GHCND is a daily dataset; these are daily summary values (not hourly observations)
# The keys are comma-joined into the `datatypeid` request parameter, and the
# mapping is used to rename the pivoted result columns. Units in the inline
# comments correspond to the `units=metric` request option.
_DTYPE_MAP = {
    "TMAX": "ncei_temperature_max",    # daily max temperature (°C)
    "TMIN": "ncei_temperature_min",    # daily min temperature (°C)
    "AWND": "ncei_wind_speed",         # average daily wind speed (m/s)
    "PRCP": "ncei_precipitation_daily",# daily precipitation (mm)
    "RHAV": "ncei_relative_humidity",  # average daily relative humidity (%)
}

# CDO dataset: GHCND has reliable daily data from official ASOS/airport stations
# Sent as `datasetid` on both the station search and the data request.
_DATASET = "GHCND"


_LOGGER = logging.getLogger(__name__)


class NceiAdapter(BaseEnvironmentalSource):
    """
    Fetches daily surface summaries (GHCND) from the NOAA NCEI CDO API.

    Searches the bounding box for stations that report ``TMAX`` during the
    requested period, picks the one with the highest data coverage, and
    returns that station's daily record for the requested time range.

    The adapter is best-effort: request or decoding failures are logged and
    answered with a one-row placeholder frame (``lat``, ``lon``, ``timestamp``
    only) instead of an exception.

    Parameters
    ----------
    api_token:
        NOAA CDO API token. If None, reads from NOAA_CDO_TOKEN env var.
    radius_km:
        Search radius in km forwarded to the point-based lookup
        (default 50 km).
    timeout:
        Timeout passed to every ``requests`` call (default 30).
    session:
        Optional ``requests.Session`` to reuse; a new one is created when
        omitted. The token header is set on whichever session is used.
    """

    point_based = True

    # The CDO API serves at most 1000 records per response, so longer
    # result sets have to be paged through with ``offset``.
    _PAGE_SIZE = 1000
    # Hard stop on paging so a misbehaving endpoint cannot loop forever.
    _MAX_PAGES = 50

    def __init__(
        self,
        api_token: str | None = None,
        radius_km: float = 50.0,
        timeout: int = 30,
        session: requests.Session | None = None,
    ):
        self._token = api_token or os.environ.get("NOAA_CDO_TOKEN", "")
        self._radius_km = radius_km
        self._timeout = timeout
        self._session = session or requests.Session()
        if self._token:
            # CDO authenticates through a plain "token" request header.
            self._session.headers["token"] = self._token

    @property
    def source_name(self) -> str:
        """Identifier of this data source."""
        return "noaa_ncei"

    def fetch_points(
        self,
        points: list[tuple[float, float]],
        start_dt: datetime,
        end_dt: datetime,
    ) -> pd.DataFrame:
        """Fetch station data for each asset location.

        Delegates to ``_fetch_points_via_bbox`` with the configured
        ``radius_km``.
        """
        return self._fetch_points_via_bbox(points, start_dt, end_dt, self._radius_km)

    def fetch(
        self,
        bbox: BoundingBox,
        start_dt: datetime,
        end_dt: datetime,
    ) -> pd.DataFrame:
        """Return the daily record of the best-covered station inside *bbox*.

        The time range is normalised and validated first. When no station is
        found, a one-row placeholder frame located at the bbox centre and
        stamped with the (normalised) start time is returned.
        """
        start_dt = self._ensure_utc(start_dt)
        end_dt = self._ensure_utc(end_dt)
        self._validate_time_range(start_dt, end_dt)

        station_id = self._find_nearest_station(bbox, start_dt, end_dt)
        if station_id is None:
            return self._placeholder_frame(bbox.center, start_dt)
        return self._fetch_station_data(station_id, bbox.center, start_dt, end_dt)

    @staticmethod
    def _placeholder_frame(center: tuple[float, float], start_dt: datetime) -> pd.DataFrame:
        """Build the one-row "no data" frame carrying only position and time."""
        lat, lon = center
        return pd.DataFrame({"lat": [lat], "lon": [lon], "timestamp": [start_dt]})

    def _get_json(self, endpoint: str, params: dict[str, object]) -> dict | None:
        """GET ``{_BASE_URL}/{endpoint}`` and decode the JSON body.

        Returns None when the request or the decoding fails, and an empty
        dict when the body decodes to something other than a JSON object.
        """
        try:
            resp = self._session.get(
                f"{_BASE_URL}/{endpoint}", params=params, timeout=self._timeout  # type: ignore[arg-type]
            )
            resp.raise_for_status()
            payload = resp.json()
        except Exception:
            # Deliberately broad: callers rely on this adapter never raising
            # for upstream problems. The failure is logged rather than lost.
            _LOGGER.warning("NOAA CDO request to /%s failed", endpoint, exc_info=True)
            return None
        return payload if isinstance(payload, dict) else {}

    def _find_nearest_station(
        self, bbox: BoundingBox, start_dt: datetime, end_dt: datetime
    ) -> str | None:
        """Pick a station inside *bbox* that reports TMAX for the period.

        Despite the name, no distance is computed: among the returned
        stations the one with the highest ``datacoverage`` wins.
        Returns None when the lookup fails, yields no stations, or the
        chosen entry has no ``id``.
        """
        params = {
            "datasetid": _DATASET,
            "extent": f"{bbox.min_lat},{bbox.min_lon},{bbox.max_lat},{bbox.max_lon}",
            "startdate": start_dt.strftime("%Y-%m-%d"),
            "enddate": end_dt.strftime("%Y-%m-%d"),
            "datatypeid": "TMAX",
            "limit": 25,
        }
        payload = self._get_json("stations", params)
        if payload is None:
            return None

        stations = payload.get("results") or []
        if not stations:
            return None

        # Pick the station with the highest data coverage
        best = max(stations, key=lambda s: s.get("datacoverage", 0))
        station_id = best.get("id")
        return None if station_id is None else str(station_id)

    def _fetch_all_records(self, params: dict[str, object]) -> list[dict]:
        """Collect every ``results`` row of a ``/data`` query, page by page.

        A failing follow-up page ends the loop and keeps the rows gathered
        so far; a failing first page yields an empty list.
        """
        records: list[dict] = []
        previous_page: list[dict] | None = None
        for _ in range(self._MAX_PAGES):
            page_params = dict(params)
            if records:
                # Resume at the number of rows already received. If the API
                # counts offsets from 1 this re-reads one boundary row, which
                # is harmless: duplicates collapse in the mean-aggregating
                # pivot downstream. It never leaves a gap.
                page_params["offset"] = len(records)

            payload = self._get_json("data", page_params)
            if payload is None:
                break

            page = payload.get("results") or []
            if page and page == previous_page:
                # The endpoint is not honouring ``offset``; stop re-reading.
                break
            records.extend(page)
            previous_page = page

            if len(page) < self._PAGE_SIZE:
                break
        else:
            _LOGGER.warning(
                "NOAA CDO result set truncated after %d pages", self._MAX_PAGES
            )
        return records

    def _fetch_station_data(
        self,
        station_id: str,
        center: tuple[float, float],
        start_dt: datetime,
        end_dt: datetime,
    ) -> pd.DataFrame:
        """Download and reshape the daily values of one station.

        Returns a frame with one row per timestamp, one column per reported
        data type (renamed via ``_DTYPE_MAP``) plus ``lat``/``lon`` set to
        *center*. Data types the station does not report produce no column.
        Falls back to the one-row placeholder frame when nothing is returned.
        """
        params = {
            "datasetid": _DATASET,
            "stationid": station_id,
            "startdate": start_dt.strftime("%Y-%m-%d"),
            "enddate": end_dt.strftime("%Y-%m-%d"),
            "datatypeid": ",".join(_DTYPE_MAP.keys()),
            "limit": self._PAGE_SIZE,
            "units": "metric",
        }
        records = self._fetch_all_records(params)
        if not records:
            return self._placeholder_frame(center, start_dt)

        lat, lon = center
        raw = pd.DataFrame(records)
        raw["timestamp"] = pd.to_datetime(raw["date"], utc=True)

        # Pivot so each datatype becomes a column
        pivoted = raw.pivot_table(
            index="timestamp", columns="datatype", values="value", aggfunc="mean"
        ).reset_index()
        pivoted = pivoted.rename(columns=_DTYPE_MAP)
        pivoted["lat"] = lat
        pivoted["lon"] = lon
        return pivoted  # type: ignore[no-any-return]