Add Ground Zero terrain pipeline and playable assets
This commit is contained in:
@@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Acquire the best available USGS 3DEP DEM source for Ground Zero.
|
||||
|
||||
This script queries the official USGS TNMAccess API for the selected 1 km tile,
|
||||
stores the product metadata, downloads the chosen 1-meter GeoTIFF source, and
|
||||
updates the tile registry source record.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import time
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
from prototype_ground_zero_terrain import TARGET_TILE_ID, utm_to_lat_lon
|
||||
|
||||
|
||||
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
||||
REGISTRY_PATH = PROJECT_ROOT / "Data" / "Tiles" / "ground_zero_tiles.json"
|
||||
SOURCE_ROOT = PROJECT_ROOT / "Data" / "Terrain" / "Sources" / TARGET_TILE_ID
|
||||
TNM_PRODUCTS_URL = "https://tnmaccess.nationalmap.gov/api/v1/products"
|
||||
TARGET_DATASET = "Digital Elevation Model (DEM) 1 meter"
|
||||
|
||||
|
||||
def load_tile() -> dict:
|
||||
registry = json.loads(REGISTRY_PATH.read_text())
|
||||
for tile in registry["tiles"]:
|
||||
if tile["tile_id"] == TARGET_TILE_ID:
|
||||
return tile
|
||||
raise RuntimeError(f"Could not find {TARGET_TILE_ID} in {REGISTRY_PATH}")
|
||||
|
||||
|
||||
def tile_bbox_lon_lat(tile: dict) -> tuple[float, float, float, float]:
|
||||
grid = tile["grid"]
|
||||
corners = [
|
||||
(grid["easting_min_m"], grid["northing_min_m"]),
|
||||
(grid["easting_max_m"], grid["northing_min_m"]),
|
||||
(grid["easting_max_m"], grid["northing_max_m"]),
|
||||
(grid["easting_min_m"], grid["northing_max_m"]),
|
||||
]
|
||||
lat_lons = [utm_to_lat_lon(easting, northing, 10, True) for easting, northing in corners]
|
||||
return (
|
||||
min(lon for _, lon in lat_lons),
|
||||
min(lat for lat, _ in lat_lons),
|
||||
max(lon for _, lon in lat_lons),
|
||||
max(lat for lat, _ in lat_lons),
|
||||
)
|
||||
|
||||
|
||||
def query_tnm_products(bbox: tuple[float, float, float, float]) -> dict:
|
||||
params = urllib.parse.urlencode(
|
||||
{
|
||||
"bbox": ",".join(f"{value:.10f}" for value in bbox),
|
||||
"datasets": TARGET_DATASET,
|
||||
"prodFormats": "GeoTIFF",
|
||||
"outputFormat": "JSON",
|
||||
}
|
||||
)
|
||||
url = f"{TNM_PRODUCTS_URL}?{params}"
|
||||
with urllib.request.urlopen(url, timeout=60) as response:
|
||||
payload = json.loads(response.read().decode("utf-8"))
|
||||
payload["query_url"] = url
|
||||
return payload
|
||||
|
||||
|
||||
def choose_product(products: dict) -> dict:
|
||||
items = products.get("items", [])
|
||||
if not items:
|
||||
raise RuntimeError("TNMAccess returned no 1-meter DEM products for Ground Zero")
|
||||
|
||||
def product_score(item: dict) -> tuple[int, str]:
|
||||
title = item.get("title", "")
|
||||
size = int(item.get("sizeInBytes") or 0)
|
||||
is_geotiff = 1 if item.get("format") == "GeoTIFF" else 0
|
||||
has_download = 1 if item.get("downloadURL") else 0
|
||||
return (has_download + is_geotiff, f"{size:020d}_{title}")
|
||||
|
||||
return sorted(items, key=product_score, reverse=True)[0]
|
||||
|
||||
|
||||
def coverage_products(products: dict) -> list[dict]:
|
||||
items = products.get("items", [])
|
||||
if not items:
|
||||
raise RuntimeError("TNMAccess returned no 1-meter DEM products for Ground Zero")
|
||||
return [item for item in items if item.get("downloadURL")]
|
||||
|
||||
|
||||
def download_file(url: str, output_path: Path) -> None:
|
||||
if output_path.exists() and output_path.stat().st_size > 0:
|
||||
print(f"Using existing download: {output_path}")
|
||||
return
|
||||
|
||||
temp_path = output_path.with_suffix(output_path.suffix + ".part")
|
||||
with urllib.request.urlopen(url, timeout=120) as response, temp_path.open("wb") as output_file:
|
||||
while True:
|
||||
chunk = response.read(1024 * 1024)
|
||||
if not chunk:
|
||||
break
|
||||
output_file.write(chunk)
|
||||
temp_path.replace(output_path)
|
||||
|
||||
|
||||
def update_registry_source(product: dict, metadata_path: Path, geotiff_path: Path) -> None:
|
||||
registry = json.loads(REGISTRY_PATH.read_text())
|
||||
for tile in registry["tiles"]:
|
||||
if tile["tile_id"] != TARGET_TILE_ID:
|
||||
continue
|
||||
|
||||
for source in tile["sources"]:
|
||||
if source["source_kind"] == "elevation":
|
||||
source["source_name"] = product.get("title", source["source_name"])
|
||||
source["source_uri"] = product.get("downloadURL", source["source_uri"])
|
||||
source["source_version"] = product.get("publicationDate", "downloaded")
|
||||
source["coverage_status"] = "confirmed"
|
||||
source["local_metadata_path"] = str(metadata_path.relative_to(PROJECT_ROOT))
|
||||
source["local_source_path"] = str(geotiff_path.relative_to(PROJECT_ROOT))
|
||||
source["local_source_folder"] = str(geotiff_path.parent.relative_to(PROJECT_ROOT))
|
||||
|
||||
tile["status"] = "source_data_found"
|
||||
tile["notes"] = (
|
||||
"Final MVP 1-meter USGS DEM source acquired. "
|
||||
"Prototype heightmap remains generated separately until DEM extraction/import is run."
|
||||
)
|
||||
break
|
||||
else:
|
||||
raise RuntimeError(f"Could not update missing tile {TARGET_TILE_ID}")
|
||||
|
||||
REGISTRY_PATH.write_text(json.dumps(registry, indent=2) + "\n")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--metadata-only", action="store_true", help="Query and write metadata without downloading the GeoTIFF")
|
||||
args = parser.parse_args()
|
||||
|
||||
tile = load_tile()
|
||||
bbox = tile_bbox_lon_lat(tile)
|
||||
SOURCE_ROOT.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
products = query_tnm_products(bbox)
|
||||
products_to_download = coverage_products(products)
|
||||
product = choose_product(products)
|
||||
|
||||
metadata = {
|
||||
"tile_id": TARGET_TILE_ID,
|
||||
"acquired_at_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||||
"tnm_query": {
|
||||
"url": products["query_url"],
|
||||
"bbox_lon_lat": bbox,
|
||||
"dataset": TARGET_DATASET,
|
||||
"product_count": products.get("total"),
|
||||
},
|
||||
"selected_product": product,
|
||||
"coverage_products": products_to_download,
|
||||
"all_products": products.get("items", []),
|
||||
}
|
||||
metadata_path = SOURCE_ROOT / f"{TARGET_TILE_ID}_tnm_1m_dem_product.json"
|
||||
metadata_path.write_text(json.dumps(metadata, indent=2) + "\n")
|
||||
|
||||
geotiff_paths = []
|
||||
for coverage_product in products_to_download:
|
||||
download_url = coverage_product.get("downloadURL")
|
||||
filename = Path(urllib.parse.urlparse(download_url).path).name
|
||||
geotiff_path = SOURCE_ROOT / filename
|
||||
geotiff_paths.append(geotiff_path)
|
||||
if not args.metadata_only:
|
||||
download_file(download_url, geotiff_path)
|
||||
|
||||
update_registry_source(product, metadata_path, geotiff_paths[0])
|
||||
|
||||
print(f"Selected: {product.get('title')}")
|
||||
print(f"Published: {product.get('publicationDate')}")
|
||||
print(f"Size: {product.get('sizeInBytes')} bytes")
|
||||
print(f"Metadata: {metadata_path}")
|
||||
for geotiff_path in geotiff_paths:
|
||||
if args.metadata_only:
|
||||
print(f"GeoTIFF download skipped: {geotiff_path}")
|
||||
else:
|
||||
print(f"GeoTIFF: {geotiff_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user