diff --git a/src/graphomotor/core/models.py b/src/graphomotor/core/models.py index 34aaa61..73dccc1 100644 --- a/src/graphomotor/core/models.py +++ b/src/graphomotor/core/models.py @@ -189,7 +189,6 @@ class LineSegment: points: pd.DataFrame is_error: bool line_number: int - ink_points: np.ndarray = dataclasses.field(default_factory=lambda: np.array([])) ink_time: float = 0.0 think_time: float = 0.0 @@ -203,6 +202,7 @@ class LineSegment: hesitation_duration: float = 0.0 velocities: List[float] = dataclasses.field(default_factory=list) accelerations: List[float] = dataclasses.field(default_factory=list) + ink_points: pd.DataFrame = dataclasses.field(default_factory=pd.DataFrame) def valid_ink_trajectory( self, @@ -230,7 +230,7 @@ def valid_ink_trajectory( end_circle: CircleTarget representing the end circle. Returns: - Tuple of (ink_start_idx: int, ink_end_idx: int) if valid + Tuple of (ink_start_idx: Optional[int], ink_end_idx: Optional[int]) if valid trajectory exists, else (None, None). """ ink_start_idx = None @@ -248,7 +248,6 @@ def valid_ink_trajectory( ): ink_end_idx = idx break - if ( ink_start_idx is not None and ink_end_idx is not None @@ -292,16 +291,15 @@ def calculate_path_optimality( self.path_optimality = optimal_distance / self.distance return - def calculate_velocity_metrics(self, ink_points: pd.DataFrame) -> None: - """Get distance, velocity, and acceleration metrics of a LineSegment. + def calculate_velocity_metrics(self) -> None: + """Get velocity metrics of a LineSegment. Args: self: LineSegment object to calculate velocities for. - ink_points: DataFrame of ink points with 'x', 'y', and 'seconds' columns. """ - dx = np.diff(ink_points["x"].values) - dy = np.diff(ink_points["y"].values) - dt = np.diff(ink_points["seconds"].values) + dx = np.diff(self.ink_points["x"].values) + dy = np.diff(self.ink_points["y"].values) + dt = np.diff(self.ink_points["seconds"].values) distances = np.sqrt(dx**2 + dy**2) self.distance = np.sum(distances) @@ -316,3 +314,41 @@ def calculate_velocity_metrics(self, ink_points: pd.DataFrame) -> None: self.accelerations = np.diff(velocities).tolist() return + + def detect_hesitations(self, threshold_percentile: int = 20) -> None: + """Detect hesitations as periods of significantly reduced velocity. + + This function defines a hesitation as any period where the velocity falls below + a certain threshold, which is determined by the specified percentile of the + velocity distribution. It counts the number of distinct hesitation periods and + adds 1 if the line starts with a hesitation. It also calculates the total + duration of hesitations based on the number of points that fall below the + threshold and the time interval between points. + + hesitation_count defaults to 0 and hesitation_duration defaults to 0.0 in the + LineSegment object if there are less than 3 velocity points. This function also + assumes uniform sampling. + + Args: + threshold_percentile: Percentile to determine the velocity threshold for + hesitations (default is 20, meaning the bottom 20% of velocities are + considered hesitations). + """ + if len(self.velocities) < 3: + return + + dt = np.diff(self.ink_points["seconds"].values) + + threshold_velocity = np.percentile(self.velocities, threshold_percentile) + hesitations = self.velocities < threshold_velocity + + hesitation_changes = np.diff(hesitations.astype(int)) + hesitation_count = np.sum(hesitation_changes == 1) + + if hesitations[0]: + hesitation_count += 1 + + self.hesitation_count = hesitation_count + self.hesitation_duration = np.sum(hesitations) * dt[0] + + return diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py index 00875ac..d78ee1a 100644 --- a/tests/unit/test_models.py +++ b/tests/unit/test_models.py @@ -3,7 +3,6 @@ import datetime from typing import Dict, cast -import numpy as np import pandas as pd import pytest @@ -256,15 +255,16 @@ def test_uniform_motion() -> None: points=points, is_error=False, line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation ) - segment.calculate_velocity_metrics(points) + segment.calculate_velocity_metrics() assert segment.distance == 3.0 assert segment.mean_speed == 1.0 assert segment.speed_variance == 0.0 - assert np.all(segment.velocities) == 1.0 - assert np.all(segment.accelerations) == 0.0 + assert segment.velocities == [1.0, 1.0, 1.0] + assert segment.accelerations == [0.0, 0.0] def test_accelerating_motion() -> None: @@ -282,13 +282,14 @@ def test_accelerating_motion() -> None: points=points, is_error=False, line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation ) - segment.calculate_velocity_metrics(points) + segment.calculate_velocity_metrics() assert segment.distance == 9.0 assert segment.mean_speed == 3.0 - assert segment.speed_variance == pytest.approx(2.6666666666666665) + assert segment.speed_variance > 0.0 assert segment.velocities == [1.0, 3.0, 5.0] assert segment.accelerations == [2.0, 2.0] @@ -308,15 +309,16 @@ def test_velocity_two_points_only() -> None: points=points, is_error=False, line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation ) - segment.calculate_velocity_metrics(points) + segment.calculate_velocity_metrics() assert segment.distance == 5.0 assert segment.mean_speed == 2.5 assert segment.speed_variance == 0.0 assert segment.velocities == [2.5] - assert segment.accelerations == [] + assert segment.accelerations == [] # No acceleration with only one velocity point def test_decelerating_motion() -> None: @@ -334,9 +336,10 @@ def test_decelerating_motion() -> None: points=points, is_error=False, line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation ) - segment.calculate_velocity_metrics(points) + segment.calculate_velocity_metrics() assert segment.distance == 9.0 assert segment.mean_speed == 3.0 @@ -360,12 +363,113 @@ def test_stationary_motion() -> None: points=points, is_error=False, line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation ) - segment.calculate_velocity_metrics(points) + segment.calculate_velocity_metrics() assert segment.distance == 0.0 assert segment.mean_speed == 0.0 assert segment.speed_variance == 0.0 assert segment.velocities == [0.0, 0.0] assert segment.accelerations == [0.0] + + +def test_no_hesitations_uniform_motion() -> None: + """Test with uniform motion where all velocities are equal.""" + points = pd.DataFrame( + { + "x": [0, 1, 2, 3], + "y": [0, 0, 0, 0], + "seconds": [0, 1, 2, 3], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation + ) + + segment.calculate_velocity_metrics() + segment.detect_hesitations() + + assert segment.hesitation_count == 0 + assert segment.hesitation_duration == 0.0 + + +def test_hesitation_at_start() -> None: + """Test when the line starts with a hesitation.""" + points = pd.DataFrame( + { + "x": [0, 0.1, 1, 2], + "y": [0, 0.1, 0, 0], + "seconds": [0, 1, 2, 3], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation + ) + + segment.calculate_velocity_metrics() + segment.detect_hesitations() + + assert segment.hesitation_count == 1 + assert segment.hesitation_duration == 1.0 + + +def test_multiple_hesitations() -> None: + """Test when there are multiple hesitation periods.""" + points = pd.DataFrame( + { + "x": [0, 100, 100.1, 200, 200.1, 300, 400, 500, 600], + "y": [0, 0, 0, 0, 0, 0, 0, 0, 0], + "seconds": [0, 1, 2, 3, 4, 5, 6, 7, 8], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation + ) + + segment.calculate_velocity_metrics() + segment.detect_hesitations() + + assert segment.hesitation_count == 2 + assert segment.hesitation_duration == 2.0 + + +def test_less_than_three_velocities() -> None: + """Test early return when velocities length is less than 3.""" + points = pd.DataFrame( + { + "x": [0, 1], + "y": [0, 0], + "seconds": [0, 1], + } + ) + segment = models.LineSegment( + start_label="1", + end_label="2", + points=points, + is_error=False, + line_number=1, + ink_points=points, # Pre-assign ink_points for velocity calculation + ) + + segment.calculate_velocity_metrics() + segment.detect_hesitations() + + assert segment.hesitation_count == 0 + assert segment.hesitation_duration == 0.0