Source code for climagrid.assets.registry

"""
AssetRegistry: loads utility asset records from CSV or GeoJSON.

Each asset must have at minimum: asset_id, lat, lon.
Optional fields: asset_type, voltage_kv, install_year, manufacturer.
"""

from __future__ import annotations

from pathlib import Path

import geopandas as gpd
import pandas as pd

# Columns every loaded asset table must expose; AssetRegistry's
# _check_required_columns raises ValueError when any of them is absent.
REQUIRED_COLUMNS = {"asset_id", "lat", "lon"}

# Recognised values for the optional ``asset_type`` field.
# NOTE(review): nothing in this module validates against this set; it is
# presumably consumed by other modules -- confirm before changing it.
ASSET_TYPE_VALUES = {
    "transformer",
    "circuit_breaker",
    "transmission_line",
    "distribution_line",
    "substation",
    "capacitor_bank",
    "recloser",
    "other",
}


class AssetRegistry:
    """
    Loads and validates a utility asset registry from CSV or GeoJSON.

    Parameters
    ----------
    path:
        Path to a CSV file (must have asset_id, lat, lon columns) or a
        GeoJSON file (must have asset_id and Point geometry).
    asset_type_filter:
        If provided, only include assets of these types. ``None`` or an
        empty list keeps every asset.

    Raises
    ------
    ValueError
        If the file extension is not supported, or a required column is
        missing from the loaded data.
    KeyError
        If ``asset_type_filter`` is given but the loaded data has no
        ``asset_type`` column.

    Example
    -------
    >>> registry = AssetRegistry("my_coop_assets.csv")
    >>> registry.assets.head()
    """

    def __init__(
        self,
        path: str | Path,
        asset_type_filter: list[str] | None = None,
    ):
        self._path = Path(path)
        self._gdf = self._load(self._path)

        if asset_type_filter:
            # ``asset_type`` is an optional column, so fail with an
            # explanatory message instead of pandas' bare KeyError.
            if "asset_type" not in self._gdf.columns:
                raise KeyError(
                    "asset_type_filter was given, but "
                    f"{self._path.name!r} has no 'asset_type' column "
                    "to filter on"
                )
            self._gdf = self._gdf[
                self._gdf["asset_type"].isin(asset_type_filter)
            ]

    @property
    def assets(self) -> gpd.GeoDataFrame:
        """GeoDataFrame with one row per asset, CRS=EPSG:4326."""
        return self._gdf

    @property
    def count(self) -> int:
        """Number of assets currently held (after any type filter)."""
        return len(self._gdf)

    @property
    def bounding_box(self) -> tuple[float, float, float, float]:
        """(min_lat, max_lat, min_lon, max_lon) covering all assets."""
        # total_bounds is ordered (minx, miny, maxx, maxy), i.e. lon first.
        min_lon, min_lat, max_lon, max_lat = self._gdf.total_bounds
        return min_lat, max_lat, min_lon, max_lon

    def __len__(self) -> int:
        return self.count

    def __repr__(self) -> str:
        return f"AssetRegistry(n={self.count}, path={self._path.name!r})"

    # ------------------------------------------------------------------
    # Internal loading
    # ------------------------------------------------------------------

    def _load(self, path: Path) -> gpd.GeoDataFrame:
        """Dispatch to the CSV or GeoJSON loader based on the file suffix.

        Raises ValueError for any suffix other than .csv, .geojson or .json
        (matched case-insensitively).
        """
        suffix = path.suffix.lower()
        if suffix == ".csv":
            return self._load_csv(path)
        if suffix in {".geojson", ".json"}:
            return self._load_geojson(path)
        raise ValueError(
            f"Unsupported file type: {suffix!r}. "
            "Use .csv or .geojson."
        )

    def _load_csv(self, path: Path) -> gpd.GeoDataFrame:
        """Read a CSV registry and build point geometry from lat/lon.

        ``asset_id`` is read as a string. Rows whose lat or lon is missing
        or not numeric are dropped, and a UserWarning reports how many.
        """
        df = pd.read_csv(path, dtype={"asset_id": str})
        self._check_required_columns(df, path)

        # Non-numeric coordinates become NaN so they are dropped below.
        df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
        df["lon"] = pd.to_numeric(df["lon"], errors="coerce")

        n_before = len(df)
        df = df.dropna(subset=["lat", "lon"])
        n_dropped = n_before - len(df)
        if n_dropped:
            import warnings

            # Call chain is _load_csv <- _load <- __init__ <- caller, so
            # stacklevel=4 attributes the warning to the code that
            # constructed the registry rather than to this module.
            warnings.warn(
                f"Dropped {n_dropped} rows with null lat/lon",
                UserWarning,
                stacklevel=4,
            )

        return gpd.GeoDataFrame(
            df,
            geometry=gpd.points_from_xy(df["lon"], df["lat"]),
            crs="EPSG:4326",
        )

    def _load_geojson(self, path: Path) -> gpd.GeoDataFrame:
        """Read a GeoJSON registry and normalise it to EPSG:4326.

        A file without a CRS is labelled EPSG:4326; any other CRS is
        reprojected. Missing ``lat``/``lon`` columns are filled from the
        geometry's y/x coordinates.
        """
        gdf = gpd.read_file(path)
        if gdf.crs is None:
            gdf = gdf.set_crs("EPSG:4326")
        else:
            gdf = gdf.to_crs("EPSG:4326")

        # Extract lat/lon from geometry if not present
        if "lat" not in gdf.columns:
            gdf["lat"] = gdf.geometry.y
        if "lon" not in gdf.columns:
            gdf["lon"] = gdf.geometry.x

        # Only column names are inspected, so the GeoDataFrame can be
        # checked directly without building a geometry-less copy.
        self._check_required_columns(gdf, path)
        return gdf

    @staticmethod
    def _check_required_columns(df: pd.DataFrame, path: Path) -> None:
        """Raise ValueError if ``df`` lacks any column in REQUIRED_COLUMNS."""
        missing = REQUIRED_COLUMNS - set(df.columns)
        if missing:
            raise ValueError(
                f"Asset file {path.name!r} is missing required columns: "
                f"{sorted(missing)}. "
                f"Required: {sorted(REQUIRED_COLUMNS)}"
            )
def load_sample_assets() -> AssetRegistry:
    """Load the bundled sample registry (33 real substations across 7 states).

    Returns
    -------
    AssetRegistry
        Registry built from ``examples/data/sample_assets.csv``.

    Raises
    ------
    FileNotFoundError
        If the sample CSV is not present at the expected location.
    """
    # Walk four directory levels up from this module's file; that directory
    # is presumably the repository root that holds ``examples/``.
    base_dir = Path(__file__)
    for _ in range(4):
        base_dir = base_dir.parent

    csv_file = base_dir.joinpath("examples", "data", "sample_assets.csv")
    if csv_file.exists():
        return AssetRegistry(csv_file)

    raise FileNotFoundError(
        f"Sample asset file not found at {csv_file}. "
        "Has the repository been cloned fully?"
    )