Source code for climagrid.sources.usda_nrcs

"""
USDA NRCS adapter: SCAN and SNOTEL soil sensor data.

Fetches soil moisture, soil temperature, and snow water equivalent from
the USDA Natural Resources Conservation Service AWDB REST API.

No API key required. Public data.

Docs: https://wcc.sc.egov.usda.gov/awdbRestApi/swagger-ui/index.html
"""

from __future__ import annotations

import math
from datetime import datetime

import pandas as pd
import requests

from climagrid.sources.base import BaseEnvironmentalSource, BoundingBox

_BASE_URL = "https://wcc.sc.egov.usda.gov/awdbRestApi/services/v1"

# NRCS element codes → climagrid column names
# Elements with depth/height sensors require the ':*' wildcard in API requests
_ELEMENT_MAP = {
    "SMS": "nrcs_soil_moisture_pct",       # Soil Moisture % (volumetric)
    "STO": "nrcs_soil_temperature",        # Soil Temperature °C
    "WTEQ": "nrcs_snow_water_equivalent",  # Snow Water Equivalent (mm)
}

# API element query string: wildcard depth for sensor arrays, bare code for point sensors
_ELEMENTS_QUERY = "SMS:*,STO:*,WTEQ"


[docs] class NrcsAdapter(BaseEnvironmentalSource): """ Fetches soil and snow data from USDA NRCS SCAN/SNOTEL network. Finds the nearest active SCAN or SNOTEL station within the bounding box, fetches hourly readings, and returns a climagrid DataFrame. Parameters ---------- max_distance_km: Maximum search distance for nearest station (default 200 km). SCAN/SNOTEL networks are sparse in some regions. """ def __init__( self, max_distance_km: float = 200.0, timeout: int = 30, session: requests.Session | None = None, ): self._max_distance_km = max_distance_km self._timeout = timeout self._session = session or requests.Session() point_based = True @property def source_name(self) -> str: return "usda_nrcs"
[docs] def fetch_points( self, points: list[tuple[float, float]], start_dt: datetime, end_dt: datetime, ) -> pd.DataFrame: """Find and fetch the nearest SCAN/SNOTEL station to each asset.""" return self._fetch_points_via_bbox(points, start_dt, end_dt, self._max_distance_km)
[docs] def fetch( self, bbox: BoundingBox, start_dt: datetime, end_dt: datetime, ) -> pd.DataFrame: start_dt = self._ensure_utc(start_dt) end_dt = self._ensure_utc(end_dt) self._validate_time_range(start_dt, end_dt) lat, lon = bbox.center station = self._find_nearest_station(lat, lon) if station is None: return self._empty_df(lat, lon, start_dt) return self._fetch_station_data(station, lat, lon, start_dt, end_dt)
def _find_nearest_station( self, lat: float, lon: float ) -> dict | None: """Return the nearest active SCAN/SNOTEL station that has soil sensors.""" params = { "activeOnly": "true", "networkCds": "SCAN,SNTL", "elements": _ELEMENTS_QUERY, } try: resp = self._session.get( f"{_BASE_URL}/stations", params=params, timeout=self._timeout, ) resp.raise_for_status() stations = resp.json() except Exception: return None if not stations: return None def _dist(s: dict) -> float: return _haversine(lat, lon, float(s["latitude"]), float(s["longitude"])) nearest = min(stations, key=_dist) dist_km = _dist(nearest) if dist_km > self._max_distance_km: return None nearest["_distance_km"] = dist_km return nearest # type: ignore[return-value, no-any-return] def _fetch_station_data( self, station: dict, asset_lat: float, asset_lon: float, start_dt: datetime, end_dt: datetime, ) -> pd.DataFrame: triplet = station["stationTriplet"] dist_km = station.get("_distance_km", float("nan")) params = { "stationTriplets": triplet, "elements": _ELEMENTS_QUERY, "beginDate": start_dt.strftime("%Y-%m-%d"), "endDate": end_dt.strftime("%Y-%m-%d"), "duration": "HOURLY", } try: resp = self._session.get( f"{_BASE_URL}/data", params=params, timeout=self._timeout, ) resp.raise_for_status() payload = resp.json() except Exception: return self._empty_df(asset_lat, asset_lon, start_dt) return self._parse_response(payload, asset_lat, asset_lon, dist_km) def _parse_response( self, payload: list, lat: float, lon: float, dist_km: float, ) -> pd.DataFrame: if not payload: return self._empty_df(lat, lon) station_data = payload[0] if isinstance(payload, list) else payload rows: list[dict] = [] for element_data in station_data.get("data", []): elem = element_data.get("stationElement", {}) # Live API uses "elementCode"; mock used "elementCd": handle both element_cd = elem.get("elementCode") or elem.get("elementCd", "") col_name = _ELEMENT_MAP.get(element_cd) if col_name is None: continue for entry in element_data.get("values", []): date_str = entry.get("date") or entry.get("dateTime") value = entry.get("value") if date_str is None or value is None: continue try: ts = pd.to_datetime(date_str, utc=True) rows.append({"timestamp": ts, col_name: float(value)}) except (ValueError, TypeError): continue if not rows: return self._empty_df(lat, lon) df = pd.DataFrame(rows) # Average across sensor depths for the same element/timestamp df = df.groupby("timestamp").mean().reset_index() df["lat"] = lat df["lon"] = lon df["nrcs_station_distance_km"] = dist_km return df # type: ignore[no-any-return] @staticmethod def _empty_df(lat: float, lon: float, ts: datetime | None = None) -> pd.DataFrame: row: dict = {"lat": lat, "lon": lon} if ts is not None: row["timestamp"] = ts return pd.DataFrame([row])
def _haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Return great-circle distance in km.""" R = 6371.0 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 return 2 * R * math.asin(math.sqrt(a))