Skip to content

API Reference

hms2cng exposes all public functions for programmatic use. Import from the package directly or from individual modules.

# Package-level imports
from hms2cng import (
    export_basin_geometry, export_hms_results,
    export_all_basin_geometry, export_all_results,
    get_project_manifest, export_project_manifest, export_full_project,
    DuckSession,
)

# Module-level imports
from hms2cng.project import get_project_manifest, export_project_manifest, export_full_project
from hms2cng.geometry import export_basin_geometry, get_basin_layer_gdf, export_all_basin_geometry
from hms2cng.results import export_hms_results, export_all_results
from hms2cng.duckdb_session import DuckSession, query_parquet, spatial_join
from hms2cng.pmtiles import generate_pmtiles_from_input
from hms2cng.postgis_sync import sync_to_postgres

hms2cng.project

hms2cng.project.get_project_manifest(hms_file)

Return a dict with project-level metadata (suitable for a 1-row DataFrame).

Parameters:

Name Type Description Default
hms_file Path

Path to the .hms project file or project directory.

required

Returns:

Type Description
dict

Dict with keys: project_name, project_file, hms_version, crs_epsg,

dict

is_gridded, num_basin_models, num_met_models, num_control_specs,

dict

num_runs, basin_models, met_models, run_names, export_timestamp.

dict

The list fields (basin_models, met_models, run_names) are JSON strings.

Source code in hms2cng/project.py
def get_project_manifest(hms_file: Path) -> dict:
    """Return a dict with project-level metadata (suitable for a 1-row DataFrame).

    Args:
        hms_file: Path to the .hms project file or project directory.

    Returns:
        Dict with keys: project_name, project_file, hms_version, crs_epsg,
        is_gridded, num_basin_models, num_met_models, num_control_specs,
        num_runs, basin_models, met_models, run_names, export_timestamp.
        The list fields (basin_models, met_models, run_names) are JSON strings.
    """
    hms_file = _find_hms_file(Path(hms_file))
    prj = _init_project(hms_file)

    basin_names = list(prj.basin_df["name"]) if not prj.basin_df.empty else []
    met_names = list(prj.met_df["name"]) if not prj.met_df.empty else []
    run_names = list(prj.run_df["name"]) if not prj.run_df.empty else []

    return {
        "project_name": prj.project_name,
        "project_file": str(hms_file),
        "hms_version": getattr(prj, "hms_version", None),
        "crs_epsg": getattr(prj, "crs_epsg", None),
        "is_gridded": getattr(prj, "is_gridded", False),
        "num_basin_models": len(basin_names),
        "num_met_models": len(met_names),
        "num_control_specs": len(prj.control_df) if not prj.control_df.empty else 0,
        "num_runs": len(run_names),
        "basin_models": json.dumps(basin_names),
        "met_models": json.dumps(met_names),
        "run_names": json.dumps(run_names),
        "export_timestamp": datetime.now(timezone.utc).isoformat(),
    }

hms2cng.project.export_project_manifest(hms_file, output_dir)

Write manifest.parquet, run_registry.parquet, and basin_inventory.parquet.

Parameters:

Name Type Description Default
hms_file Path

Path to the .hms project file or project directory.

required
output_dir Path

Directory to write the three parquet files.

required

Returns:

Type Description
tuple[Path, Path, Path]

Tuple of (manifest_path, run_registry_path, basin_inventory_path).

Source code in hms2cng/project.py
def export_project_manifest(hms_file: Path, output_dir: Path) -> tuple[Path, Path, Path]:
    """Write manifest.parquet, run_registry.parquet, and basin_inventory.parquet.

    Args:
        hms_file: Path to the .hms project file or project directory.
        output_dir: Directory to write the three parquet files.

    Returns:
        Tuple of (manifest_path, run_registry_path, basin_inventory_path).
    """
    hms_file = _find_hms_file(Path(hms_file))
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    prj = _init_project(hms_file)
    project_name = prj.project_name
    project_dir = hms_file.parent

    # --- manifest.parquet ---
    manifest_data = get_project_manifest(hms_file)
    manifest_df = pd.DataFrame([manifest_data])
    manifest_path = output_dir / "manifest.parquet"
    manifest_df.to_parquet(manifest_path, compression="zstd", index=False)

    # --- run_registry.parquet ---
    run_records = []
    for _, run_row in prj.run_df.iterrows():
        run_name = run_row.get("name", "")
        basin_model = run_row.get("basin_model", "")
        met_model = run_row.get("met_model", "")
        control_spec = run_row.get("control_spec", "")

        # Look up control window from control_df
        start_date = end_date = time_interval_minutes = duration_hours = None
        if not prj.control_df.empty and control_spec:
            ctrl = prj.control_df[prj.control_df["name"] == control_spec]
            if not ctrl.empty:
                row = ctrl.iloc[0]
                start_date = row.get("start_date")
                end_date = row.get("end_date")
                time_interval_minutes = row.get("time_interval_minutes")
                duration_hours = row.get("duration_hours")

        # Locate results XML
        results_file = _find_results_xml_for_run(project_dir, run_name)

        run_records.append({
            "project_name": project_name,
            "run_name": run_name,
            "run_slug": slugify(run_name),
            "basin_model": basin_model,
            "met_model": met_model,
            "control_spec": control_spec,
            "start_date": start_date,
            "end_date": end_date,
            "time_interval_minutes": time_interval_minutes,
            "duration_hours": duration_hours,
            "last_execution_date": run_row.get("last_execution_date"),
            "last_execution_time": run_row.get("last_execution_time"),
            "results_file": str(results_file) if results_file else None,
            "has_results": results_file is not None,
        })

    run_registry_df = pd.DataFrame(run_records)
    run_registry_path = output_dir / "run_registry.parquet"
    run_registry_df.to_parquet(run_registry_path, compression="zstd", index=False)

    # --- basin_inventory.parquet ---
    basin_records = []
    for _, basin_row in prj.basin_df.iterrows():
        basin_name = basin_row.get("name", "")
        basin_records.append({
            "project_name": project_name,
            "basin_model": basin_name,
            "basin_slug": slugify(basin_name),
            "basin_file": str(basin_row.get("full_path", "")),
            "description": basin_row.get("description"),
            "num_subbasins": basin_row.get("num_subbasins"),
            "num_reaches": basin_row.get("num_reaches"),
            "num_junctions": basin_row.get("num_junctions"),
            "num_reservoirs": basin_row.get("num_reservoirs"),
            "total_area": basin_row.get("total_area"),
            "loss_methods": json.dumps(list(basin_row["loss_methods"]))
                if isinstance(basin_row.get("loss_methods"), (list, set)) else None,
            "transform_methods": json.dumps(list(basin_row["transform_methods"]))
                if isinstance(basin_row.get("transform_methods"), (list, set)) else None,
            "routing_methods": json.dumps(list(basin_row["routing_methods"]))
                if isinstance(basin_row.get("routing_methods"), (list, set)) else None,
        })

    basin_inventory_df = pd.DataFrame(basin_records)
    basin_inventory_path = output_dir / "basin_inventory.parquet"
    basin_inventory_df.to_parquet(basin_inventory_path, compression="zstd", index=False)

    return manifest_path, run_registry_path, basin_inventory_path

hms2cng.project.export_full_project(hms_file, output_dir, *, layers=None, variables=None, out_crs='EPSG:4326', skip_errors=True, sort=True)

Export an entire HMS project to a single consolidated GeoParquet.

Produces

output_dir/ {project_slug}.parquet -- all geometry + results with layer discriminator manifest.json -- JSON catalog (schema v2.0)

The layer column discriminates between geometry layers (subbasins, reaches, junctions, ...) and result variables (outflow, stage, ...). Query examples::

SELECT * FROM 'project.parquet' WHERE layer = 'subbasins'
SELECT * FROM 'project.parquet' WHERE layer = 'outflow' AND run_name = 'Run 1'

Parameters:

Name Type Description Default
hms_file Path

Path to the .hms project file or project directory.

required
output_dir Path

Root output directory for the archive.

required
layers Optional[list]

Geometry layers to export (default: all available).

None
variables Optional[list]

Result variables to export (default: Outflow, Inflow, Stage, Depth).

None
out_crs str

Output CRS (default EPSG:4326).

'EPSG:4326'
skip_errors bool

If True, skip layers/runs with errors and continue.

True
sort bool

If True, apply Hilbert spatial sorting within each layer.

True

Returns:

Type Description
dict

Summary dict with keys: parquet_file, manifest_json,

dict

geometry_rows, results_rows, errors.

Source code in hms2cng/project.py
def export_full_project(
    hms_file: Path,
    output_dir: Path,
    *,
    layers: Optional[list] = None,
    variables: Optional[list] = None,
    out_crs: str = "EPSG:4326",
    skip_errors: bool = True,
    sort: bool = True,
) -> dict:
    """Export an entire HMS project to a single consolidated GeoParquet.

    Produces:
      output_dir/
        {project_slug}.parquet   -- all geometry + results with ``layer`` discriminator
        manifest.json            -- JSON catalog (schema v2.0)

    The ``layer`` column discriminates between geometry layers (subbasins,
    reaches, junctions, ...) and result variables (outflow, stage, ...).
    Query examples::

        SELECT * FROM 'project.parquet' WHERE layer = 'subbasins'
        SELECT * FROM 'project.parquet' WHERE layer = 'outflow' AND run_name = 'Run 1'

    Args:
        hms_file: Path to the .hms project file or project directory.
        output_dir: Root output directory for the archive.
        layers: Geometry layers to export (default: all available).
        variables: Result variables to export (default: Outflow, Inflow, Stage, Depth).
        out_crs: Output CRS (default EPSG:4326).
        skip_errors: If True, skip layers/runs with errors and continue.
        sort: If True, apply Hilbert spatial sorting within each layer.

    Returns:
        Summary dict with keys: parquet_file, manifest_json,
        geometry_rows, results_rows, errors.
    """
    import geopandas as gpd
    from hms2cng.geometry import merge_all_layers
    from hms2cng.results import merge_all_variables
    from hms2cng.catalog import Manifest

    hms_file = _find_hms_file(Path(hms_file))
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    prj = _init_project(hms_file)
    project_name = prj.project_name
    project_dir = hms_file.parent

    errors: list[str] = []
    all_parts: list = []

    # Build control lookup
    ctrl_lookup: dict = {}
    if not prj.control_df.empty:
        for _, cr in prj.control_df.iterrows():
            ctrl_lookup[cr.get("name", "")] = cr

    # --- Geometry for each basin model ---
    geometry_rows = 0
    for _, basin_row in prj.basin_df.iterrows():
        basin_name = basin_row.get("name", "")
        basin_file_path = Path(basin_row.get("full_path", ""))
        if not basin_file_path.is_file():
            errors.append(f"Basin file not found: {basin_file_path}")
            if not skip_errors:
                raise FileNotFoundError(f"Basin file not found: {basin_file_path}")
            continue

        try:
            merged = merge_all_layers(
                basin_file_path,
                layers=layers,
                out_crs=out_crs,
                project_name=project_name,
                basin_model=basin_name,
                sort=sort,
            )
        except Exception as exc:
            errors.append(f"basin={basin_name!r}: {exc}")
            if not skip_errors:
                raise
            continue

        if merged is not None:
            geometry_rows += len(merged)
            all_parts.append(merged)

    # --- Results for each run ---
    results_rows = 0
    for _, run_row in prj.run_df.iterrows():
        run_name = run_row.get("name", "")
        basin_model = run_row.get("basin_model", "")
        met_model = run_row.get("met_model", "")
        control_spec = run_row.get("control_spec", "")

        results_xml = _find_results_xml_for_run(project_dir, run_name)
        if results_xml is None:
            errors.append(f"No results XML for run: {run_name!r}")
            continue

        # Find basin file for geometry merge
        basin_file: Optional[Path] = None
        if not prj.basin_df.empty:
            match = prj.basin_df[prj.basin_df["name"] == basin_model]
            if not match.empty:
                bp = Path(match.iloc[0].get("full_path", ""))
                if bp.is_file():
                    basin_file = bp

        start_date = end_date = time_interval_minutes = None
        if control_spec in ctrl_lookup:
            cr = ctrl_lookup[control_spec]
            start_date = cr.get("start_date")
            end_date = cr.get("end_date")
            time_interval_minutes = cr.get("time_interval_minutes")

        try:
            merged = merge_all_variables(
                results_xml,
                basin_file,
                variables=variables,
                out_crs=out_crs,
                project_name=project_name,
                run_name=run_name,
                basin_model=basin_model,
                met_model=met_model,
                control_spec=control_spec,
                start_date=start_date,
                end_date=end_date,
                time_interval_minutes=time_interval_minutes,
            )
        except Exception as exc:
            errors.append(f"run={run_name!r}: {exc}")
            if not skip_errors:
                raise
            continue

        if merged is not None:
            results_rows += len(merged)
            all_parts.append(merged)

    # --- Consolidate into one GeoParquet ---
    if not all_parts:
        return {
            "parquet_file": None,
            "manifest_json": None,
            "geometry_rows": 0,
            "results_rows": 0,
            "errors": errors,
        }

    all_frames = pd.concat(all_parts, ignore_index=True)
    parquet_path = output_dir / f"{slugify(project_name)}.parquet"

    if "geometry" in all_frames.columns:
        combined = gpd.GeoDataFrame(all_frames, geometry="geometry")
        for part in all_parts:
            if isinstance(part, gpd.GeoDataFrame) and part.crs is not None:
                combined = combined.set_crs(part.crs, allow_override=True)
                break
        _write_geoparquet(combined, parquet_path)
    else:
        all_frames.to_parquet(parquet_path, compression="zstd", index=False)

    # --- Write manifest.json ---
    manifest = Manifest.create(
        project_name=project_name,
        hms_version=getattr(prj, "hms_version", None),
        crs_epsg=getattr(prj, "crs_epsg", None),
        parquet_file=parquet_path.name,
        total_rows=len(all_frames),
        basin_models=list(prj.basin_df["name"]) if not prj.basin_df.empty else [],
        run_names=list(prj.run_df["name"]) if not prj.run_df.empty else [],
        errors=errors,
    )

    if "geometry" in all_frames.columns and "layer" in all_frames.columns:
        crs_str = str(combined.crs) if combined.crs else None
        for layer_name in all_frames["layer"].unique():
            subset = all_frames[all_frames["layer"] == layer_name]
            geom_type = None
            geom_col = subset.get("geometry")
            if geom_col is not None and not geom_col.isna().all():
                geom_types = gpd.GeoSeries(geom_col.dropna()).geom_type.unique()
                geom_type = geom_types[0] if len(geom_types) == 1 else "Mixed"
            manifest.add_layer(
                name=layer_name,
                rows=len(subset),
                geometry_type=geom_type,
                crs=crs_str,
            )

    manifest_path = output_dir / "manifest.json"
    manifest.write(manifest_path)

    return {
        "parquet_file": parquet_path,
        "manifest_json": manifest_path,
        "geometry_rows": geometry_rows,
        "results_rows": results_rows,
        "errors": errors,
    }

hms2cng.project.slugify(name)

Convert a run/basin name to a filesystem-safe slug.

Replaces any non-word character with an underscore and lowercases.

Parameters:

Name Type Description Default
name str

The name to slugify (e.g. "Run 1 - Calibration").

required

Returns:

Type Description
str

A lowercase slug, e.g. "run_1___calibration".

Source code in hms2cng/project.py
def slugify(name: str) -> str:
    """Convert a run/basin name to a filesystem-safe slug.

    Replaces any non-word character with an underscore and lowercases.

    Args:
        name: The name to slugify (e.g. "Run 1 - Calibration").

    Returns:
        A lowercase slug, e.g. "run_1___calibration".
    """
    return re.sub(r"[^\w]", "_", name.strip()).lower()

hms2cng.geometry

hms2cng.geometry.export_basin_geometry(basin_input, output, layer=None, *, crs_epsg=None, out_crs='EPSG:4326')

Export a basin geometry layer to GeoParquet.

Source code in hms2cng/geometry.py
def export_basin_geometry(
    basin_input: Path,
    output: Path,
    layer: Optional[str] = None,
    *,
    crs_epsg: Optional[str] = None,
    out_crs: Optional[str] = "EPSG:4326",
) -> None:
    """Export a basin geometry layer to GeoParquet."""

    _ensure_parent(Path(output))

    layer_name: LayerName = "subbasins" if layer is None else layer  # type: ignore[assignment]
    valid_layers = {
        "subbasins", "reaches", "junctions", "diversions", "reservoirs", "sources", "sinks",
        "watershed", "subbasin_polygons", "longest_flowpaths", "centroidal_flowpaths",
        "teneightyfive_flowpaths", "subbasin_statistics",
    }
    if layer_name not in valid_layers:
        raise ValueError(f"layer must be one of: {', '.join(sorted(valid_layers))}")

    gdf = get_basin_layer_gdf(
        basin_input,
        layer=layer_name,
        crs_epsg=crs_epsg,
        out_crs=out_crs,
    )

    gdf.to_parquet(output, compression="zstd")

hms2cng.geometry.get_basin_layer_gdf(basin_input, layer='subbasins', *, crs_epsg=None, out_crs='EPSG:4326')

Return a GeoDataFrame for a basin geometry layer.

Notes
  • If crs_epsg is not provided and cannot be inferred, output CRS will be unset and no transformation to out_crs will occur.
  • For "watershed", requires a .map file to construct polygons.
Source code in hms2cng/geometry.py
def get_basin_layer_gdf(
    basin_input: Path,
    layer: LayerName = "subbasins",
    *,
    crs_epsg: Optional[str] = None,
    out_crs: Optional[str] = "EPSG:4326",
) -> gpd.GeoDataFrame:
    """Return a GeoDataFrame for a basin geometry layer.

    Notes:
      - If crs_epsg is not provided and cannot be inferred, output CRS will be
        unset and no transformation to out_crs will occur.
      - For "watershed", requires a .map file to construct polygons.
    """

    basin_file = _find_basin_file(basin_input)
    project_dir = basin_file.parent

    # Try to infer CRS from HMS project if present.
    if crs_epsg is None:
        try:
            from hms_commander import init_hms_project

            h = init_hms_project(project_dir)
            crs_epsg = h.crs_epsg
        except Exception:
            crs_epsg = None

    from shapely.geometry import Point, LineString

    from hms_commander import HmsBasin

    if layer == "subbasins":
        df = HmsBasin.get_subbasins(basin_file)
        if df.empty:
            raise ValueError(f"No subbasins found in basin: {basin_file}")
        gdf = gpd.GeoDataFrame(
            df,
            geometry=[Point(xy) for xy in zip(df["canvas_x"], df["canvas_y"])],
            crs=crs_epsg,
        )
        return _maybe_to_crs(gdf, out_crs)

    if layer == "junctions":
        df = HmsBasin.get_junctions(basin_file)
        if df.empty:
            raise ValueError(f"No junctions found in basin: {basin_file}")
        gdf = gpd.GeoDataFrame(
            df,
            geometry=[Point(xy) for xy in zip(df["canvas_x"], df["canvas_y"])],
            crs=crs_epsg,
        )
        return _maybe_to_crs(gdf, out_crs)

    if layer == "reaches":
        df = HmsBasin.get_reaches(basin_file)
        if df.empty:
            raise ValueError(f"No reaches found in basin: {basin_file}")

        lines = []
        for _, r in df.iterrows():
            fx, fy = r.get("from_canvas_x"), r.get("from_canvas_y")
            tx, ty = r.get("canvas_x"), r.get("canvas_y")
            if pd.isna(fx) or pd.isna(fy) or pd.isna(tx) or pd.isna(ty):
                lines.append(None)
            else:
                lines.append(LineString([(float(fx), float(fy)), (float(tx), float(ty))]))

        gdf = gpd.GeoDataFrame(df, geometry=lines, crs=crs_epsg)
        return _maybe_to_crs(gdf, out_crs)

    if layer == "diversions":
        df = HmsBasin.get_diversions(basin_file)
        if df.empty:
            raise ValueError(f"No diversions found in basin: {basin_file}")
        gdf = gpd.GeoDataFrame(
            df,
            geometry=[Point(xy) for xy in zip(df["canvas_x"], df["canvas_y"])],
            crs=crs_epsg,
        )
        return _maybe_to_crs(gdf, out_crs)

    if layer == "reservoirs":
        df = HmsBasin.get_reservoirs(basin_file)
        if df.empty:
            raise ValueError(f"No reservoirs found in basin: {basin_file}")
        gdf = gpd.GeoDataFrame(
            df,
            geometry=[Point(xy) for xy in zip(df["canvas_x"], df["canvas_y"])],
            crs=crs_epsg,
        )
        return _maybe_to_crs(gdf, out_crs)

    if layer == "sources":
        df = HmsBasin.get_sources(basin_file)
        if df.empty:
            raise ValueError(f"No sources found in basin: {basin_file}")
        gdf = gpd.GeoDataFrame(
            df,
            geometry=[Point(xy) for xy in zip(df["canvas_x"], df["canvas_y"])],
            crs=crs_epsg,
        )
        return _maybe_to_crs(gdf, out_crs)

    if layer == "sinks":
        df = HmsBasin.get_sinks(basin_file)
        if df.empty:
            raise ValueError(f"No sinks found in basin: {basin_file}")
        gdf = gpd.GeoDataFrame(
            df,
            geometry=[Point(xy) for xy in zip(df["canvas_x"], df["canvas_y"])],
            crs=crs_epsg,
        )
        return _maybe_to_crs(gdf, out_crs)

    # --- SQLite-based layers (gridded models with terrain data) ---

    if layer in ("subbasin_polygons", "longest_flowpaths", "centroidal_flowpaths",
                 "teneightyfive_flowpaths", "subbasin_statistics"):
        from hms_commander import HmsSqlite

        sqlite_file = _find_sqlite_file(project_dir, basin_file)
        if sqlite_file is None:
            raise FileNotFoundError(
                f"No .sqlite file found in {project_dir}. "
                f"Layer '{layer}' requires an HMS SQLite grid database."
            )

        # Map layer names to HmsSqlite methods
        _sqlite_geo_methods = {
            "subbasin_polygons": ("get_subbasins", "subbasin polygons"),
            "longest_flowpaths": ("get_longest_flowpaths", "longest flowpaths"),
            "centroidal_flowpaths": ("get_centroidal_flowpaths", "centroidal flowpaths"),
            "teneightyfive_flowpaths": ("get_teneightyfive_flowpaths", "10-85 flowpaths"),
        }

        if layer in _sqlite_geo_methods:
            method_name, label = _sqlite_geo_methods[layer]
            try:
                gdf = getattr(HmsSqlite, method_name)(sqlite_file)
            except ValueError as exc:
                raise ValueError(f"No {label} found in: {sqlite_file}") from exc
            if gdf.empty:
                raise ValueError(f"No {label} found in: {sqlite_file}")
            return _maybe_to_crs(gdf, out_crs)

        if layer == "subbasin_statistics":
            try:
                df = HmsSqlite.get_subbasin_statistics(sqlite_file)
            except ValueError as exc:
                raise ValueError(f"No subbasin statistics found in: {sqlite_file}") from exc
            if df.empty:
                raise ValueError(f"No subbasin statistics found in: {sqlite_file}")
            # Non-spatial: use subbasin point geometry for spatial reference
            sub_df = HmsBasin.get_subbasins(basin_file)
            if not sub_df.empty:
                from shapely.geometry import Point
                sub_gdf = gpd.GeoDataFrame(
                    sub_df[["name", "canvas_x", "canvas_y"]],
                    geometry=[Point(xy) for xy in zip(sub_df["canvas_x"], sub_df["canvas_y"])],
                    crs=crs_epsg,
                )
                merged = sub_gdf.merge(df, left_on="name", right_on="subbasin_name", how="right")
                merged = gpd.GeoDataFrame(merged, geometry="geometry", crs=crs_epsg)
                return _maybe_to_crs(merged, out_crs)
            # Fallback: return as plain DataFrame (no geometry)
            return gpd.GeoDataFrame(df)

    if layer == "watershed":
        # Requires a .map file.
        map_files = sorted(project_dir.glob("*.map"))
        if not map_files:
            raise FileNotFoundError(
                f"No .map file found in {project_dir}. "
                "Watershed boundary extraction requires the HMS map file."
            )

        from hms_commander import HmsGeo
        from shapely.geometry import Polygon
        from shapely.ops import unary_union

        map_data = HmsGeo.parse_map_file(map_files[0])
        boundaries = map_data.get("boundaries", [])
        polys = []
        for b in boundaries:
            coords = b.get("coordinates") or []
            if len(coords) < 3:
                continue
            # ensure closed
            if coords[0] != coords[-1]:
                coords = coords + [coords[0]]
            try:
                polys.append(Polygon(coords))
            except Exception:
                continue

        if not polys:
            raise ValueError(f"No boundary polygons found in map file: {map_files[0]}")

        union = unary_union(polys)
        gdf = gpd.GeoDataFrame(
            [{"name": "watershed"}],
            geometry=[union],
            crs=crs_epsg,
        )
        return _maybe_to_crs(gdf, out_crs)

    raise ValueError(f"Unknown layer: {layer}")

hms2cng.geometry.export_all_basin_geometry(hms_file, output_dir, *, layers=None, out_crs='EPSG:4326', skip_errors=True)

Export geometry for ALL basin models in an HMS project.

Output: output_dir/{basin_slug}/{layer}.parquet Each file gains 'project_name' and 'basin_model' columns.

Parameters:

Name Type Description Default
hms_file Path

Path to the .hms project file or project directory.

required
output_dir Path

Root output directory (geometry files go in subdirs per basin).

required
layers Optional[list]

Geometry layers to attempt. Defaults to ["subbasins", "junctions", "reaches", "watershed", "subbasin_polygons", "longest_flowpaths"].

None
out_crs Optional[str]

Output CRS (default EPSG:4326).

'EPSG:4326'
skip_errors bool

If True, skip layers that fail; otherwise raise.

True

Returns:

Type Description
list

List of Path objects for created parquet files.

Source code in hms2cng/geometry.py
def export_all_basin_geometry(
    hms_file: Path,
    output_dir: Path,
    *,
    layers: Optional[list] = None,
    out_crs: Optional[str] = "EPSG:4326",
    skip_errors: bool = True,
) -> list:
    """Export geometry for ALL basin models in an HMS project.

    Output: output_dir/{basin_slug}/{layer}.parquet
    Each file gains 'project_name' and 'basin_model' columns.

    Args:
        hms_file: Path to the .hms project file or project directory.
        output_dir: Root output directory (geometry files go in subdirs per basin).
        layers: Geometry layers to attempt. Defaults to
            ["subbasins", "junctions", "reaches", "watershed",
             "subbasin_polygons", "longest_flowpaths"].
        out_crs: Output CRS (default EPSG:4326).
        skip_errors: If True, skip layers that fail; otherwise raise.

    Returns:
        List of Path objects for created parquet files.
    """
    from hms2cng.project import _find_hms_file, _init_project, slugify

    hms_file = _find_hms_file(Path(hms_file))
    output_dir = Path(output_dir)
    layers_to_try = layers if layers is not None else _DEFAULT_LAYERS

    prj = _init_project(hms_file)
    project_name = prj.project_name
    created: list = []

    for _, basin_row in prj.basin_df.iterrows():
        basin_name = basin_row.get("name", "")
        basin_file_path = Path(basin_row.get("full_path", ""))
        if not basin_file_path.is_file():
            if skip_errors:
                continue
            raise FileNotFoundError(f"Basin file not found: {basin_file_path}")

        slug = slugify(basin_name)
        basin_out_dir = output_dir / slug
        basin_out_dir.mkdir(parents=True, exist_ok=True)

        for layer in layers_to_try:
            out_path = basin_out_dir / f"{layer}.parquet"
            try:
                gdf = get_basin_layer_gdf(
                    basin_file_path,
                    layer=layer,  # type: ignore[arg-type]
                    out_crs=out_crs,
                )
                gdf = _add_provenance(gdf, project_name=project_name, basin_model=basin_name)
                gdf.to_parquet(out_path, compression="zstd")
                created.append(out_path)
            except (ValueError, FileNotFoundError):
                # Layer not available for this basin — skip silently
                continue
            except Exception:
                if skip_errors:
                    continue
                raise

    return created

hms2cng.results

hms2cng.results.export_hms_results(results_input, output, *, element_type='subbasin', variable='Flow Out', crs_epsg=None, out_crs='EPSG:4326')

Export HMS results summary statistics to GeoParquet.

  • If given a folder, we look for RUN_*.results XML and parse it.
  • Geometry is inferred from the parent HMS project (find .hms and .basin).

This currently exports summary statistics (peak/min/avg). Full time-series export can be added once DSS reading dependencies are standardized.

Source code in hms2cng/results.py
def export_hms_results(
    results_input: Path,
    output: Path,
    *,
    element_type: str = "subbasin",
    variable: str = "Flow Out",
    crs_epsg: Optional[str] = None,
    out_crs: Optional[str] = "EPSG:4326",
) -> None:
    """Export HMS results summary statistics to GeoParquet.

    - If given a folder, we look for RUN_*.results XML and parse it.
    - Geometry is inferred from the parent HMS project (find *.hms and *.basin).

    This currently exports *summary* statistics (peak/min/avg). Full time-series
    export can be added once DSS reading dependencies are standardized.
    """

    _ensure_parent(Path(output))

    et: ElementType
    et_lower = (element_type or "subbasin").strip().lower()
    valid_types = {"subbasin", "reach", "junction", "diversion", "reservoir", "source", "sink", "all"}
    if et_lower not in valid_types:
        raise ValueError(f"--type must be one of: {', '.join(sorted(valid_types))}")
    et = et_lower  # type: ignore[assignment]

    results_path = Path(results_input)

    # Resolve the XML file.
    if results_path.is_dir() and results_path.name.lower() != "results":
        # user passed project dir maybe; allow .../ProjectRoot
        if (results_path / "results").is_dir():
            results_xml = _find_results_xml(results_path / "results")
        else:
            results_xml = _find_results_xml(results_path)
    else:
        results_xml = _find_results_xml(results_path)

    # Infer project + basin file for geometry.
    project_dir = _find_project_dir(results_xml)
    basin_file = _find_basin_file_from_project(project_dir) if project_dir else None

    stats_df = _parse_results_xml_statistics(results_xml, element_type=et, variable=variable)

    if basin_file is None:
        # No geometry available; write plain Parquet.
        stats_df.to_parquet(output, compression="zstd", index=False)
        return

    # Build geometry for the requested element type(s)
    if et == "all":
        layers = []
        for lyr, e in [
            ("subbasins", "subbasin"), ("junctions", "junction"), ("reaches", "reach"),
            ("diversions", "diversion"), ("reservoirs", "reservoir"),
            ("sources", "source"), ("sinks", "sink"),
        ]:
            try:
                geom = get_basin_layer_gdf(basin_file, layer=lyr, crs_epsg=crs_epsg, out_crs=out_crs)
            except Exception:
                continue
            geom["element_type"] = e
            layers.append(geom)
        if not layers:
            # fallback without geometry
            stats_df.to_parquet(output, compression="zstd", index=False)
            return
        geom_gdf = pd.concat(layers, ignore_index=True)
        geom_gdf = gpd.GeoDataFrame(geom_gdf, geometry="geometry", crs=layers[0].crs)

        merged = geom_gdf.merge(stats_df, on=["name", "element_type"], how="left")
        merged.to_parquet(output, compression="zstd")
        return

    # single element type
    layer_map = {
        "subbasin": "subbasins", "junction": "junctions", "reach": "reaches",
        "diversion": "diversions", "reservoir": "reservoirs",
        "source": "sources", "sink": "sinks",
    }
    geom_gdf = get_basin_layer_gdf(basin_file, layer=layer_map[et], crs_epsg=crs_epsg, out_crs=out_crs)

    merged = geom_gdf.merge(stats_df, on="name", how="left")
    merged.to_parquet(output, compression="zstd")

hms2cng.results.export_all_results(hms_file, output_dir, *, variables=None, out_crs='EPSG:4326', skip_errors=True)

Export results for ALL runs in an HMS project.

Output: output_dir/{run_slug}/{variable_slug}.parquet Each file is enriched with: project_name, run_name, basin_model, met_model, control_spec, start_date, end_date, time_interval_minutes.

Parameters:

Name Type Description Default
hms_file Path

Path to the .hms project file or project directory.

required
output_dir Path

Root output directory (results go in per-run subdirs).

required
variables Optional[list]

Result variables to export. Defaults to ["Outflow"].

None
out_crs Optional[str]

Output CRS (default EPSG:4326).

'EPSG:4326'
skip_errors bool

If True, skip runs/variables that fail; otherwise raise.

True

Returns:

Type Description
tuple

Tuple of (created_paths: list[Path], errors: list[str]).

Source code in hms2cng/results.py
def export_all_results(
    hms_file: Path,
    output_dir: Path,
    *,
    variables: Optional[list] = None,
    out_crs: Optional[str] = "EPSG:4326",
    skip_errors: bool = True,
) -> tuple:
    """Export results for ALL runs in an HMS project.

    Output: output_dir/{run_slug}/{variable_slug}.parquet
    Each file is enriched with: project_name, run_name, basin_model,
    met_model, control_spec, start_date, end_date, time_interval_minutes.

    Args:
        hms_file: Path to the .hms project file or project directory.
        output_dir: Root output directory (results go in per-run subdirs).
        variables: Result variables to export. Defaults to ["Outflow"].
        out_crs: Output CRS (default EPSG:4326).
        skip_errors: If True, skip runs/variables that fail; otherwise raise.

    Returns:
        Tuple of (created_paths: list[Path], errors: list[str]).
    """
    import re
    from hms2cng.project import _find_hms_file, _init_project, slugify, _find_results_xml_for_run

    hms_file = _find_hms_file(Path(hms_file))
    output_dir = Path(output_dir)
    vars_to_export = variables if variables is not None else ["Outflow"]

    prj = _init_project(hms_file)
    project_name = prj.project_name
    project_dir = hms_file.parent

    # Build control lookup: control_spec name -> row
    ctrl_lookup: dict = {}
    if not prj.control_df.empty:
        for _, cr in prj.control_df.iterrows():
            ctrl_lookup[cr.get("name", "")] = cr

    created: list = []
    errors: list = []

    for _, run_row in prj.run_df.iterrows():
        run_name = run_row.get("name", "")
        basin_model = run_row.get("basin_model", "")
        met_model = run_row.get("met_model", "")
        control_spec = run_row.get("control_spec", "")
        run_slug = slugify(run_name)

        # Locate results XML
        results_xml = _find_results_xml_for_run(project_dir, run_name)
        if results_xml is None:
            errors.append(f"No results XML found for run: {run_name!r}")
            continue

        # Control window metadata
        start_date = end_date = time_interval_minutes = None
        if control_spec in ctrl_lookup:
            cr = ctrl_lookup[control_spec]
            start_date = cr.get("start_date")
            end_date = cr.get("end_date")
            time_interval_minutes = cr.get("time_interval_minutes")

        # Find basin file for geometry
        basin_file: Optional[Path] = None
        if not prj.basin_df.empty:
            match = prj.basin_df[prj.basin_df["name"] == basin_model]
            if not match.empty:
                bp = Path(match.iloc[0].get("full_path", ""))
                if bp.is_file():
                    basin_file = bp

        run_out_dir = output_dir / run_slug
        run_out_dir.mkdir(parents=True, exist_ok=True)

        for variable in vars_to_export:
            var_slug = re.sub(r"[^\w]", "_", variable.strip()).lower()
            out_path = run_out_dir / f"{var_slug}.parquet"
            try:
                stats_df = _parse_results_xml_statistics(
                    results_xml,
                    element_type="all",
                    variable=variable,
                )
            except (ValueError, FileNotFoundError) as exc:
                errors.append(f"run={run_name!r}, var={variable!r}: {exc}")
                if not skip_errors:
                    raise
                continue
            except Exception as exc:
                errors.append(f"run={run_name!r}, var={variable!r}: {exc}")
                if not skip_errors:
                    raise
                continue

            # Enrich with lineage columns
            for col, val in [
                ("project_name", project_name),
                ("run_name", run_name),
                ("basin_model", basin_model),
                ("met_model", met_model),
                ("control_spec", control_spec),
                ("start_date", start_date),
                ("end_date", end_date),
                ("time_interval_minutes", time_interval_minutes),
            ]:
                stats_df[col] = val

            # Merge geometry if available
            if basin_file is not None:
                layer_map = {
                    "subbasin": "subbasins", "junction": "junctions", "reach": "reaches",
                    "diversion": "diversions", "reservoir": "reservoirs",
                    "source": "sources", "sink": "sinks",
                }
                geo_parts = []
                for et, lyr in layer_map.items():
                    subset = stats_df[stats_df["element_type"] == et].copy()
                    if subset.empty:
                        continue
                    try:
                        geom_gdf = get_basin_layer_gdf(
                            basin_file, layer=lyr, out_crs=out_crs  # type: ignore[arg-type]
                        )
                        merged = geom_gdf.merge(
                            subset.drop(columns=["element_type"], errors="ignore"),
                            on="name",
                            how="inner",
                        )
                        if not merged.empty:
                            merged["element_type"] = et
                            geo_parts.append(merged)
                    except Exception:
                        # Layer not available: keep stats without geometry
                        geo_parts.append(subset)

                if geo_parts:
                    import geopandas as gpd
                    # Combine: some parts may be plain DataFrames (no geometry)
                    geo_gdfs = [p for p in geo_parts if isinstance(p, gpd.GeoDataFrame)]
                    plain_dfs = [p for p in geo_parts if not isinstance(p, gpd.GeoDataFrame)]
                    if geo_gdfs:
                        combined = gpd.GeoDataFrame(
                            pd.concat(geo_gdfs + plain_dfs, ignore_index=True),
                            geometry="geometry",
                            crs=geo_gdfs[0].crs,
                        )
                        combined.to_parquet(out_path, compression="zstd")
                    else:
                        pd.concat(plain_dfs, ignore_index=True).to_parquet(
                            out_path, compression="zstd", index=False
                        )
                else:
                    stats_df.to_parquet(out_path, compression="zstd", index=False)
            else:
                stats_df.to_parquet(out_path, compression="zstd", index=False)

            created.append(out_path)

    return created, errors

hms2cng.duckdb_session

hms2cng.duckdb_session.DuckSession

DuckDB session with spatial extension pre-loaded

Source code in hms2cng/duckdb_session.py
class DuckSession:
    """DuckDB session with spatial extension pre-loaded"""

    def __init__(self, db_path: str = ":memory:"):
        self.con = duckdb.connect(db_path)
        self.con.execute("INSTALL spatial; LOAD spatial;")

    def register_parquet(self, path: Path, name: str = "_"):
        """Register a Parquet file as a table view"""
        self.con.execute(f"CREATE OR REPLACE VIEW {name} AS SELECT * FROM read_parquet('{path}');")
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Execute SQL query and return DataFrame"""
        return self.con.execute(sql).df()

    def close(self):
        self.con.close()

register_parquet(path, name='_')

Register a Parquet file as a table view

Source code in hms2cng/duckdb_session.py
def register_parquet(self, path: Path, name: str = "_"):
    """Register a Parquet file as a table view"""
    self.con.execute(f"CREATE OR REPLACE VIEW {name} AS SELECT * FROM read_parquet('{path}');")
    return self

query(sql)

Execute SQL query and return DataFrame

Source code in hms2cng/duckdb_session.py
def query(self, sql: str) -> pd.DataFrame:
    """Execute SQL query and return DataFrame"""
    return self.con.execute(sql).df()

hms2cng.duckdb_session.query_parquet(input_file, sql)

Quick helper to query a single GeoParquet file.

Parameters:

Name Type Description Default
input_file Path

Path to GeoParquet file

required
sql str

SQL query (use '_' as table name)

required

Returns:

Type Description
DataFrame

pandas DataFrame with query results

Example

df = query_parquet("subbasins.parquet", "SELECT * FROM _ WHERE max_value > 1000")

Source code in hms2cng/duckdb_session.py
def query_parquet(input_file: Path, sql: str) -> pd.DataFrame:
    """
    Quick helper to query a single GeoParquet file.

    Args:
        input_file: Path to GeoParquet file
        sql: SQL query (use '_' as table name)

    Returns:
        pandas DataFrame with query results

    Example:
        df = query_parquet("subbasins.parquet", "SELECT * FROM _ WHERE max_value > 1000")
    """
    session = DuckSession()
    session.register_parquet(input_file)
    try:
        return session.query(sql)
    finally:
        session.close()

hms2cng.duckdb_session.spatial_join(left_file, right_file, predicate='ST_Intersects', output_file=None)

Perform spatial join between two GeoParquet files.

Parameters:

Name Type Description Default
left_file Path

Left GeoParquet file

required
right_file Path

Right GeoParquet file

required
predicate str

Spatial predicate (ST_Intersects, ST_Contains, ST_Within, etc.)

'ST_Intersects'
output_file Optional[Path]

Optional output file for results

None

Returns:

Type Description
DataFrame

DataFrame with joined results

Source code in hms2cng/duckdb_session.py
def spatial_join(
    left_file: Path,
    right_file: Path,
    predicate: str = "ST_Intersects",
    output_file: Optional[Path] = None
) -> pd.DataFrame:
    """
    Perform spatial join between two GeoParquet files.

    Args:
        left_file: Left GeoParquet file
        right_file: Right GeoParquet file
        predicate: Spatial predicate (ST_Intersects, ST_Contains, ST_Within, etc.)
        output_file: Optional output file for results

    Returns:
        DataFrame with joined results
    """
    session = DuckSession()
    session.register_parquet(left_file, "left")
    session.register_parquet(right_file, "right")

    sql = f"""
    SELECT l.*, r.*
    FROM left AS l
    JOIN right AS r
    ON {predicate}(l.geometry, r.geometry)
    """

    df = session.query(sql)
    session.close()

    if output_file:
        df.to_parquet(output_file, index=False)

    return df

hms2cng.pmtiles

hms2cng.pmtiles.generate_pmtiles_from_input(input_file, output, layer_name='layer', min_zoom=None, max_zoom=None)

Generate PMTiles (or MBTiles) from GeoParquet (vector).

Source code in hms2cng/pmtiles.py
def generate_pmtiles_from_input(
    input_file: Path,
    output: Path,
    layer_name: str = "layer",
    min_zoom: Optional[int] = None,
    max_zoom: Optional[int] = None,
):
    """Generate PMTiles (or MBTiles) from GeoParquet (vector)."""

    input_path = Path(input_file)
    output_path = Path(output)

    if input_path.suffix.lower() not in {".parquet", ".gpq"}:
        raise ValueError(f"Unsupported input format: {input_path.suffix}")

    if output_path.suffix.lower() not in {".pmtiles", ".mbtiles"}:
        raise ValueError("Output must end with .pmtiles or .mbtiles")

    generate_vector_tiles(
        input_path,
        output_path,
        layer_name=layer_name,
        min_zoom=min_zoom,
        max_zoom=max_zoom,
    )

hms2cng.postgis_sync

hms2cng.postgis_sync.sync_to_postgres(input_file, postgres_uri, table_name, schema='public', if_exists='replace')

Sync GeoParquet file to PostGIS table.

Parameters:

Name Type Description Default
input_file Path

Path to GeoParquet file

required
postgres_uri str

PostgreSQL connection URI (postgresql://user:pass@host:port/db)

required
table_name str

Target table name

required
schema str

Target schema

'public'
if_exists str

'replace', 'append', or 'fail'

'replace'
Source code in hms2cng/postgis_sync.py
def sync_to_postgres(
    input_file: Path,
    postgres_uri: str,
    table_name: str,
    schema: str = "public",
    if_exists: str = "replace"
):
    """
    Sync GeoParquet file to PostGIS table.

    Args:
        input_file: Path to GeoParquet file
        postgres_uri: PostgreSQL connection URI (postgresql://user:pass@host:port/db)
        table_name: Target table name
        schema: Target schema
        if_exists: 'replace', 'append', or 'fail'
    """
    # Read GeoParquet
    gdf = gpd.read_parquet(input_file)

    # Create SQLAlchemy engine
    # Prefer psycopg (v3) driver if available.
    uri = postgres_uri
    if uri.startswith("postgresql://"):
        # SQLAlchemy defaults to psycopg2 for postgresql://. If users installed psycopg (v3),
        # rewrite to the explicit dialect.
        uri_psycopg = "postgresql+psycopg://" + uri[len("postgresql://"):]
    else:
        uri_psycopg = uri

    try:
        engine = create_engine(uri)
    except ModuleNotFoundError as e:
        # Common on fresh Windows installs: psycopg2 not installed.
        if "psycopg2" in str(e):
            engine = create_engine(uri_psycopg, connect_args={"client_encoding": "UTF8"})
        else:
            raise

    # Ensure geometry column is properly typed
    if 'geometry' in gdf.columns:
        gdf = gdf.set_geometry('geometry')

    # Write to PostGIS
    gdf.to_postgis(
        table_name,
        engine,
        schema=schema,
        if_exists=if_exists,
        index=False
    )

    # Create spatial index if geometry exists
    if 'geometry' in gdf.columns:
        with engine.connect() as conn:
            conn.execute(text(f"""
                CREATE INDEX IF NOT EXISTS {table_name}_geom_idx 
                ON {schema}.{table_name} 
                USING GIST (geometry)
            """))
            conn.commit()