#!/usr/bin/env python3 """Acquire the best available USGS 3DEP DEM source for Ground Zero. This script queries the official USGS TNMAccess API for the selected 1 km tile, stores the product metadata, downloads the chosen 1-meter GeoTIFF source, and updates the tile registry source record. """ from __future__ import annotations import argparse import json import time import urllib.parse import urllib.request from pathlib import Path from prototype_ground_zero_terrain import TARGET_TILE_ID, utm_to_lat_lon PROJECT_ROOT = Path(__file__).resolve().parents[1] REGISTRY_PATH = PROJECT_ROOT / "Data" / "Tiles" / "ground_zero_tiles.json" SOURCE_ROOT = PROJECT_ROOT / "Data" / "Terrain" / "Sources" / TARGET_TILE_ID TNM_PRODUCTS_URL = "https://tnmaccess.nationalmap.gov/api/v1/products" TARGET_DATASET = "Digital Elevation Model (DEM) 1 meter" def load_tile() -> dict: registry = json.loads(REGISTRY_PATH.read_text()) for tile in registry["tiles"]: if tile["tile_id"] == TARGET_TILE_ID: return tile raise RuntimeError(f"Could not find {TARGET_TILE_ID} in {REGISTRY_PATH}") def tile_bbox_lon_lat(tile: dict) -> tuple[float, float, float, float]: grid = tile["grid"] corners = [ (grid["easting_min_m"], grid["northing_min_m"]), (grid["easting_max_m"], grid["northing_min_m"]), (grid["easting_max_m"], grid["northing_max_m"]), (grid["easting_min_m"], grid["northing_max_m"]), ] lat_lons = [utm_to_lat_lon(easting, northing, 10, True) for easting, northing in corners] return ( min(lon for _, lon in lat_lons), min(lat for lat, _ in lat_lons), max(lon for _, lon in lat_lons), max(lat for lat, _ in lat_lons), ) def query_tnm_products(bbox: tuple[float, float, float, float]) -> dict: params = urllib.parse.urlencode( { "bbox": ",".join(f"{value:.10f}" for value in bbox), "datasets": TARGET_DATASET, "prodFormats": "GeoTIFF", "outputFormat": "JSON", } ) url = f"{TNM_PRODUCTS_URL}?{params}" with urllib.request.urlopen(url, timeout=60) as response: payload = json.loads(response.read().decode("utf-8")) payload["query_url"] = url return payload def choose_product(products: dict) -> dict: items = products.get("items", []) if not items: raise RuntimeError("TNMAccess returned no 1-meter DEM products for Ground Zero") def product_score(item: dict) -> tuple[int, str]: title = item.get("title", "") size = int(item.get("sizeInBytes") or 0) is_geotiff = 1 if item.get("format") == "GeoTIFF" else 0 has_download = 1 if item.get("downloadURL") else 0 return (has_download + is_geotiff, f"{size:020d}_{title}") return sorted(items, key=product_score, reverse=True)[0] def coverage_products(products: dict) -> list[dict]: items = products.get("items", []) if not items: raise RuntimeError("TNMAccess returned no 1-meter DEM products for Ground Zero") return [item for item in items if item.get("downloadURL")] def download_file(url: str, output_path: Path) -> None: if output_path.exists() and output_path.stat().st_size > 0: print(f"Using existing download: {output_path}") return temp_path = output_path.with_suffix(output_path.suffix + ".part") with urllib.request.urlopen(url, timeout=120) as response, temp_path.open("wb") as output_file: while True: chunk = response.read(1024 * 1024) if not chunk: break output_file.write(chunk) temp_path.replace(output_path) def update_registry_source(product: dict, metadata_path: Path, geotiff_path: Path) -> None: registry = json.loads(REGISTRY_PATH.read_text()) for tile in registry["tiles"]: if tile["tile_id"] != TARGET_TILE_ID: continue for source in tile["sources"]: if source["source_kind"] == "elevation": source["source_name"] = product.get("title", source["source_name"]) source["source_uri"] = product.get("downloadURL", source["source_uri"]) source["source_version"] = product.get("publicationDate", "downloaded") source["coverage_status"] = "confirmed" source["local_metadata_path"] = str(metadata_path.relative_to(PROJECT_ROOT)) source["local_source_path"] = str(geotiff_path.relative_to(PROJECT_ROOT)) source["local_source_folder"] = str(geotiff_path.parent.relative_to(PROJECT_ROOT)) tile["status"] = "source_data_found" tile["notes"] = ( "Final MVP 1-meter USGS DEM source acquired. " "Prototype heightmap remains generated separately until DEM extraction/import is run." ) break else: raise RuntimeError(f"Could not update missing tile {TARGET_TILE_ID}") REGISTRY_PATH.write_text(json.dumps(registry, indent=2) + "\n") def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--metadata-only", action="store_true", help="Query and write metadata without downloading the GeoTIFF") args = parser.parse_args() tile = load_tile() bbox = tile_bbox_lon_lat(tile) SOURCE_ROOT.mkdir(parents=True, exist_ok=True) products = query_tnm_products(bbox) products_to_download = coverage_products(products) product = choose_product(products) metadata = { "tile_id": TARGET_TILE_ID, "acquired_at_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "tnm_query": { "url": products["query_url"], "bbox_lon_lat": bbox, "dataset": TARGET_DATASET, "product_count": products.get("total"), }, "selected_product": product, "coverage_products": products_to_download, "all_products": products.get("items", []), } metadata_path = SOURCE_ROOT / f"{TARGET_TILE_ID}_tnm_1m_dem_product.json" metadata_path.write_text(json.dumps(metadata, indent=2) + "\n") geotiff_paths = [] for coverage_product in products_to_download: download_url = coverage_product.get("downloadURL") filename = Path(urllib.parse.urlparse(download_url).path).name geotiff_path = SOURCE_ROOT / filename geotiff_paths.append(geotiff_path) if not args.metadata_only: download_file(download_url, geotiff_path) update_registry_source(product, metadata_path, geotiff_paths[0]) print(f"Selected: {product.get('title')}") print(f"Published: {product.get('publicationDate')}") print(f"Size: {product.get('sizeInBytes')} bytes") print(f"Metadata: {metadata_path}") for geotiff_path in geotiff_paths: if args.metadata_only: print(f"GeoTIFF download skipped: {geotiff_path}") else: print(f"GeoTIFF: {geotiff_path}") if __name__ == "__main__": main()