diff --git a/.flake8 b/.flake8 index ec4d2a5..50d5de0 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,3 @@ [flake8] -max-line-length = 119 \ No newline at end of file +max-line-length = 119 +extend-ignore = E203 \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 923ed6f..8a9fa32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# +- Add function pretreatment : + - check data, + - filter by deviation day, + - calculate normalized height in pointcloud + # v0.0.2 - Add folder "configs" in Dockerfile diff --git a/configs/config.yaml b/configs/config.yaml index afdac31..e571721 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -10,6 +10,22 @@ io: input_filename: null input_dir: null output_dir: null + # Spatial reference to use to override the one from input las. + spatial_reference: EPSG:2154 + +pretreatment: + filter: + dimension: Classification + keep_values: [1, 2, 3, 4, 5, 9, 66] # Classes to keep + filter_deviation: + deviation_days: 14 + # ISO-8601 UTC string corresponding to gpstime == 0. + # Default matches the R reference: as.POSIXct("2011-09-14 01:46:40") + # Override this value if your LiDAR data uses a different GPS time origin. + gpstime_ref: "2011-09-14 01:46:40" + filter_normalize: + height: 60 + min_height: -3 hydra: output_subdir: null diff --git a/data/pointcloud/test_semis_2022_0897_6577_LA93_IGN69_decimation.laz b/data/pointcloud/test_semis_2022_0897_6577_LA93_IGN69_decimation.laz new file mode 100644 index 0000000..4794c0f Binary files /dev/null and b/data/pointcloud/test_semis_2022_0897_6577_LA93_IGN69_decimation.laz differ diff --git a/lidar_for_fuel/_version.py b/lidar_for_fuel/_version.py index 536d454..cf50012 100644 --- a/lidar_for_fuel/_version.py +++ b/lidar_for_fuel/_version.py @@ -1,4 +1,4 @@ -__version__ = "0.0.2" +__version__ = "0.2.0" if __name__ == "__main__": diff --git a/lidar_for_fuel/main_pretreatment.py b/lidar_for_fuel/main_pretreatment.py index 9414870..3b873d2 100644 --- a/lidar_for_fuel/main_pretreatment.py +++ b/lidar_for_fuel/main_pretreatment.py @@ -10,6 +10,8 @@ import hydra from omegaconf import DictConfig +from lidar_for_fuel.pretreatment.filter_deviation_day import filter_deviation_day +from lidar_for_fuel.pretreatment.normalize_height_in_pointcloud import normalize_height from lidar_for_fuel.pretreatment.validate_lidar_file import check_lidar_file logger = logging.getLogger(__name__) @@ -51,25 +53,38 @@ def main_on_one_tile(filename): filename (str): filename to the LAS file """ tilename = os.path.splitext(filename)[0] # filename to the LAS file - input_file = os.path.join(input_dir, filename) # path to the LAS file - logging.info(f"\nNormalize and add attributes of 1 for tile : {tilename}") - las = check_lidar_file(input_file) + input_filename = os.path.join(input_dir, filename) # path to the LAS file + srid = config.io.spatial_reference + logging.info(f"\nCheck data of 1 for tile : {tilename}") + pipeline_check_lidar = check_lidar_file(input_filename, srid) + + logging.info(f"\nFilter deviation day of 1 for tile : {tilename}") + deviation_days = config.pretreatment.filter_deviation.deviation_days + gpstime_ref = config.pretreatment.filter_deviation.gpstime_ref + pipeline_filter_deviation_day = filter_deviation_day(pipeline_check_lidar, deviation_days, gpstime_ref) + + logging.info(f"\nCalculate normalize of 1 for tile : {tilename}") + # Generate output filenames + output_dir = config.io.output_dir + output_path = os.path.join(output_dir, filename) + # Parameters + dimension = config.pretreatment.filter.dimension + classes = config.pretreatment.filter.keep_values + height_value = config.pretreatment.filter_normalize.height + min_height = config.pretreatment.filter_normalize.min_height + las = normalize_height( + pipeline_filter_deviation_day, output_path, dimension, classes, height_value, min_height + ) return las if initial_las_filename: # Launch pretreatment by one tile: - las = main_on_one_tile(initial_las_filename) - print(f"✅ SUCCESS: {len(las.points)} points loaded") - print(f" Version: {las.header.version}") - print(f" Point format: {las.header.point_format}") + main_on_one_tile(initial_las_filename) else: # Lauch pretreatment tile by tile for file in os.listdir(input_dir): - las = main_on_one_tile(file) - print(f"✅ SUCCESS: {len(las.points)} points loaded") - print(f" Version: {las.header.version}") - print(f" Point format: {las.header.point_format}") + main_on_one_tile(file) if __name__ == "__main__": diff --git a/lidar_for_fuel/pretreatment/filter_deviation_day.py b/lidar_for_fuel/pretreatment/filter_deviation_day.py new file mode 100644 index 0000000..7050015 --- /dev/null +++ b/lidar_for_fuel/pretreatment/filter_deviation_day.py @@ -0,0 +1,100 @@ +"""Keep points within a ±deviation_days window around the most densely sampled acquisition day.""" + +import json +import logging +import math +import warnings +from datetime import datetime, timezone + +import numpy as np +import pdal + +logger = logging.getLogger(__name__) + +_SECONDS_PER_DAY = 86_400.0 +_EPSILON = 1e-3 # 1 ms — smaller than any realistic GpsTime resolution + + +def filter_deviation_day( + pipeline: pdal.Pipeline, + deviation_days: int | float = 14, + gpstime_ref: str = "2011-09-14 01:46:40", +) -> pdal.Pipeline: + """Filter a LiDAR point cloud keeping only points acquired within ±deviation_days + around the most densely sampled calendar day. + + Equivalent to the R function ``filter_date_mode()`` from lidR. + + Args: + pipeline (pdal.Pipeline): Executed PDAL Pipeline object. + deviation_days (int | float): Half-width of the retention window in days. + Pass ``math.inf`` to skip filtering entirely. Default: 14. + gpstime_ref (str): ISO-8601 UTC string of the GPS time reference epoch. + Default: "2011-09-14 01:46:40". + + Returns: + pdal.Pipeline: A new, configured-but-not-yet-executed PDAL Pipeline restricted + to the selected time window, or the original pipeline unchanged if + ``deviation_days`` is infinite. + + Raises: + ValueError: If the pipeline has no arrays, lacks a ``GpsTime`` dimension, + or if ``deviation_days`` is negative. + """ + if not math.isinf(deviation_days) and deviation_days < 0: + raise ValueError(f"deviation_days must be >= 0 or math.inf, got {deviation_days!r}") + + arrays = pipeline.arrays + if not arrays: + raise ValueError("No arrays produced by the pipeline.") + + points = arrays[0] + + if "GpsTime" not in points.dtype.names: + raise ValueError("Point cloud does not contain a 'GpsTime' dimension.") + + if math.isinf(deviation_days): + logger.debug("deviation_days is Inf — no filtering applied.") + return pipeline + + gpstime_ref_unix = datetime.fromisoformat(gpstime_ref).replace(tzinfo=timezone.utc).timestamp() + n_total = len(points) + + # Convert GpsTime to absolute UNIX time and floor to calendar day + unix_time = points["GpsTime"] + gpstime_ref_unix + day_index = np.floor(unix_time / _SECONDS_PER_DAY).astype(np.int64) + + # Find the modal day (most abundantly sampled) + unique_days, counts = np.unique(day_index, return_counts=True) + modal_day = int(unique_days[counts.argmax()]) + + # Compute the GpsTime filter window [t_min, t_max] + day_lo = modal_day - int(deviation_days) + day_hi = modal_day + int(deviation_days) + t_min = max(day_lo, 0) * _SECONDS_PER_DAY - gpstime_ref_unix + t_max = (day_hi + 1) * _SECONDS_PER_DAY - gpstime_ref_unix - _EPSILON + + # Warn about removed points + n_retained = int( + np.sum((unix_time >= max(day_lo, 0) * _SECONDS_PER_DAY) & (unix_time < (day_hi + 1) * _SECONDS_PER_DAY)) + ) + pct_removed = (1 - n_retained / n_total) * 100 + if pct_removed > 0: + warnings.warn( + f"Careful {round(pct_removed)} % of the returns were removed because they had a " + f"deviation of days around the most abundant date greater than your threshold " + f"({deviation_days} days).", + UserWarning, + stacklevel=2, + ) + + logger.debug( + "Modal day: %d | GpsTime window [%.1f, %.1f] | %.1f%% points removed", + modal_day, + t_min, + t_max, + pct_removed, + ) + + filter_json = {"pipeline": [{"type": "filters.range", "limits": f"GpsTime[{t_min}:{t_max}]"}]} + return pdal.Pipeline(json.dumps(filter_json), arrays=[points]) diff --git a/lidar_for_fuel/pretreatment/normalize_height_in_pointcloud.py b/lidar_for_fuel/pretreatment/normalize_height_in_pointcloud.py new file mode 100644 index 0000000..bff2260 --- /dev/null +++ b/lidar_for_fuel/pretreatment/normalize_height_in_pointcloud.py @@ -0,0 +1,132 @@ +""" +Calculate normalized height in pointcloud, i.e. height above ground. +""" +import json +import logging +from typing import List + +import numpy as np +import pdal +from numpy.lib import recfunctions as rfn +from scipy.interpolate import LinearNDInterpolator, NearestNDInterpolator + +logger = logging.getLogger(__name__) + + +def normalize_height( + input_pipeline: pdal.Pipeline, + output_file: str, + filter_dimension: str, + filter_values: List[int], + height_filter: float = 60.0, + min_height: float = -3.0, +) -> None: + """ + Normalize heights using a Delaunay TIN built from ground points, and write result. + + Equivalent to the R pipeline: + normalize_height(algorithm = tin()) + filter_poi((Classification <= 5 | Classification == 9) & Z < Height_filter) + + Steps: + 1. Execute the input pipeline. + 2. Build a Delaunay TIN from all points with dtm_marker == 1. + 3. Interpolate ground elevation for every point. Points outside the TIN convex + hull are handled by nearest-neighbour extrapolation. + 4. Compute Z_ref = Z - interpolated_ground_Z. + 5. Filter points by filter_dimension/filter_values. + 6. Remove points outside [min_height, height_filter]. + 7. Write output LAS/LAZ with Z_ref as an extra dimension. + + Args: + input_pipeline (pdal.Pipeline): PDAL Pipeline object (may be unexecuted). + output_file (str): Output LAS/LAZ path. + filter_dimension (str): Dimension name used to filter output points. + filter_values (List[int]): Values to keep along filter_dimension. + height_filter (float): Upper height threshold in metres (default: 60 m). + min_height (float): Lower height threshold in metres (default: -3 m). + + Raises: + ValueError: If the pipeline produces no arrays, or if there are fewer than + 3 ground points to build a TIN. + """ + # Execute the pipeline on all points + pipeline = input_pipeline if isinstance(input_pipeline, pdal.Pipeline) else pdal.Pipeline() | input_pipeline + pipeline.execute() + + arrays = pipeline.arrays + if not arrays: + raise ValueError("No arrays produced by the pipeline.") + points = arrays[0] + + # Select ground points for DTM computation: all points with dtm_marker == 1 + ground_mask = points["dtm_marker"] == 1 + logger.debug("%d ground points with dtm_marker=1", int(ground_mask.sum())) + ground_points = points[ground_mask] + + if len(ground_points) < 3: + raise ValueError(f"Not enough ground points to build a TIN (found {len(ground_points)}, need at least 3).") + + # Build Delaunay TIN interpolator from ground points (X, Y) → Z + # LinearNDInterpolator: builds a Delaunay triangulation from ground points. + # For each point (X, Y), finds the triangle it falls in and computes Z by + # barycentric interpolation between the 3 vertices. Equivalent to lidR::tin(). + # Returns NaN for points outside the convex hull of ground points. + # + # NearestNDInterpolator: fallback for points outside the convex hull (e.g. at + # tile edges where vegetation points exceed the ground point extent). + # Returns the Z of the nearest ground point. + ground_xy = np.column_stack([ground_points["X"], ground_points["Y"]]) + ground_z = ground_points["Z"] + lin_interp = LinearNDInterpolator(ground_xy, ground_z) + nn_interp = NearestNDInterpolator(ground_xy, ground_z) + + # Interpolate ground elevation for all points + all_xy = np.column_stack([points["X"], points["Y"]]) + ground_z_interp = lin_interp(all_xy) + + # Fill NaN (outside convex hull) with nearest-neighbour extrapolation + nan_mask = np.isnan(ground_z_interp) + if np.any(nan_mask): + logger.debug("%d points outside TIN convex hull — using nearest-neighbour extrapolation", int(nan_mask.sum())) + ground_z_interp[nan_mask] = nn_interp(all_xy[nan_mask]) + + # Compute height above ground + hag = points["Z"] - ground_z_interp + + # Filter points by filter_dimension/filter_values + if filter_dimension and filter_values: + dim_mask = np.isin(points[filter_dimension], filter_values) + points = points[dim_mask] + hag = hag[dim_mask] + logger.debug("Dimension filter on %s %s: %d points kept", filter_dimension, filter_values, len(points)) + + # Filter by height bounds [min_height, height_filter] + valid = (hag >= min_height) & (hag <= height_filter) + pct_removed = (1 - valid.sum() / len(points)) * 100 + logger.debug("Height filter [%.1f, %.1f]: %.1f%% points removed", min_height, height_filter, pct_removed) + + points_out = points[valid] + hag_out = hag[valid].astype(np.float64) + + # Add Z_ref dimension (= HeightAboveGround) + points_with_zref = rfn.append_fields(points_out, "Z_ref", hag_out, dtypes=np.float64, usemask=False) + + # Write output + writer_pipeline = pdal.Pipeline( + json.dumps( + { + "pipeline": [ + { + "type": "writers.las", + "filename": str(output_file), + "extra_dims": "all", + "forward": "all", + "minor_version": "4", + } + ] + } + ), + arrays=[points_with_zref], + ) + writer_pipeline.execute() diff --git a/lidar_for_fuel/pretreatment/validate_lidar_file.py b/lidar_for_fuel/pretreatment/validate_lidar_file.py index 0f052ef..1d02fdd 100644 --- a/lidar_for_fuel/pretreatment/validate_lidar_file.py +++ b/lidar_for_fuel/pretreatment/validate_lidar_file.py @@ -4,36 +4,32 @@ import logging import os -try: - import laspy - from laspy.errors import LaspyException -except Exception: - laspy = None - LaspyException = Exception +import pdal logger = logging.getLogger(__name__) -def check_lidar_file(input_file: str) -> "laspy.LasData": +def check_lidar_file(input_file: str, spatial_ref: str) -> pdal.Pipeline: """ Validate and load a LiDAR file (.las or .laz). Args: input_file: Path to .las or .laz file. + spatial_ref (str): spatial reference to use when reading las file. Returns: - laspy.LasData: Loaded and validated LAS data. + pdal.Pipeline: Loaded and validated PDAL Pipeline. Raises: - ImportError: If the `laspy` library is not installed, or if a LAZ backend - (such as `lazrs`) is required but missing to decompress LAZ files. - ValueError : If the input path is not a non-empty string, or if the file - extension is not `.las` or `.laz`. + ImportError: If the `pdal` library is not installed. + ValueError: If the input path is not a non-empty string, if the file + extension is not `.las` or `.laz`, or if the `dtm_marker` extra + dimension is missing. FileNotFoundError: If the input file does not exist at the given path. - IOError: If `laspy` fails to read the file due to I/O or data corruption issues. + IOError: If PDAL fails to read the file due to I/O or data corruption issues. """ - if laspy is None: - raise ImportError("Install laspy: pip install laspy") + if pdal is None: + raise ImportError("Install pdal: pip install pdal") # Path validation if not isinstance(input_file, str) or not input_file.strip(): @@ -46,23 +42,24 @@ def check_lidar_file(input_file: str) -> "laspy.LasData": if ext not in (".las", ".laz"): raise ValueError(f"Unsupported extension: {ext}") - # Read with LAZ backend handling + # Read with pdal try: - las = laspy.read(input_file) - except LaspyException as e: - error_msg = str(e) - if any(kw in error_msg for kw in ["No LazBackend", "cannot decompress", "LazBackend"]): - raise ImportError(f"LAZ error: {error_msg}\nInstall: pip install 'laspy[lazrs]' or 'lazrs'") from e - raise IOError(f"laspy read error: {error_msg}") from e - - # Conformity check - num_points = len(las.points) - header_count = getattr(las.header, "point_count", None) - if header_count and header_count != num_points: - logger.warning("Header points mismatch: %s vs %s", header_count, num_points) + pipeline = pdal.Pipeline() | pdal.Reader.las(filename=input_file, override_srs=spatial_ref, nosrs=True) + pipeline.execute() + arrays = pipeline.arrays + if not arrays: + raise IOError("No points read from file") + except Exception as e: + raise IOError(f"PDAL read error: {str(e)}") from e + + num_points = len(arrays[0]) if num_points == 0: logger.warning("Empty file: %s", input_file) - logger.info("✓ Valid LiDAR: %s (%s points)", input_file, num_points) - return las + if "dtm_marker" not in arrays[0].dtype.names: + raise ValueError(f"Missing extra dimension 'dtm_marker' in file: {input_file}") + + logger.info("Valid LiDAR: %s (%s points)", input_file, num_points) + + return pipeline diff --git a/test/pretreatment/test_filter_deviation_day_in_pointcloud.py b/test/pretreatment/test_filter_deviation_day_in_pointcloud.py new file mode 100644 index 0000000..bffe2c1 --- /dev/null +++ b/test/pretreatment/test_filter_deviation_day_in_pointcloud.py @@ -0,0 +1,148 @@ +import json +import math +import warnings + +import numpy as np +import pdal +import pytest + +from lidar_for_fuel.pretreatment.filter_deviation_day import filter_deviation_day + +_SECONDS_PER_DAY = 86_400.0 +_EPSILON = 1e-3 + +_N = 9 +_DEVIATION_DAYS = 2 +_GPSTIME_REF = "2023-01-01 00:00:00" + + +def _make_pipeline(gpstime: np.ndarray, rng: np.random.Generator) -> pdal.Pipeline: + """Build a minimal executed PDAL Pipeline from a 1-D GpsTime array.""" + n = len(gpstime) + dtype = np.dtype([("X", np.float64), ("Y", np.float64), ("Z", np.float64), ("GpsTime", np.float64)]) + points = np.zeros(n, dtype=dtype) + points["X"] = rng.random(n) + points["Y"] = rng.random(n) + points["Z"] = rng.random(n) + points["GpsTime"] = gpstime + pipeline = pdal.Pipeline(json.dumps({"pipeline": []}), arrays=[points]) + pipeline.execute() + return pipeline + + +@pytest.fixture() +def rng() -> np.random.Generator: + return np.random.default_rng(42) + + +def test_multiday_filters_correct_number_of_points_and_warns(rng): + gpstime_regular = np.arange(1, 101, dtype=np.float64) * _SECONDS_PER_DAY + gpstime_extra = (rng.random(_N) + 50) * _SECONDS_PER_DAY + gpstime = np.concatenate([gpstime_regular, gpstime_extra]) + + pipeline = _make_pipeline(gpstime, rng) + + with pytest.warns(UserWarning, match=r"% of the returns were removed"): + result_pipeline = filter_deviation_day(pipeline, deviation_days=_DEVIATION_DAYS, gpstime_ref=_GPSTIME_REF) + + result_pipeline.execute() + n_result = len(result_pipeline.arrays[0]) + expected = _DEVIATION_DAYS * 2 + _N + 1 # = 14 + assert n_result == expected, f"Expected {expected} points after filtering, got {n_result}" + + +def test_single_day_no_filtering(rng): + n_total = 100 + _N + gpstime = rng.random(n_total) * _SECONDS_PER_DAY + pipeline = _make_pipeline(gpstime, rng) + + with warnings.catch_warnings(): + warnings.simplefilter("error") + result_pipeline = filter_deviation_day(pipeline, deviation_days=_DEVIATION_DAYS, gpstime_ref=_GPSTIME_REF) + + result_pipeline.execute() + n_result = len(result_pipeline.arrays[0]) + assert n_result == n_total, f"Expected all {n_total} points to be retained, got {n_result}" + + +def test_infinite_deviation_returns_original_pipeline(rng): + gpstime = np.arange(1, 101, dtype=np.float64) * _SECONDS_PER_DAY + pipeline = _make_pipeline(gpstime, rng) + n_total = len(pipeline.arrays[0]) + + result = filter_deviation_day(pipeline, deviation_days=math.inf, gpstime_ref=_GPSTIME_REF) + assert result is pipeline + assert len(result.arrays[0]) == n_total + + +def test_default_deviation_days_is_14(): + """Verify that the default deviation_days is 14.""" + import inspect + + sig = inspect.signature(filter_deviation_day) + assert sig.parameters["deviation_days"].default == 14 + + +def test_negative_deviation_raises(rng): + gpstime = np.arange(1, 10, dtype=np.float64) * _SECONDS_PER_DAY + pipeline = _make_pipeline(gpstime, rng) + + with pytest.raises(ValueError, match="deviation_days must be >= 0"): + filter_deviation_day(pipeline, deviation_days=-1, gpstime_ref=_GPSTIME_REF) + + +def test_missing_gpstime_dimension_raises(): + n = 10 + dtype = np.dtype([("X", np.float64), ("Y", np.float64), ("Z", np.float64)]) + points = np.zeros(n, dtype=dtype) + pipeline = pdal.Pipeline(json.dumps({"pipeline": []}), arrays=[points]) + pipeline.execute() + + with pytest.raises(ValueError, match="GpsTime"): + filter_deviation_day(pipeline, deviation_days=2, gpstime_ref=_GPSTIME_REF) + + +def test_gpstime_window_correctness(rng): + """Verify the filters.range limits in the returned pipeline and the retained GpsTime values.""" + GPSTIME_REF = "2023-01-01 00:00:00" + DEVIATION_DAY = 1 + + EXPECTED_T_MIN = 345_600.0 + EXPECTED_T_MAX = 604_800.0 - _EPSILON + + EXPECTED_RETAINED_MIDPOINTS = np.array([388_800.0, 475_200.0, 561_600.0], dtype=np.float64) + EXCLUDED_MIDPOINTS = np.array( + [(day + 0.5) * _SECONDS_PER_DAY for day in list(range(0, 4)) + list(range(7, 10))], + dtype=np.float64, + ) + + n_days = 10 + gpstime_per_day = (np.arange(n_days, dtype=np.float64) + 0.5) * _SECONDS_PER_DAY + n_extra = 20 + gpstime_modal = (rng.random(n_extra) + 5) * _SECONDS_PER_DAY + gpstime_input = np.concatenate([gpstime_per_day, gpstime_modal]) + pipeline = _make_pipeline(gpstime_input, rng) + + with pytest.warns(UserWarning, match=r"% of the returns were removed"): + result_pipeline = filter_deviation_day(pipeline, deviation_days=DEVIATION_DAY, gpstime_ref=GPSTIME_REF) + + # Check filters.range limits in the PDAL pipeline JSON + pipeline_spec = json.loads(result_pipeline.pipeline) + range_stages = [s for s in pipeline_spec["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.range"] + assert len(range_stages) == 1 + limits_str = range_stages[0]["limits"] + inner = limits_str[len("GpsTime[") : -1] + parsed_t_min, parsed_t_max = (float(v) for v in inner.split(":")) + + assert parsed_t_min == pytest.approx(EXPECTED_T_MIN, abs=1e-6) + assert parsed_t_max == pytest.approx(EXPECTED_T_MAX, abs=1e-6) + + # Check retained GpsTime values + result_pipeline.execute() + retained = result_pipeline.arrays[0]["GpsTime"] + + assert len(retained) == len(EXPECTED_RETAINED_MIDPOINTS) + n_extra + for ref_val in EXPECTED_RETAINED_MIDPOINTS: + assert np.sum(retained == ref_val) == 1 + for excl_val in EXCLUDED_MIDPOINTS: + assert excl_val not in retained diff --git a/test/pretreatment/test_normalize_height_in_pointcloud.py b/test/pretreatment/test_normalize_height_in_pointcloud.py new file mode 100644 index 0000000..c1ef1bd --- /dev/null +++ b/test/pretreatment/test_normalize_height_in_pointcloud.py @@ -0,0 +1,142 @@ +import json +import os +import shutil +from pathlib import Path + +import laspy +import numpy as np +import pdal + +from lidar_for_fuel.pretreatment.normalize_height_in_pointcloud import normalize_height + +TMP_PATH = Path("./tmp/normalize_height") +SAMPLE_LAS = "./data/pointcloud/test_semis_2022_0897_6577_LA93_IGN69_decimation.laz" +OUTPUT_LAS = TMP_PATH / "normalized.laz" +OUTPUT_DTM_MARKER = TMP_PATH / "normalized_dtm_marker.laz" +OUTPUT_FLAT = TMP_PATH / "normalized_flat.las" +OUTPUT_SLOPE = TMP_PATH / "normalized_slope.las" + +# LAS point dtype with minimum required fields +_LAS_DTYPE = np.dtype( + [ + ("X", np.float64), + ("Y", np.float64), + ("Z", np.float64), + ("Intensity", np.uint16), + ("ReturnNumber", np.uint8), + ("NumberOfReturns", np.uint8), + ("Classification", np.uint8), + ("dtm_marker", np.uint8), + ] +) + + +def _make_synthetic_pipeline(rows: list) -> pdal.Pipeline: + """Build an unexecuted PDAL Pipeline from a list of (x, y, z, classification, dtm_marker) tuples.""" + pts = np.zeros(len(rows), dtype=_LAS_DTYPE) + for i, (x, y, z, c, dtm) in enumerate(rows): + pts[i]["X"] = x + pts[i]["Y"] = y + pts[i]["Z"] = z + pts[i]["Classification"] = c + pts[i]["dtm_marker"] = dtm + return pdal.Pipeline(json.dumps({"pipeline": []}), arrays=[pts]) + + +def setup_module(module): + """Clean and recreate tmp directory before tests.""" + if TMP_PATH.is_dir(): + shutil.rmtree(TMP_PATH) + os.makedirs(TMP_PATH) + + +def _make_pipeline(): + return pdal.Reader.las(filename=SAMPLE_LAS, override_srs="EPSG:2154", nosrs=True) + + +def test_normalize_height_in_pointcloud(): + """Test function produces valid normalized output with Z_ref in expected bounds.""" + normalize_height( + _make_pipeline(), + OUTPUT_LAS, + "Classification", + [1, 2, 3, 4, 5], + height_filter=60.0, + min_height=-3.0, + ) + + assert OUTPUT_LAS.exists(), "Output file created" + + las_out = laspy.read(OUTPUT_LAS) + assert len(las_out.points) > 0, "Points preserved after processing" + assert "Z_ref" in las_out.point_format.dimension_names, "Z_ref dimension added" + assert np.all(las_out.Z_ref >= -3.0), "No points below min_height threshold" + assert np.all(las_out.Z_ref <= 60.0), "No points above height_filter threshold" + + +def test_normalize_height_flat_ground(): + """On a flat ground at Z=100, Z_ref must equal Z - 100 for each non-ground point. + + Geometry (top view): + Ground (Class 2) at Z=100 forms a grid covering [0,20]x[0,20]. + Non-ground (Class 3) are placed inside this area at known heights. + + Expected: Z_ref = Z - 100 (exactly, since the TIN reproduces a flat plane). + """ + ground = [(0, 0, 100, 2, 1), (20, 0, 100, 2, 1), (0, 20, 100, 2, 1), (20, 20, 100, 2, 1), (10, 10, 100, 2, 1)] + # (x, y, z, class, dtm_marker) — expected Z_ref in comment + non_ground = [ + (5, 5, 103, 3, 0), # Z_ref = 3.0 + (10, 5, 105, 3, 0), # Z_ref = 5.0 + (8, 8, 102, 3, 0), # Z_ref = 2.0 + ] + expected_z_ref = np.array([3.0, 5.0, 2.0]) + + normalize_height( + _make_synthetic_pipeline(ground + non_ground), + OUTPUT_FLAT, + "Classification", + [2, 3], + height_filter=60.0, + min_height=-3.0, + ) + + las_out = laspy.read(OUTPUT_FLAT) + non_ground_mask = np.array(las_out.classification) == 3 + z_ref_out = np.sort(np.array(las_out.Z_ref)[non_ground_mask]) + + np.testing.assert_allclose(z_ref_out, np.sort(expected_z_ref), atol=1e-2) + + +def test_normalize_height_sloped_ground(): + """On a sloped ground where Z_ground = X, Z_ref must equal Z - X for each non-ground point. + + Geometry: + Ground (Class 2): Z = X (slope of 1 m/m along X axis) + Non-ground (Class 3): Z = X + known_height → expected Z_ref = known_height + + The TIN exactly reproduces a linear plane, so Z_ref should be constant + regardless of the (X, Y) position of the non-ground point. + """ + known_height = 5.0 + ground = [(0, 0, 0, 2, 1), (20, 0, 20, 2, 1), (0, 20, 0, 2, 1), (20, 20, 20, 2, 1), (10, 10, 10, 2, 1)] + non_ground = [ + (5, 5, 5 + known_height, 3, 0), + (10, 5, 10 + known_height, 3, 0), + (8, 8, 8 + known_height, 3, 0), + ] + + normalize_height( + _make_synthetic_pipeline(ground + non_ground), + OUTPUT_SLOPE, + "Classification", + [2, 3], + height_filter=60.0, + min_height=-3.0, + ) + + las_out = laspy.read(OUTPUT_SLOPE) + non_ground_mask = np.array(las_out.classification) == 3 + z_ref_out = np.array(las_out.Z_ref)[non_ground_mask] + + np.testing.assert_allclose(z_ref_out, known_height, atol=1e-2) diff --git a/test/pretreatment/test_validate_lidar_file.py b/test/pretreatment/test_validate_lidar_file.py index 71d53db..9e8a799 100644 --- a/test/pretreatment/test_validate_lidar_file.py +++ b/test/pretreatment/test_validate_lidar_file.py @@ -2,13 +2,13 @@ import shutil from pathlib import Path -import laspy +import pdal import pytest from lidar_for_fuel.pretreatment.validate_lidar_file import check_lidar_file TMP_PATH = Path("./tmp/check_lidar") -SAMPLE_LAS = "./data/pointcloud/test_data_0000_0000_LA93_IGN69.laz" +SAMPLE_LAS = "./data/pointcloud/test_semis_2022_0897_6577_LA93_IGN69_decimation.laz" def setup_module(module): @@ -20,37 +20,28 @@ def setup_module(module): def test_check_lidar_file_return_format_okay(): """Test function returns valid LasData object.""" - las = check_lidar_file(SAMPLE_LAS) - - assert isinstance(las, laspy.LasData) is True - assert len(las.points) > 0 - assert las.header.version == "1.4" - - -def test_check_lidar_file_empty(): - """Test empty file handling (warning but success).""" - # Create empty LAS file - empty_path = TMP_PATH / "empty.las" - las_empty = laspy.create(point_format=2, file_version="1.2") - las_empty.write(empty_path) - - las = check_lidar_file(str(empty_path)) - assert isinstance(las, laspy.LasData) is True - assert len(las.points) == 0 + pipeline = check_lidar_file(SAMPLE_LAS, "EPSG:2154") + assert isinstance(pipeline, pdal.Pipeline) + arrays = pipeline.arrays + assert len(arrays) == 1 + assert len(arrays[0]) > 0 # Fichier test a des points + metadata = pipeline.metadata + assert isinstance(metadata, dict) def test_check_lidar_file_unsupported_extension(): - """Test unsupported file extension.""" - bad_path = TMP_PATH / "test.txt" - bad_path.write_text("fake data") - + unsupported_path = TMP_PATH / "file.txt" + unsupported_path.write_text("fake") with pytest.raises(ValueError, match="Unsupported extension"): - check_lidar_file(str(bad_path)) + check_lidar_file(str(unsupported_path), "EPSG:2154") def test_check_lidar_file_not_exists(): - """Test non-existent file.""" - fake_path = TMP_PATH / "fake.las" + with pytest.raises(FileNotFoundError): + check_lidar_file("nonexistent.laz", "EPSG:2154") + - with pytest.raises(FileNotFoundError, match="File not found"): - check_lidar_file(str(fake_path)) +def test_check_lidar_file_missing_dtm_marker(): + """A LAS file without dtm_marker extra dimension must raise ValueError.""" + with pytest.raises(ValueError, match="dtm_marker"): + check_lidar_file("./data/pointcloud/test_data_0000_0000_LA93_IGN69.laz", "EPSG:2154")