Skip to content
3 changes: 2 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[flake8]
max-line-length = 119
max-line-length = 119
extend-ignore = E203
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# v0.2.0
- Add pretreatment functions:
- check data,
- filter by deviation day,
- calculate normalized height in pointcloud

# v0.0.2
- Add folder "configs" in Dockerfile

Expand Down
16 changes: 16 additions & 0 deletions configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@ io:
input_filename: null
input_dir: null
output_dir: null
# Spatial reference to use to override the one from input las.
spatial_reference: EPSG:2154

pretreatment:
filter:
dimension: Classification
keep_values: [1, 2, 3, 4, 5, 9, 66] # Classes to keep
filter_deviation:
deviation_days: 14
# ISO-8601 UTC string corresponding to gpstime == 0.
# Default matches the R reference: as.POSIXct("2011-09-14 01:46:40")
# Override this value if your LiDAR data uses a different GPS time origin.
gpstime_ref: "2011-09-14 01:46:40"
filter_normalize:
height: 60
min_height: -3

hydra:
output_subdir: null
Expand Down
Binary file not shown.
2 changes: 1 addition & 1 deletion lidar_for_fuel/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.0.2"
__version__ = "0.2.0"


if __name__ == "__main__":
Expand Down
37 changes: 26 additions & 11 deletions lidar_for_fuel/main_pretreatment.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import hydra
from omegaconf import DictConfig

from lidar_for_fuel.pretreatment.filter_deviation_day import filter_deviation_day
from lidar_for_fuel.pretreatment.normalize_height_in_pointcloud import normalize_height
from lidar_for_fuel.pretreatment.validate_lidar_file import check_lidar_file

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -51,25 +53,38 @@ def main_on_one_tile(filename):
filename (str): filename to the LAS file
"""
tilename = os.path.splitext(filename)[0] # filename to the LAS file
input_file = os.path.join(input_dir, filename) # path to the LAS file
logging.info(f"\nNormalize and add attributes of 1 for tile : {tilename}")
las = check_lidar_file(input_file)
input_filename = os.path.join(input_dir, filename) # path to the LAS file
srid = config.io.spatial_reference
logging.info(f"\nCheck data of 1 for tile : {tilename}")
pipeline_check_lidar = check_lidar_file(input_filename, srid)

logging.info(f"\nFilter deviation day of 1 for tile : {tilename}")
deviation_days = config.pretreatment.filter_deviation.deviation_days
gpstime_ref = config.pretreatment.filter_deviation.gpstime_ref
pipeline_filter_deviation_day = filter_deviation_day(pipeline_check_lidar, deviation_days, gpstime_ref)

logging.info(f"\nCalculate normalize of 1 for tile : {tilename}")
# Generate output filenames
output_dir = config.io.output_dir
output_path = os.path.join(output_dir, filename)
# Parameters
dimension = config.pretreatment.filter.dimension
classes = config.pretreatment.filter.keep_values
height_value = config.pretreatment.filter_normalize.height
min_height = config.pretreatment.filter_normalize.min_height
las = normalize_height(
pipeline_filter_deviation_day, output_path, dimension, classes, height_value, min_height
)
return las

if initial_las_filename:
# Launch pretreatment by one tile:
las = main_on_one_tile(initial_las_filename)
print(f"✅ SUCCESS: {len(las.points)} points loaded")
print(f" Version: {las.header.version}")
print(f" Point format: {las.header.point_format}")
main_on_one_tile(initial_las_filename)

else:
# Lauch pretreatment tile by tile
for file in os.listdir(input_dir):
las = main_on_one_tile(file)
print(f"✅ SUCCESS: {len(las.points)} points loaded")
print(f" Version: {las.header.version}")
print(f" Point format: {las.header.point_format}")
main_on_one_tile(file)


if __name__ == "__main__":
Expand Down
100 changes: 100 additions & 0 deletions lidar_for_fuel/pretreatment/filter_deviation_day.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Keep points within a ±deviation_days window around the most densely sampled acquisition day."""

import json
import logging
import math
import warnings
from datetime import datetime, timezone

import numpy as np
import pdal

logger = logging.getLogger(__name__)

_SECONDS_PER_DAY = 86_400.0
_EPSILON = 1e-3 # 1 ms — smaller than any realistic GpsTime resolution


def filter_deviation_day(
    pipeline: pdal.Pipeline,
    deviation_days: int | float = 14,
    gpstime_ref: str = "2011-09-14 01:46:40",
) -> pdal.Pipeline:
    """Filter a LiDAR point cloud keeping only points acquired within ±deviation_days
    around the most densely sampled calendar day.

    Equivalent to the R function ``filter_date_mode()`` from lidR.

    Args:
        pipeline (pdal.Pipeline): Executed PDAL Pipeline object.
        deviation_days (int | float): Half-width of the retention window in whole days
            (fractional values are truncated toward zero). Pass ``math.inf`` to skip
            filtering entirely. Default: 14.
        gpstime_ref (str): ISO-8601 UTC string of the GPS time reference epoch.
            Default: "2011-09-14 01:46:40".

    Returns:
        pdal.Pipeline: A new, configured-but-not-yet-executed PDAL Pipeline restricted
        to the selected time window, or the original pipeline unchanged if
        ``deviation_days`` is ``math.inf``.

    Raises:
        ValueError: If the pipeline has no arrays or an empty point array, lacks a
            ``GpsTime`` dimension, or if ``deviation_days`` is negative (including
            ``-math.inf``) or NaN.
    """
    # Only +inf means "skip filtering": reject -inf, NaN and negative values.
    # (`math.isinf` alone is not enough — it is also True for -inf.)
    if not (deviation_days == math.inf or deviation_days >= 0):
        raise ValueError(f"deviation_days must be >= 0 or math.inf, got {deviation_days!r}")

    arrays = pipeline.arrays
    if not arrays:
        raise ValueError("No arrays produced by the pipeline.")

    points = arrays[0]
    if len(points) == 0:
        # An empty array would break the modal-day search and make the
        # removal-percentage computation divide by zero; fail early and clearly.
        raise ValueError("Pipeline produced an empty point array.")

    if "GpsTime" not in points.dtype.names:
        raise ValueError("Point cloud does not contain a 'GpsTime' dimension.")

    if math.isinf(deviation_days):
        logger.debug("deviation_days is Inf — no filtering applied.")
        return pipeline

    gpstime_ref_unix = datetime.fromisoformat(gpstime_ref).replace(tzinfo=timezone.utc).timestamp()
    n_total = len(points)

    # Convert GpsTime to absolute UNIX time and floor to calendar day
    unix_time = points["GpsTime"] + gpstime_ref_unix
    day_index = np.floor(unix_time / _SECONDS_PER_DAY).astype(np.int64)

    # Find the modal day (most abundantly sampled)
    unique_days, counts = np.unique(day_index, return_counts=True)
    modal_day = int(unique_days[counts.argmax()])

    # Compute the GpsTime filter window [t_min, t_max].
    # Fractional deviation_days are truncated to whole days to match the
    # calendar-day granularity used for the modal-day search above.
    day_lo = modal_day - int(deviation_days)
    day_hi = modal_day + int(deviation_days)
    t_min = max(day_lo, 0) * _SECONDS_PER_DAY - gpstime_ref_unix
    t_max = (day_hi + 1) * _SECONDS_PER_DAY - gpstime_ref_unix - _EPSILON

    # Warn about removed points
    n_retained = int(
        np.sum((unix_time >= max(day_lo, 0) * _SECONDS_PER_DAY) & (unix_time < (day_hi + 1) * _SECONDS_PER_DAY))
    )
    pct_removed = (1 - n_retained / n_total) * 100
    if pct_removed > 0:
        warnings.warn(
            f"Careful {round(pct_removed)} % of the returns were removed because they had a "
            f"deviation of days around the most abundant date greater than your threshold "
            f"({deviation_days} days).",
            UserWarning,
            stacklevel=2,
        )

    logger.debug(
        "Modal day: %d | GpsTime window [%.1f, %.1f] | %.1f%% points removed",
        modal_day,
        t_min,
        t_max,
        pct_removed,
    )

    # Defer actual filtering to PDAL: build a filters.range stage over the
    # already-loaded array so the caller decides when to execute.
    filter_json = {"pipeline": [{"type": "filters.range", "limits": f"GpsTime[{t_min}:{t_max}]"}]}
    return pdal.Pipeline(json.dumps(filter_json), arrays=[points])
132 changes: 132 additions & 0 deletions lidar_for_fuel/pretreatment/normalize_height_in_pointcloud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Calculate normalized height in pointcloud, i.e. height above ground.
"""
import json
import logging
from typing import List

import numpy as np
import pdal
from numpy.lib import recfunctions as rfn
from scipy.interpolate import LinearNDInterpolator, NearestNDInterpolator

logger = logging.getLogger(__name__)


def normalize_height(
    input_pipeline: pdal.Pipeline,
    output_file: str,
    filter_dimension: str,
    filter_values: List[int],
    height_filter: float = 60.0,
    min_height: float = -3.0,
) -> None:
    """
    Normalize heights using a Delaunay TIN built from ground points, and write result.

    Equivalent to the R pipeline:
        normalize_height(algorithm = tin())
        filter_poi((Classification <= 5 | Classification == 9) & Z < Height_filter)

    Steps:
        1. Execute the input pipeline.
        2. Build a Delaunay TIN from all points with dtm_marker == 1.
        3. Interpolate ground elevation for every point. Points outside the TIN convex
           hull are handled by nearest-neighbour extrapolation.
        4. Compute Z_ref = Z - interpolated_ground_Z.
        5. Filter points by filter_dimension/filter_values.
        6. Remove points outside [min_height, height_filter].
        7. Write output LAS/LAZ with Z_ref as an extra dimension.

    Args:
        input_pipeline (pdal.Pipeline): PDAL Pipeline object (may be unexecuted).
        output_file (str): Output LAS/LAZ path.
        filter_dimension (str): Dimension name used to filter output points.
        filter_values (List[int]): Values to keep along filter_dimension.
        height_filter (float): Upper height threshold in metres (default: 60 m).
        min_height (float): Lower height threshold in metres (default: -3 m).

    Raises:
        ValueError: If the pipeline produces no arrays, if a required dimension
            (``dtm_marker`` or ``filter_dimension``) is missing, if there are fewer
            than 3 ground points to build a TIN, or if the dimension filter removes
            every point.
    """
    # Execute the pipeline on all points
    pipeline = input_pipeline if isinstance(input_pipeline, pdal.Pipeline) else pdal.Pipeline() | input_pipeline
    pipeline.execute()

    arrays = pipeline.arrays
    if not arrays:
        raise ValueError("No arrays produced by the pipeline.")
    points = arrays[0]

    # Fail early with a clear message instead of an opaque numpy KeyError
    # if the expected custom dimension is absent from the input.
    if "dtm_marker" not in points.dtype.names:
        raise ValueError("Point cloud does not contain a 'dtm_marker' dimension.")

    # Select ground points for DTM computation: all points with dtm_marker == 1
    ground_mask = points["dtm_marker"] == 1
    logger.debug("%d ground points with dtm_marker=1", int(ground_mask.sum()))
    ground_points = points[ground_mask]

    if len(ground_points) < 3:
        raise ValueError(f"Not enough ground points to build a TIN (found {len(ground_points)}, need at least 3).")

    # Build Delaunay TIN interpolator from ground points (X, Y) → Z
    # LinearNDInterpolator: builds a Delaunay triangulation from ground points.
    # For each point (X, Y), finds the triangle it falls in and computes Z by
    # barycentric interpolation between the 3 vertices. Equivalent to lidR::tin().
    # Returns NaN for points outside the convex hull of ground points.
    #
    # NearestNDInterpolator: fallback for points outside the convex hull (e.g. at
    # tile edges where vegetation points exceed the ground point extent).
    # Returns the Z of the nearest ground point.
    ground_xy = np.column_stack([ground_points["X"], ground_points["Y"]])
    ground_z = ground_points["Z"]
    lin_interp = LinearNDInterpolator(ground_xy, ground_z)
    nn_interp = NearestNDInterpolator(ground_xy, ground_z)

    # Interpolate ground elevation for all points
    all_xy = np.column_stack([points["X"], points["Y"]])
    ground_z_interp = lin_interp(all_xy)

    # Fill NaN (outside convex hull) with nearest-neighbour extrapolation
    nan_mask = np.isnan(ground_z_interp)
    if np.any(nan_mask):
        logger.debug("%d points outside TIN convex hull — using nearest-neighbour extrapolation", int(nan_mask.sum()))
        ground_z_interp[nan_mask] = nn_interp(all_xy[nan_mask])

    # Compute height above ground
    hag = points["Z"] - ground_z_interp

    # Filter points by filter_dimension/filter_values
    if filter_dimension and filter_values:
        if filter_dimension not in points.dtype.names:
            raise ValueError(f"Point cloud does not contain a '{filter_dimension}' dimension.")
        dim_mask = np.isin(points[filter_dimension], filter_values)
        points = points[dim_mask]
        hag = hag[dim_mask]
        logger.debug("Dimension filter on %s %s: %d points kept", filter_dimension, filter_values, len(points))

    if len(points) == 0:
        # Without this guard the removal-percentage computation below divides by zero.
        raise ValueError("No points left after the dimension filter; nothing to write.")

    # Filter by height bounds [min_height, height_filter]
    valid = (hag >= min_height) & (hag <= height_filter)
    pct_removed = (1 - valid.sum() / len(points)) * 100
    logger.debug("Height filter [%.1f, %.1f]: %.1f%% points removed", min_height, height_filter, pct_removed)

    points_out = points[valid]
    hag_out = hag[valid].astype(np.float64)

    # Add Z_ref dimension (= HeightAboveGround)
    points_with_zref = rfn.append_fields(points_out, "Z_ref", hag_out, dtypes=np.float64, usemask=False)

    # Write output as LAS 1.4 with all dimensions forwarded, including Z_ref
    writer_pipeline = pdal.Pipeline(
        json.dumps(
            {
                "pipeline": [
                    {
                        "type": "writers.las",
                        "filename": str(output_file),
                        "extra_dims": "all",
                        "forward": "all",
                        "minor_version": "4",
                    }
                ]
            }
        ),
        arrays=[points_with_zref],
    )
    writer_pipeline.execute()
Loading
Loading