feat(#3): group and crop surveys based on h3 cells
AdrianSolberg committed Oct 23, 2025 · 1 parent bedebd3 · commit a4059ab
Showing 2 changed files with 220 additions and 87 deletions.
The first changed file, by its contents the repository's ignore list, swaps the old path-specific patterns for the new working folders:

```diff
@@ -1,5 +1,7 @@
-#survey content
-/surveys/*
-/output_las/*
+surveys
+converted_surveys
+temp_cells
+metadata
+h3_cells
 .DS_Store
 .venv
```
The second changed file is the conversion script itself; the diff is a single hunk spanning the whole file. It opens with the new imports and the new `prepare_cells` step, which enumerates the level-2 H3 cells under the configured level-1 cells, reprojects cell outlines and survey footprints into EPSG:4978, and assigns each survey to every cell it intersects:

```diff
@@ -1,102 +1,233 @@
 import sys
+from collections import defaultdict
+import json
+import shutil
 from pathlib import Path
 import dask.dataframe as dd
+import h3
 import numpy as np
 import laspy
 import pandas as pd
+import pdal
 from pyproj import Transformer
 
 
-def CSV_2_LAS(surveys_folders, output_folder, output_name="merged_final1.las", chunk_size_bytes="64MB"):
+from shapely import MultiPolygon, Polygon
+from shapely.ops import transform
+from shapely import wkt
+
+
+def prepare_cells(metadata):
+    # Relevant level 1 H3 cells
+    level1_cells = pd.read_csv("metadata/level1_h3_cells.csv")["cell_id"].tolist()
+
+    # Transformer for polygons
+    transformer = Transformer.from_crs("EPSG:4326", "EPSG:4978", always_xy=True)
+
+    # Generate level 2 cells & polygons
+    level2_cells = []
+    cell_polygons = {}
+    for c1 in level1_cells:
+        children = h3.cell_to_children(c1, 2)
+        for c2 in children:
+            coords = h3.cell_to_boundary(c2)
+            poly = Polygon([(lng, lat) for lat, lng in coords])
+            cell_polygons[c2] = transform(transformer.transform, poly)
+            level2_cells.append(c2)
+
+    metadata["geom"] = metadata["geom"].apply(wkt.loads)
+    transform_4258_to_4978 = Transformer.from_crs("EPSG:4258", "EPSG:4978", always_xy=True)
+    metadata["geom_4978"] = metadata["geom"].apply(lambda g: transform(transform_4258_to_4978.transform, g))
+
+    # Assign surveys to cells
+    cell_to_surveys = defaultdict(list)
+    for c2, cell_poly in cell_polygons.items():
+        for _, survey in metadata.iterrows():
+            geom = survey["geom_4978"]
+
+            if geom.is_empty:
+                continue
+
+            # Handle both Polygon and MultiPolygon
+            if isinstance(geom, Polygon) and cell_poly.intersects(geom):
+                cell_to_surveys[c2].append(survey["fid"])
+            elif isinstance(geom, MultiPolygon) and any(cell_poly.intersects(subgeom) for subgeom in geom.geoms):
+                cell_to_surveys[c2].append(survey["fid"])
+
+    return cell_to_surveys, cell_polygons
```
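`prepare_cells` leans on a handful of h3-py v4 calls: `cell_to_children` to enumerate the resolution-2 cells under each listed resolution-1 cell, and `cell_to_boundary` to get each child's outline as (lat, lng) pairs. A minimal standalone sketch of those calls; the starting coordinates and resolution are illustrative values, not taken from the commit:

```python
import h3

# Index an arbitrary point at resolution 1 (illustrative values only)
parent = h3.latlng_to_cell(59.9, 10.7, 1)

# Hexagon cells have exactly 7 children one resolution down (pentagons have 6)
children = h3.cell_to_children(parent, 2)
print(len(children))

# v4 returns boundary vertices as (lat, lng); shapely expects (x, y) = (lng, lat),
# hence the coordinate swap seen in prepare_cells
lat, lng = h3.cell_to_boundary(children[0])[0]
print(lat, lng)
```

The hunk continues with the reworked converter, which now takes the metadata frame and writes one LAS file per survey instead of a single merged output: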
```diff
+def CSV_2_LAS(surveys_folders, output_folder, metadata, chunk_size_bytes="64MB"):
     surveys_path = Path(surveys_folders)
     output_folder = Path(output_folder)
     output_folder.mkdir(parents=True, exist_ok=True)
-    transformer = Transformer.from_crs("EPSG:25832", "EPSG:4978", always_xy=True)
-
-    csv_files = list(surveys_path.rglob("*.csv"))
-    if not csv_files:
-        print("No CSV files found.")
-        return
-    print(f"Found {len(csv_files)} CSV files.")
-
-    # Map surveys to IDs
-    top_dirs = sorted({f.relative_to(surveys_path).parts[0] for f in csv_files})
-    dir_to_id = {name: i+1 for i, name in enumerate(top_dirs)}
-    pd.DataFrame(list(dir_to_id.items()), columns=["top_dir","id"]).to_csv(output_folder / "dir_mapping.csv", index=False)
 
     temp_folder = output_folder / "tmp"
     temp_folder.mkdir(exist_ok=True)
 
-    # Step 1: Write chunked LAS files
-    print("Step 1: Writing LAS chunks...")
-    file_counter = 1
-
-    for f in csv_files:
-        top_dir_id = dir_to_id[f.relative_to(surveys_path).parts[0]]
-        # Read CSV in Dask partitions (out-of-core)
-        ddf = dd.read_csv(str(f), header=None, usecols=[0,1,2,3,4,5], blocksize=chunk_size_bytes)
-
-        for partition in ddf.to_delayed():
-            df = partition.compute()
-            x, y, z = df.iloc[:,0].to_numpy(), df.iloc[:,1].to_numpy(), df.iloc[:,2].to_numpy()
-
-            # Transform coordinates into EPSG:4978
-            x, y, z = transformer.transform(x, y, z)
-            accepted = (
-                df.iloc[:, 3].astype(str)
-                .str.strip()
-                .str.lower()
-                .eq("accepted")
-                .astype(np.uint8)
-                .to_numpy()
-            )
-            tvu = df.iloc[:, 4].to_numpy(dtype=np.float32)
-            thu = df.iloc[:, 5].to_numpy(dtype=np.float32)
-            ids = np.full(len(df), top_dir_id, dtype=np.uint16)
-
-            # Create LAS header template
-            header = laspy.LasHeader(point_format=6, version="1.4")
-
-            # Add extra dimensions
-            header.add_extra_dim(laspy.ExtraBytesParams(name="accepted", type=np.uint8))
-            header.add_extra_dim(laspy.ExtraBytesParams(name="TVU", type=np.float32))
-            header.add_extra_dim(laspy.ExtraBytesParams(name="THU", type=np.float32))
-
-            las = laspy.LasData(header)
-            las.x = x
-            las.y = y
-            las.z = z
-
-            # Add extra dimensions
-            las["accepted"] = accepted
-            las["TVU"] = tvu
-            las["THU"] = thu
-            las.point_source_id = ids
-
-            las_path = temp_folder / f"{output_name}_chunk_{file_counter}.las"
-            las.write(las_path)
-            print(f"✅ Wrote {las_path}")
-            file_counter += 1
-
-    # Merging chunked LAS files into single LAS
-    print("Step 2: Merging LAS chunks into final LAS...")
-    las_files = sorted(temp_folder.glob(f"{output_name}_chunk_*.las"))
-    first_las = laspy.read(str(las_files[0]))
-    merged_header = first_las.header
-
-    with laspy.open(str(output_folder / output_name), mode="w", header=merged_header) as merged_writer:
+    # Set EPSG:25832 as default when crs is missing
+    metadata["epsg"] = metadata["epsg"].fillna(25832).astype(int)
+
+    for source_id, row in metadata.iterrows():
+        survey_name = row["survey_name"]
+        survey_folder = surveys_path / survey_name
+
+        if not survey_folder.exists() or not survey_folder.is_dir():
+            print(f"⚠️ Survey folder '{survey_folder}' does not exist, skipping.")
+            continue
+
+        csv_files = sorted(survey_folder.glob("*.csv"))
+        if not csv_files:
+            print(f"⚠️ No CSV files for {survey_name}, skipping.")
+            continue
+
+        epsg = row["epsg"]
+        transformer = Transformer.from_crs(f"EPSG:{epsg}", "EPSG:4978", always_xy=True)
+        file_counter = 1
+        print(f"Writing LAS chunks for {survey_name}...")
+        for csv_file in csv_files:
+            # Read CSV in Dask partitions (out-of-core)
+            ddf = dd.read_csv(str(csv_file), header=None, blocksize=chunk_size_bytes)
+
+            for partition in ddf.to_delayed():
+                df = partition.compute()
+                x, y, z = df.iloc[:,0].to_numpy(), df.iloc[:,1].to_numpy(), df.iloc[:,2].to_numpy()
+
+                # Transform coordinates into EPSG:4978
+                x, y, z = transformer.transform(x, y, z)
+
+                accepted = (
+                    df.iloc[:, 3].astype(str)
+                    .str.strip()
+                    .str.lower()
+                    .eq("accepted")
+                    .astype(np.uint8)
+                    .to_numpy()
+                )
+
+                # Default to 0 if TVU/THU missing
+                tvu = df.iloc[:, 4].to_numpy(dtype=np.float32) if df.shape[1] > 4 else np.zeros(len(df), dtype=np.float32)
+                thu = df.iloc[:, 5].to_numpy(dtype=np.float32) if df.shape[1] > 5 else np.zeros(len(df), dtype=np.float32)
+
+                ids = np.full(len(df), source_id, dtype=np.uint16)
+
+                # Create LAS header template
+                header = laspy.LasHeader(point_format=6, version="1.4")
+
+                # Add extra dimensions
+                header.add_extra_dim(laspy.ExtraBytesParams(name="accepted", type=np.uint8))
+                header.add_extra_dim(laspy.ExtraBytesParams(name="TVU", type=np.float32))
+                header.add_extra_dim(laspy.ExtraBytesParams(name="THU", type=np.float32))
+
+                las = laspy.LasData(header)
+                las.x = x
+                las.y = y
+                las.z = z
+
+                # Add extra dimensions
+                las["accepted"] = accepted
+                las["TVU"] = tvu
+                las["THU"] = thu
+                las.point_source_id = ids
+
+                las_path = temp_folder / f"{survey_name}_chunk_{file_counter}.las"
+                las.write(las_path)
+                print(f"✅ Wrote {las_path}")
+                file_counter += 1
+
+        # Merging chunked LAS files into single LAS
+        print("Step 2: Merging LAS chunks into final LAS...")
+        las_files = sorted(temp_folder.glob(f"{survey_name}_chunk_*.las"))
+        first_las = laspy.read(str(las_files[0]))
+        merged_header = first_las.header
+
+        with laspy.open(str(output_folder / f"{survey_name}.las"), mode="w", header=merged_header) as merged_writer:
+            for f in las_files:
+                las = laspy.read(str(f))
+                merged_writer.write_points(las.points)
+                print(f"✅ Merged {f.name} ({len(las.points)} points)")
+
+        # Cleanup temporary LAS files
         for f in las_files:
-            las = laspy.read(str(f))
-            merged_writer.write_points(las.points)
-            print(f"Merged {f.name} ({len(las.points)} points)")
-
-    # Cleanup temporary LAS files
-    for f in las_files:
-        f.unlink()
+            f.unlink()
     temp_folder.rmdir()
-
-    print(f"Final merged LAS saved at {output_folder / output_name}")
```
```diff
+def group_by_h3_cell(cell_to_surveys, cell_polygons):
+    output_folder = Path("h3_cells")
+    output_folder.mkdir(exist_ok=True)
+
+    temp_folder_base = Path("temp_cells")  # temporary folder base
+    temp_folder_base.mkdir(exist_ok=True)
+
+    for c2, survey_fids in cell_to_surveys.items():
+        if not survey_fids:
+            continue
+
+        # Create a temporary folder for this H3 cell
+        temp_folder = temp_folder_base / c2
+        temp_folder.mkdir(exist_ok=True)
+
+        input_files = []
+        for fid in survey_fids:
+            survey_name = metadata.loc[metadata['fid'] == fid, 'survey_name'].item()
+
+            src = Path("converted_surveys") / f"{survey_name}.las"
+            if not src.exists():
+                continue
+
+            dst = temp_folder / src.name
+            if dst.exists():
+                dst.unlink()  # remove the existing file or symlink first
+
+            dst.symlink_to(src.resolve())
+            input_files.append(str(dst))
+
+
+        # Skip cell if no files exist
+        if not input_files:
+            shutil.rmtree(temp_folder)
+            print(f"⚠️ No survey files for cell {c2}, skipping")
+            continue
+
+        # PDAL filename glob pattern
+        input_pattern = str(temp_folder / "*.las")
+        cell_poly = cell_polygons[c2]
+
+        # PDAL JSON pipeline
+        pdal_pipeline = {
+            "pipeline": [
+                {
+                    "type": "readers.las",
+                    "filename": input_pattern,
+                    "extra_dims": "Accepted=uint8,TVU=float32,THU=float32"
+                },
+                {
+                    "type": "filters.crop",
+                    "polygon": cell_poly.wkt
+                },
+                {
+                    "type": "writers.las",
+                    "filename": str(output_folder / f"{c2}.las"),
+                    "extra_dims": "Accepted=uint8,TVU=float32,THU=float32"
+                }
+            ]
+        }
+
+        pipeline_json = json.dumps(pdal_pipeline)
+
+        p = pdal.Pipeline(pipeline_json)
+        try:
+            p.execute_streaming(chunk_size=500000)
+            print(f"Written H3 cell {c2}.")
+        except KeyboardInterrupt:
+            print("Keyboard interrupt detected, cleaning up and exiting...")
+            break
+        except RuntimeError as e:
+            print(f"Pipeline failed for cell {c2}: {e}")
+        finally:
+            shutil.rmtree(temp_folder)
```
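Note that `group_by_h3_cell` reads `metadata` from module scope rather than taking it as a parameter, so it only works when invoked from this script's `__main__` block. For the PDAL side, here is a minimal sketch of loading one cropped cell back through the same Python bindings; the cell filename is a placeholder, and a bare path in the pipeline array lets PDAL infer `readers.las`:

```python
import json
import pdal

# Placeholder path -- substitute any cell file written by group_by_h3_cell
pipeline = pdal.Pipeline(json.dumps({"pipeline": ["h3_cells/<cell_id>.las"]}))

count = pipeline.execute()   # non-streaming execution; returns the point count
points = pipeline.arrays[0]  # structured numpy array, one field per dimension
print(count, points.dtype.names)
```

`execute_streaming(chunk_size=...)`, as used in the commit, keeps memory bounded but requires every stage in the pipeline to support streaming mode. The hunk closes with the reworked entry point: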
```diff
 if __name__ == "__main__":
-    CSV_2_LAS(sys.argv[1], sys.argv[2])
+    metadata = pd.read_csv("./metadata/metadata.csv")  # includes fid, survey_name, survey_area, geom, epsg
+
+    cell_to_surveys, cell_polygons = prepare_cells(metadata)
+
+    CSV_2_LAS("surveys", "converted_surveys", metadata)
+
+    group_by_h3_cell(cell_to_surveys, cell_polygons)
```
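Taken together, the script now assumes a fixed on-disk layout instead of command-line paths. A small sketch that checks for the inputs the `__main__` block expects before a run (the script's own filename does not appear in the diff, so the check is kept standalone):

```python
from pathlib import Path

# Inputs assumed by the code above; converted_surveys/, temp_cells/ and
# h3_cells/ are created by the script itself as it runs
expected = [
    "surveys",                       # one sub-folder of CSVs per survey_name
    "metadata/metadata.csv",         # fid, survey_name, survey_area, geom, epsg
    "metadata/level1_h3_cells.csv",  # cell_id column listing the level-1 H3 cells
]
for p in expected:
    print(p, "OK" if Path(p).exists() else "MISSING")
```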