Source code for climagrid.assets.joiner

"""
AssetEnvironmentJoiner: spatially joins environmental data to asset locations.

For each asset in the registry, finds the nearest data point (grid cell or
station) in the environmental DataFrame and extracts its time series.
"""

from __future__ import annotations

import warnings

import numpy as np
import pandas as pd
from scipy.spatial import cKDTree

from climagrid.assets.registry import AssetRegistry


class AssetEnvironmentJoiner:
    """
    Joins time-series environmental data to utility asset point locations.

    Strategy: nearest-neighbor match by great-circle distance. Latitude and
    longitude are projected onto 3-D unit vectors before building the
    KD-tree. This keeps both the chosen match and the reported distance
    correct at high latitudes and across the antimeridian. Plain Euclidean
    distance in lat/lon degrees overstates east-west separations by a factor
    of 1/cos(lat).

    Parameters
    ----------
    max_distance_km:
        Reject matches farther than this great-circle distance. Assets beyond
        this threshold keep the timestamps of their nearest data point but get
        NaN environmental values. Default 100 km.

    Example
    -------
    >>> registry = AssetRegistry("assets.csv")
    >>> nasa = NasaPowerAdapter()
    >>> env_df = nasa.fetch(bbox, start_dt, end_dt)
    >>> joiner = AssetEnvironmentJoiner()
    >>> result = joiner.join(registry, env_df)
    >>> result.head()
    """

    # Mean Earth radius (km) used to turn central angles into distances.
    _EARTH_RADIUS_KM: float = 6371.0

    def __init__(self, max_distance_km: float = 100.0):
        self._max_distance_km = max_distance_km

    @staticmethod
    def _to_unit_vectors(lat_deg, lon_deg) -> np.ndarray:
        """Convert degree lat/lon sequences to an (n, 3) array of unit vectors."""
        lat = np.radians(np.asarray(lat_deg, dtype=float))
        lon = np.radians(np.asarray(lon_deg, dtype=float))
        cos_lat = np.cos(lat)
        return np.column_stack(
            [cos_lat * np.cos(lon), cos_lat * np.sin(lon), np.sin(lat)]
        )

    def join(
        self,
        registry: AssetRegistry,
        env_df: pd.DataFrame,
        time_col: str = "timestamp",
    ) -> pd.DataFrame:
        """
        Join environmental observations to each asset for every timestamp.

        Parameters
        ----------
        registry:
            AssetRegistry with asset locations. Only its ``assets`` DataFrame
            (columns 'asset_id', 'lat', 'lon') is read.
        env_df:
            DataFrame returned by any adapter's fetch() method. Must have
            'lat' and 'lon' columns. Rows with a missing coordinate are ignored.
        time_col:
            Name of the timestamp column in env_df.

        Returns
        -------
        pd.DataFrame
            One row per (asset_id, timestamp). Columns are 'asset_id', the
            timestamp column (if present in env_df), the asset's own 'lat'
            and 'lon', then every other env_df column. If env_df is empty,
            the frame has only 'asset_id'. If the registry is empty, the
            frame is empty.

        Raises
        ------
        ValueError
            If env_df lacks 'lat'/'lon' columns, or no row has both set.

        Warns
        -----
        UserWarning
            If any asset is farther than ``max_distance_km`` from every
            environmental data point. Those assets' values are NaN.
        """
        assets = registry.assets
        if env_df.empty:
            return pd.DataFrame({"asset_id": assets["asset_id"].values})
        if "lat" not in env_df.columns or "lon" not in env_df.columns:
            raise ValueError("env_df must contain 'lat' and 'lon' columns")
        if len(assets) == 0:
            return pd.DataFrame()

        # Unique environmental locations in first-appearance order. Rows with
        # a missing coordinate cannot be matched, so they are skipped.
        env_points = (
            env_df[["lat", "lon"]].dropna().drop_duplicates().reset_index(drop=True)
        )
        if env_points.empty:
            raise ValueError("env_df has no rows with both 'lat' and 'lon' set")

        tree = cKDTree(self._to_unit_vectors(env_points["lat"], env_points["lon"]))
        chord, nearest = tree.query(
            self._to_unit_vectors(assets["lat"], assets["lon"]), k=1
        )
        # Chord length c on the unit sphere corresponds to central angle 2*asin(c/2).
        distances_km = (
            2.0 * self._EARTH_RADIUS_KM * np.arcsin(np.clip(chord / 2.0, 0.0, 1.0))
        )

        too_far = distances_km > self._max_distance_km
        n_far = int(too_far.sum())
        if n_far:
            warnings.warn(
                f"{n_far} asset(s) are more than {self._max_distance_km} km "
                "from any environmental data point. "
                "Those rows will have NaN values.",
                UserWarning,
                stacklevel=2,
            )

        # Positional env_df rows for every unique location, computed once
        # rather than re-scanning env_df for each asset. Lookups below are
        # positional, so duplicate labels in the registry index are harmless.
        rows_by_point = env_df.groupby(["lat", "lon"], sort=False).indices
        point_keys = list(zip(env_points["lat"], env_points["lon"]))
        asset_rows = [rows_by_point[point_keys[p]] for p in nearest]
        counts = np.array([len(rows) for rows in asset_rows], dtype=np.intp)

        has_time = time_col in env_df.columns
        reserved = {"lat", "lon", "asset_id", time_col}
        value_cols = [c for c in env_df.columns if c not in reserved]

        taken = env_df.iloc[np.concatenate(asset_rows)]
        result = taken[([time_col] if has_time else []) + value_cols].reset_index(
            drop=True
        )
        # Column-wise repeat preserves the registry's dtypes; iterrows() would
        # upcast each row (e.g. integer asset ids become floats).
        result["asset_id"] = np.repeat(assets["asset_id"].to_numpy(), counts)
        result["lat"] = np.repeat(assets["lat"].to_numpy(), counts)
        result["lon"] = np.repeat(assets["lon"].to_numpy(), counts)

        far_rows = np.repeat(too_far, counts)
        if far_rows.any():
            for col in value_cols:
                result[col] = result[col].where(~far_rows)

        front_cols = ["asset_id"] + ([time_col] if has_time else []) + ["lat", "lon"]
        return result[front_cols + value_cols]  # type: ignore[no-any-return]

    def join_point(
        self,
        asset_lat: float,
        asset_lon: float,
        env_df: pd.DataFrame,
        time_col: str = "timestamp",
    ) -> pd.DataFrame:
        """
        Convenience method: join env data for a single lat/lon point.

        The point is registered under asset_id "point" through a temporary
        one-row CSV fed to AssetRegistry. The file is always removed
        afterwards, even if writing or loading fails.
        """
        import os
        import tempfile

        tmp_data = pd.DataFrame(
            [{"asset_id": "point", "lat": asset_lat, "lon": asset_lon}]
        )
        fd, tmp_path = tempfile.mkstemp(suffix=".csv")
        try:
            # pandas requires newline="" for text handles passed to to_csv.
            with os.fdopen(fd, "w", newline="", encoding="utf-8") as fh:
                tmp_data.to_csv(fh, index=False)
            reg = AssetRegistry(tmp_path)
            return self.join(reg, env_df, time_col)
        finally:
            os.unlink(tmp_path)