diff --git a/src/graphomotor/core/models.py b/src/graphomotor/core/models.py index 73dccc1..514ecc5 100644 --- a/src/graphomotor/core/models.py +++ b/src/graphomotor/core/models.py @@ -9,6 +9,8 @@ import pydantic import scipy.spatial.distance as dist +from graphomotor.core import config + class Drawing(pydantic.BaseModel): """Class representing a drawing task, encapsulating both raw data and metadata. @@ -248,12 +250,6 @@ def valid_ink_trajectory( ): ink_end_idx = idx break - if ( - ink_start_idx is not None - and ink_end_idx is not None - and ink_end_idx > ink_start_idx - ): - self.ink_points = self.points.iloc[ink_start_idx : ink_end_idx + 1].copy() return ink_start_idx, ink_end_idx @@ -352,3 +348,140 @@ def detect_hesitations(self, threshold_percentile: int = 20) -> None: self.hesitation_duration = np.sum(hesitations) * dt[0] return + + def calculate_smoothness(self) -> None: + """Calculate path smoothness based on Root Mean Square (RMS) curvature. + + Represents the curvature per unit arc length. + Lower values indicate smoother drawings. Penalizes sharp corners (e.g., + 90° turns) and noisy corrections. Normalized by arc length to reduce + sampling-rate dependence. + """ + if len(self.ink_points) < 3: + return + + xy = self.ink_points[["x", "y"]].to_numpy() + + forward_vector = xy[1:-1] - xy[:-2] + backward_vector = xy[2:] - xy[1:-1] + + forward_norm = np.linalg.norm(forward_vector, axis=1) + backward_norm = np.linalg.norm(backward_vector, axis=1) + + valid = (forward_norm > 0) & (backward_norm > 0) + if not np.any(valid): + return + + valid_forward_vector = forward_vector[valid] + valid_backward_vector = backward_vector[valid] + valid_forward_norm = forward_norm[valid] + valid_backward_norm = backward_norm[valid] + + cos_angle = (valid_forward_vector * valid_backward_vector).sum(axis=1) / ( + valid_forward_norm * valid_backward_norm + ) + cos_angle = np.clip(cos_angle, -1.0, 1.0) + + angles = np.arccos(cos_angle) + + avg_segment_length = (valid_forward_norm + valid_backward_norm) / 2.0 + curvatures = angles / avg_segment_length + + self.smoothness = float(np.sqrt(np.mean(curvatures**2))) + + return + + def compute_segment_metrics( + self, circles: dict[str, dict[str, CircleTarget]], trail_id: str + ) -> None: + """Compute all metrics for a line segment. + + This function computes various metrics for the line segment, including ink time, + velocity metrics, path optimality, smoothness, and hesitation detection. It + first determines the valid ink trajectory between the start and end circles. If + a valid trajectory is found, it updates the ink_points attribute and calculates + the metrics. + + Args: + circles: A dictionary mapping each trail type to dictionaries of + CircleTarget instances (output of load_scaled_circles in config). + trail_id: Trail identifier for circle lookup. + """ + logger = config.get_logger() + trail_circles = circles[trail_id] + points = self.points.copy() + + if len(points) < 2: + logger.warning( + "Not enough points to calculate metrics for line segment: " + "start=%s end=%s", + self.start_label, + self.end_label, + ) + return + + if self.start_label not in trail_circles or self.end_label not in trail_circles: + logger.warning( + "Missing start/end labels: start=%s end=%s available=%s", + self.start_label, + self.end_label, + list(trail_circles.keys()), + ) + return + + start_circle = trail_circles[self.start_label] + end_circle = trail_circles[self.end_label] + + ink_start_idx, ink_end_idx = self.valid_ink_trajectory(start_circle, end_circle) + + if ink_start_idx is None: + logger.warning( + "No valid ink trajectory found for line segment: start=%s end=%s", + self.start_label, + self.end_label, + ) + return + if ink_end_idx is None: + self.ink_points = points.iloc[ink_start_idx:].copy() + if len(self.ink_points) < 2: + logger.warning( + "Not enough ink points to calculate metrics for line segment: " + "start=%s end=%s", + self.start_label, + self.end_label, + ) + return + self.ink_time = ( + self.ink_points.iloc[-1]["seconds"] - self.ink_points.iloc[0]["seconds"] + ) + return + if ink_end_idx <= ink_start_idx: + logger.warning( + "Invalid ink trajectory: end index (%d) is not greater than " + "start index (%d) for line segment: start=%s end=%s", + ink_end_idx, + ink_start_idx, + self.start_label, + self.end_label, + ) + return + self.ink_points = self.points.iloc[ink_start_idx : ink_end_idx + 1].copy() + + if len(self.ink_points) < 2: + logger.warning( + "Not enough ink points to calculate metrics for line segment: " + "start=%s end=%s", + self.start_label, + self.end_label, + ) + return + + self.ink_time = ( + self.ink_points.iloc[-1]["seconds"] - self.ink_points.iloc[0]["seconds"] + ) + self.calculate_velocity_metrics() + self.calculate_path_optimality(start_circle, end_circle) + self.calculate_smoothness() + self.detect_hesitations() + + return diff --git a/src/graphomotor/features/trails/drawing_metrics.py b/src/graphomotor/features/trails/drawing_metrics.py index 98a2a80..f1b3bae 100644 --- a/src/graphomotor/features/trails/drawing_metrics.py +++ b/src/graphomotor/features/trails/drawing_metrics.py @@ -1,8 +1,5 @@ """Feature extraction module for drawing error-based metrics in trails drawing data.""" -import numpy as np -import pandas as pd - from graphomotor.core import models @@ -43,49 +40,3 @@ def percent_accurate_paths(drawing: models.Drawing) -> dict[str, float]: (drawing.data["correct_path"] == drawing.data["actual_path"]).mean() * 100 ) } - - -def calculate_smoothness(points: pd.DataFrame) -> float: - """Calculate path smoothness based on Root Mean Square (RMS) curvature. - - Represents the curvature per unit arc length. - Lower values indicate smoother drawings. Penalizes sharp corners (e.g., 90° turns) - and noisy corrections. Normalized by arc length to reduce sampling-rate dependence. - - Args: - points: DataFrame representing drawing points. - - Returns: - Smoothness metric as a float. - """ - if len(points) < 3: - return 0.0 - - xy = points[["x", "y"]].to_numpy() - - forward_vector = xy[1:-1] - xy[:-2] - backward_vector = xy[2:] - xy[1:-1] - - forward_norm = np.linalg.norm(forward_vector, axis=1) - backward_norm = np.linalg.norm(backward_vector, axis=1) - - valid = (forward_norm > 0) & (backward_norm > 0) - if not np.any(valid): - return 0.0 - - valid_forward_vector = forward_vector[valid] - valid_backward_vector = backward_vector[valid] - valid_forward_norm = forward_norm[valid] - valid_backward_norm = backward_norm[valid] - - cos_angle = (valid_forward_vector * valid_backward_vector).sum(axis=1) / ( - valid_forward_norm * valid_backward_norm - ) - cos_angle = np.clip(cos_angle, -1.0, 1.0) - - angles = np.arccos(cos_angle) - - avg_segment_length = (valid_forward_norm + valid_backward_norm) / 2.0 - curvatures = angles / avg_segment_length - - return float(np.sqrt(np.mean(curvatures**2))) diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py index d78ee1a..798136f 100644 --- a/tests/unit/test_models.py +++ b/tests/unit/test_models.py @@ -2,7 +2,9 @@ import datetime from typing import Dict, cast +from unittest.mock import patch +import numpy as np import pandas as pd import pytest @@ -473,3 +475,370 @@ def test_less_than_three_velocities() -> None: assert segment.hesitation_count == 0 assert segment.hesitation_duration == 0.0 + + +def test_smoothness_less_than_three_points() -> None: + """Less than 3 points cannot define curvature.""" + points = pd.DataFrame( + { + "x": [0, 1], + "y": [0, 0], + "seconds": [0, 1], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for smoothness calculation + ) + segment.calculate_smoothness() + assert segment.smoothness == 0.0 + + +def test_smoothness_straight_line() -> None: + """Collinear points have zero curvature.""" + points = pd.DataFrame({"x": [0, 1, 2, 3], "y": [0, 0, 0, 0]}) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for smoothness calculation + ) + segment.calculate_smoothness() + assert segment.smoothness == 0.0 + + +def test_smoothness_single_right_angle() -> None: + """A single 90-degree corner should produce a large smoothness value. + + (non-zero), since sharp turns are penalized. + """ + points = pd.DataFrame({"x": [0, 1, 1], "y": [0, 0, 1]}) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for smoothness calculation + ) + expected = np.pi / 2 + segment.calculate_smoothness() + assert np.isclose(segment.smoothness, expected) + + +def test_smoothness_varied_angles() -> None: + """Multiple angles should produce RMS curvature. + + Path has a 90° turn followed by a 45° turn. + """ + points = pd.DataFrame({"x": [0, 1, 1, 2], "y": [0, 0, 1, 2]}) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for smoothness calculation + ) + c1 = np.pi / 2 + c2 = (np.pi / 4) / ((1 + np.sqrt(2)) / 2) + expected = np.sqrt((c1**2 + c2**2) / 2) + + segment.calculate_smoothness() + + assert np.isclose(segment.smoothness, expected) + + +def test_smoothness_zero_length_segments() -> None: + """Zero-length segments should be skipped; no angles → smoothness 0.""" + points = pd.DataFrame({"x": [0, 1, 1, 2], "y": [0, 0, 0, 0]}) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for smoothness calculation + ) + segment.calculate_smoothness() + assert segment.smoothness == 0.0 + + +def test_smoothness_single_180_degree_turn() -> None: + """A single 180-degree turn should produce a very large smoothness value. + + Since it represents maximal curvature. + """ + points = pd.DataFrame( + { + "x": [0, 1, 0], + "y": [0, 0, 0], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for smoothness calculation + ) + expected = np.pi + segment.calculate_smoothness() + assert np.isclose(segment.smoothness, expected) + + +def test_compute_segment_metrics_less_than_two_points() -> None: + """Test early return when segment has less than 2 points.""" + points = pd.DataFrame( + { + "x": [0], + "y": [0], + "seconds": [0], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ) + circles = { + "A": { + "1": models.CircleTarget( + order=1, label="1", center_x=0, center_y=0, radius=10 + ), + "2": models.CircleTarget( + order=2, label="2", center_x=100, center_y=0, radius=10 + ), + } + } + + segment.compute_segment_metrics(circles=circles, trail_id="A") + + assert segment.ink_time == 0.0 + assert segment.distance == 0.0 + assert len(segment.velocities) == 0 + + +def test_compute_segment_metrics_invalid_start_label() -> None: + """Test early return when start_label not in trail circles.""" + points = pd.DataFrame( + { + "x": [0, 50, 100], + "y": [0, 0, 0], + "seconds": [0, 1, 2], + } + ) + segment = models.LineSegment( + start_label="999", + end_label="2", + points=points, + is_error=False, + line_number=1, + ) + + circles = { + "A": { + "1": models.CircleTarget( + order=1, label="1", center_x=0, center_y=0, radius=10 + ), + "2": models.CircleTarget( + order=2, label="2", center_x=100, center_y=0, radius=10 + ), + } + } + + segment.compute_segment_metrics(circles=circles, trail_id="A") + + assert segment.ink_time == 0.0 + assert segment.distance == 0.0 + assert len(segment.velocities) == 0 + + +def test_compute_segment_metrics_invalid_end_label() -> None: + """Test early return when end_label not in trail circles.""" + points = pd.DataFrame( + { + "x": [0, 50, 100], + "y": [0, 0, 0], + "seconds": [0, 1, 2], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="999", + points=points, + is_error=False, + line_number=1, + ) + circles = { + "A": { + "1": models.CircleTarget( + order=1, label="1", center_x=0, center_y=0, radius=10 + ), + "2": models.CircleTarget( + order=2, label="2", center_x=100, center_y=0, radius=10 + ), + } + } + + segment.compute_segment_metrics(circles=circles, trail_id="A") + + assert segment.ink_time == 0.0 + assert segment.distance == 0.0 + assert len(segment.velocities) == 0 + + +def test_compute_segment_metrics_valid_trajectory() -> None: + """Test successful computation of all metrics with valid trajectory.""" + points = pd.DataFrame( + { + "x": [0, 25, 50, 75, 100], + "y": [0, 0, 0, 0, 0], + "seconds": [0, 1, 2, 3, 4], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ) + + circles = { + "A": { + "1": models.CircleTarget( + order=1, label="1", center_x=0, center_y=0, radius=10 + ), + "2": models.CircleTarget( + order=2, label="2", center_x=100, center_y=0, radius=10 + ), + } + } + + with patch.object(segment, "valid_ink_trajectory", return_value=(0, 4)): + segment.compute_segment_metrics(circles=circles, trail_id="A") + + assert np.isclose(segment.ink_time, 4.0) + assert len(segment.ink_points) == 5 + assert segment.distance > 0.0 + assert segment.mean_speed > 0.0 + assert len(segment.velocities) > 0 + assert segment.path_optimality > 0.0 + assert np.isclose(segment.smoothness, 0.0) + + +def test_compute_segment_metrics_ink_end_equals_ink_start() -> None: + """Test when ink_end_idx equals ink_start_idx (no valid trajectory).""" + points = pd.DataFrame( + { + "x": [0, 50, 100], + "y": [0, 0, 0], + "seconds": [0, 1, 2], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ) + + circles = { + "A": { + "1": models.CircleTarget( + order=1, label="1", center_x=0, center_y=0, radius=10 + ), + "2": models.CircleTarget( + order=2, label="2", center_x=100, center_y=0, radius=10 + ), + } + } + + with patch.object(segment, "valid_ink_trajectory", return_value=(1, 1)): + segment.compute_segment_metrics(circles=circles, trail_id="A") + + assert segment.distance == 0.0 + assert len(segment.velocities) == 0 + + +def test_compute_segment_metrics_ink_end_before_ink_start() -> None: + """Test when ink_end_idx is before ink_start_idx (invalid trajectory).""" + points = pd.DataFrame( + { + "x": [0, 50, 100], + "y": [0, 0, 0], + "seconds": [0, 1, 2], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ) + + circles = { + "A": { + "1": models.CircleTarget( + order=1, label="1", center_x=0, center_y=0, radius=10 + ), + "2": models.CircleTarget( + order=2, label="2", center_x=100, center_y=0, radius=10 + ), + } + } + + with patch.object(segment, "valid_ink_trajectory", return_value=(2, 0)): + segment.compute_segment_metrics(circles=circles, trail_id="A") + + assert segment.distance == 0.0 + assert len(segment.velocities) == 0 + + +def test_compute_segment_metrics_only_start_index_found() -> None: + """Test when only ink_start_idx is found (end is None).""" + points = pd.DataFrame( + { + "x": [0, 25, 50, 75], + "y": [0, 0, 0, 0], + "seconds": [0, 1, 2, 3], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ) + + circles = { + "A": { + "1": models.CircleTarget( + order=1, label="1", center_x=0, center_y=0, radius=10 + ), + "2": models.CircleTarget( + order=2, label="2", center_x=100, center_y=0, radius=10 + ), + } + } + + with patch.object(segment, "valid_ink_trajectory", return_value=(1, None)): + segment.compute_segment_metrics(circles=circles, trail_id="A") + + assert np.isclose(segment.ink_time, 2.0) + assert len(segment.ink_points) == 3 + assert segment.distance == 0.0 + assert len(segment.velocities) == 0 diff --git a/tests/unit/test_trails_drawing_metrics.py b/tests/unit/test_trails_drawing_metrics.py index 6507b74..f78cf69 100644 --- a/tests/unit/test_trails_drawing_metrics.py +++ b/tests/unit/test_trails_drawing_metrics.py @@ -1,6 +1,5 @@ """Test cases for drawing_metrics.py functions.""" -import numpy as np import pandas as pd import pytest @@ -63,64 +62,3 @@ def test_percent_accurate_paths_sample_data() -> None: result = drawing_metrics.percent_accurate_paths(drawing) assert result == {"percent_accurate_paths": 100.0} - - -def test_smoothness_less_than_three_points() -> None: - """Less than 3 points cannot define curvature.""" - points = pd.DataFrame({"x": [0, 1], "y": [0, 1]}) - assert drawing_metrics.calculate_smoothness(points) == 0.0 - - -def test_smoothness_straight_line() -> None: - """Collinear points have zero curvature.""" - points = pd.DataFrame({"x": [0, 1, 2, 3], "y": [0, 0, 0, 0]}) - assert drawing_metrics.calculate_smoothness(points) == 0.0 - - -def test_smoothness_single_right_angle() -> None: - """A single 90-degree corner should produce a large smoothness value. - - (non-zero), since sharp turns are penalized. - """ - points = pd.DataFrame({"x": [0, 1, 1], "y": [0, 0, 1]}) - expected = np.pi / 2 - smoothness = drawing_metrics.calculate_smoothness(points) - assert np.isclose(smoothness, expected) - - -def test_smoothness_varied_angles() -> None: - """Multiple angles should produce RMS curvature. - - Path has a 90° turn followed by a 45° turn. - """ - points = pd.DataFrame({"x": [0, 1, 1, 2], "y": [0, 0, 1, 2]}) - c1 = np.pi / 2 - c2 = (np.pi / 4) / ((1 + np.sqrt(2)) / 2) - expected = np.sqrt((c1**2 + c2**2) / 2) - - smoothness = drawing_metrics.calculate_smoothness(points) - - assert np.isclose(smoothness, expected) - - -def test_smoothness_zero_length_segments() -> None: - """Zero-length segments should be skipped; no angles → smoothness 0.""" - points = pd.DataFrame({"x": [0, 1, 1, 2], "y": [0, 0, 0, 0]}) - smoothness = drawing_metrics.calculate_smoothness(points) - assert smoothness == 0.0 - - -def test_smoothness_single_180_degree_turn() -> None: - """A single 180-degree turn should produce a very large smoothness value. - - Since it represents maximal curvature. - """ - points = pd.DataFrame( - { - "x": [0, 1, 0], - "y": [0, 0, 0], - } - ) - expected = np.pi - smoothness = drawing_metrics.calculate_smoothness(points) - assert np.isclose(smoothness, expected)