128 lines
4.4 KiB
Python
128 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Extract the 1 km Ground Zero subset from the acquired USGS DEM.
|
|
|
|
This script requires either rasterio or GDAL Python bindings. The current
|
|
Ubuntu-Codex image does not include those packages by default, so this file is
|
|
checked in as the repeatable extraction step once the geospatial dependency is
|
|
available.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from prototype_ground_zero_terrain import TARGET_TILE_ID
|
|
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
|
REGISTRY_PATH = PROJECT_ROOT / "Data" / "Tiles" / "ground_zero_tiles.json"
|
|
SOURCE_ROOT = PROJECT_ROOT / "Data" / "Terrain" / "Sources" / TARGET_TILE_ID
|
|
EXTRACT_ROOT = PROJECT_ROOT / "Data" / "Terrain" / "Extracted" / TARGET_TILE_ID
|
|
|
|
|
|
def load_ground_zero() -> dict:
|
|
registry = json.loads(REGISTRY_PATH.read_text())
|
|
for tile in registry["tiles"]:
|
|
if tile["tile_id"] == TARGET_TILE_ID:
|
|
return tile
|
|
raise RuntimeError(f"Could not find {TARGET_TILE_ID}")
|
|
|
|
|
|
def find_source_tiffs(tile: dict) -> list[Path]:
|
|
for source in tile["sources"]:
|
|
if source["source_kind"] != "elevation":
|
|
continue
|
|
if source.get("local_source_folder"):
|
|
folder = PROJECT_ROOT / source["local_source_folder"]
|
|
candidates = sorted(folder.glob("*.tif"))
|
|
if candidates:
|
|
return candidates
|
|
if source.get("local_source_path"):
|
|
path = PROJECT_ROOT / source["local_source_path"]
|
|
if path.exists():
|
|
return [path]
|
|
candidates = sorted(SOURCE_ROOT.glob("*.tif"))
|
|
if candidates:
|
|
return candidates
|
|
raise RuntimeError("No acquired source GeoTIFF found. Run acquire_ground_zero_dem.py first.")
|
|
|
|
|
|
def extract_with_rasterio(source_tiffs: list[Path], tile: dict) -> None:
|
|
import rasterio
|
|
from rasterio.merge import merge
|
|
from rasterio.windows import from_bounds
|
|
|
|
EXTRACT_ROOT.mkdir(parents=True, exist_ok=True)
|
|
grid = tile["grid"]
|
|
output_tiff = EXTRACT_ROOT / f"{TARGET_TILE_ID}_1m_dem_subset.tif"
|
|
output_metadata = EXTRACT_ROOT / f"{TARGET_TILE_ID}_1m_dem_subset_metadata.json"
|
|
bounds = (
|
|
grid["easting_min_m"],
|
|
grid["northing_min_m"],
|
|
grid["easting_max_m"],
|
|
grid["northing_max_m"],
|
|
)
|
|
|
|
datasets = [rasterio.open(path) for path in source_tiffs]
|
|
try:
|
|
if len(datasets) == 1:
|
|
dataset = datasets[0]
|
|
source_bounds = [list(dataset.bounds)]
|
|
source_crs = str(dataset.crs)
|
|
window = from_bounds(*bounds, dataset.transform).round_offsets().round_lengths()
|
|
data = dataset.read(window=window)
|
|
transform = dataset.window_transform(window)
|
|
profile = dataset.profile
|
|
else:
|
|
source_bounds = [list(dataset.bounds) for dataset in datasets]
|
|
source_crs = str(datasets[0].crs)
|
|
data, transform = merge(datasets, bounds=bounds, res=(1.0, 1.0), nodata=-999999)
|
|
profile = datasets[0].profile
|
|
|
|
profile.update(
|
|
{
|
|
"height": data.shape[1],
|
|
"width": data.shape[2],
|
|
"transform": transform,
|
|
}
|
|
)
|
|
|
|
with rasterio.open(output_tiff, "w", **profile) as output:
|
|
output.write(data)
|
|
|
|
metadata = {
|
|
"tile_id": TARGET_TILE_ID,
|
|
"source_tiffs": [str(path.relative_to(PROJECT_ROOT)) for path in source_tiffs],
|
|
"output_tiff": str(output_tiff.relative_to(PROJECT_ROOT)),
|
|
"source_crs": source_crs,
|
|
"source_bounds": source_bounds,
|
|
"subset_bounds_utm_m": list(bounds),
|
|
"subset_width_pixels": int(data.shape[2]),
|
|
"subset_height_pixels": int(data.shape[1]),
|
|
"pixel_size_x": abs(transform.a),
|
|
"pixel_size_y": abs(transform.e),
|
|
}
|
|
output_metadata.write_text(json.dumps(metadata, indent=2) + "\n")
|
|
finally:
|
|
for dataset in datasets:
|
|
dataset.close()
|
|
|
|
print(f"Subset GeoTIFF: {output_tiff}")
|
|
print(f"Metadata: {output_metadata}")
|
|
|
|
|
|
def main() -> None:
|
|
tile = load_ground_zero()
|
|
source_tiffs = find_source_tiffs(tile)
|
|
try:
|
|
extract_with_rasterio(source_tiffs, tile)
|
|
except ModuleNotFoundError as exc:
|
|
raise SystemExit(
|
|
"Missing rasterio. Install rasterio or GDAL Python bindings, then rerun this script."
|
|
) from exc
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|