Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5f40b12
wrote calculate_velocity_metrics
cgmaiorano Jan 19, 2026
de32337
add distance assign
cgmaiorano Jan 19, 2026
591a752
new classmethod for validating dataframe does not contain duplicate t…
cgmaiorano Feb 10, 2026
ac26494
added UTC_timestap check to validator and wrote unit test
cgmaiorano Feb 10, 2026
0952f4a
adjust unit test for validator
cgmaiorano Feb 10, 2026
3983c9c
fixing other unit tests by adding required columns
cgmaiorano Feb 10, 2026
3843b46
update validate data function to only apply to trails
cgmaiorano Feb 10, 2026
a3efd89
revert validator test back to only seconds
cgmaiorano Feb 10, 2026
e0cdea0
added new line checking for task name in metadata
cgmaiorano Feb 10, 2026
3489b85
remove validator function - will be its own new issue
cgmaiorano Feb 10, 2026
eb4b522
ruff reformat
cgmaiorano Feb 10, 2026
4e4ca3e
remove divide by 0 replacement
cgmaiorano Feb 10, 2026
a3a1918
unit tests for velocity function
cgmaiorano Feb 10, 2026
e6c7536
Merge branch 'main' into 96-task-write-trails-velocity-feature-functi…
cgmaiorano Feb 10, 2026
467553c
fix small bug from merge from main into branch
cgmaiorano Feb 10, 2026
10957f9
ruff reformat
cgmaiorano Feb 10, 2026
9e4883e
wrote detect_hesitations
cgmaiorano Feb 10, 2026
eded58d
adjusting the functions for ink_points which no longer will be in the…
cgmaiorano Feb 10, 2026
679cffc
made ink_points a class attribute again and fixed all unit tests
cgmaiorano Feb 10, 2026
e9f54eb
ruff reformat
cgmaiorano Feb 10, 2026
91d5e56
Merge branch 'main' into 97-task-write-trails-velocity-feature-functi…
cgmaiorano Feb 12, 2026
369887d
ruff remvoe unused import
cgmaiorano Feb 12, 2026
ea1379b
ruff format
cgmaiorano Feb 12, 2026
8e14942
updating requested fixes
cgmaiorano Feb 16, 2026
dec843f
fix tests and ruff
cgmaiorano Feb 16, 2026
af311e7
Update src/graphomotor/core/models.py
cgmaiorano Feb 16, 2026
793d2bf
edit
cgmaiorano Feb 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 45 additions & 9 deletions src/graphomotor/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ class LineSegment:
points: pd.DataFrame
is_error: bool
line_number: int
ink_points: np.ndarray = dataclasses.field(default_factory=lambda: np.array([]))

ink_time: float = 0.0
think_time: float = 0.0
Expand All @@ -203,6 +202,7 @@ class LineSegment:
hesitation_duration: float = 0.0
velocities: List[float] = dataclasses.field(default_factory=list)
accelerations: List[float] = dataclasses.field(default_factory=list)
ink_points: pd.DataFrame = dataclasses.field(default_factory=pd.DataFrame)

def valid_ink_trajectory(
self,
Expand Down Expand Up @@ -230,7 +230,7 @@ def valid_ink_trajectory(
end_circle: CircleTarget representing the end circle.

Returns:
Tuple of (ink_start_idx: int, ink_end_idx: int) if valid
Tuple of (ink_start_idx: Optional[int], ink_end_idx: Optional[int]) if valid
trajectory exists, else (None, None).
"""
ink_start_idx = None
Expand All @@ -248,7 +248,6 @@ def valid_ink_trajectory(
):
ink_end_idx = idx
break

if (
ink_start_idx is not None
and ink_end_idx is not None
Expand Down Expand Up @@ -292,16 +291,15 @@ def calculate_path_optimality(
self.path_optimality = optimal_distance / self.distance
return

def calculate_velocity_metrics(self, ink_points: pd.DataFrame) -> None:
"""Get distance, velocity, and acceleration metrics of a LineSegment.
def calculate_velocity_metrics(self) -> None:
"""Get velocity metrics of a LineSegment.
Comment thread
cgmaiorano marked this conversation as resolved.

Args:
self: LineSegment object to calculate velocities for.
ink_points: DataFrame of ink points with 'x', 'y', and 'seconds' columns.
"""
dx = np.diff(ink_points["x"].values)
dy = np.diff(ink_points["y"].values)
dt = np.diff(ink_points["seconds"].values)
dx = np.diff(self.ink_points["x"].values)
dy = np.diff(self.ink_points["y"].values)
dt = np.diff(self.ink_points["seconds"].values)

distances = np.sqrt(dx**2 + dy**2)
self.distance = np.sum(distances)
Expand All @@ -316,3 +314,41 @@ def calculate_velocity_metrics(self, ink_points: pd.DataFrame) -> None:
self.accelerations = np.diff(velocities).tolist()

return

def detect_hesitations(self, threshold_percentile: int = 20) -> None:
"""Detect hesitations as periods of significantly reduced velocity.

This function defines a hesitation as any period where the velocity falls below
a certain threshold, which is determined by the specified percentile of the
velocity distribution. It counts the number of distinct hesitation periods and
adds 1 if the line starts with a hesitation. It also calculates the total
duration of hesitations based on the number of points that fall below the
threshold and the time interval between points.

hesitation_count defaults to 0 and hesitation_duration defaults to 0.0 in the
LineSegment object if there are less than 3 velocity points. This function also
assumes uniform sampling.

Args:
threshold_percentile: Percentile to determine the velocity threshold for
hesitations (default is 20, meaning the bottom 20% of velocities are
considered hesitations).
"""
if len(self.velocities) < 3:
Comment thread
cgmaiorano marked this conversation as resolved.
return

dt = np.diff(self.ink_points["seconds"].values)

threshold_velocity = np.percentile(self.velocities, threshold_percentile)
hesitations = self.velocities < threshold_velocity

hesitation_changes = np.diff(hesitations.astype(int))
hesitation_count = np.sum(hesitation_changes == 1)

if hesitations[0]:
hesitation_count += 1

self.hesitation_count = hesitation_count
Comment thread
cgmaiorano marked this conversation as resolved.
self.hesitation_duration = np.sum(hesitations) * dt[0]

return
124 changes: 114 additions & 10 deletions tests/unit/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import datetime
from typing import Dict, cast

import numpy as np
import pandas as pd
import pytest

Expand Down Expand Up @@ -256,15 +255,16 @@ def test_uniform_motion() -> None:
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics(points)
segment.calculate_velocity_metrics()

assert segment.distance == 3.0
assert segment.mean_speed == 1.0
assert segment.speed_variance == 0.0
assert np.all(segment.velocities) == 1.0
assert np.all(segment.accelerations) == 0.0
assert segment.velocities == [1.0, 1.0, 1.0]
assert segment.accelerations == [0.0, 0.0]


def test_accelerating_motion() -> None:
Expand All @@ -282,13 +282,14 @@ def test_accelerating_motion() -> None:
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics(points)
segment.calculate_velocity_metrics()

assert segment.distance == 9.0
assert segment.mean_speed == 3.0
assert segment.speed_variance == pytest.approx(2.6666666666666665)
assert segment.speed_variance > 0.0
assert segment.velocities == [1.0, 3.0, 5.0]
assert segment.accelerations == [2.0, 2.0]

Expand All @@ -308,15 +309,16 @@ def test_velocity_two_points_only() -> None:
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics(points)
segment.calculate_velocity_metrics()

assert segment.distance == 5.0
assert segment.mean_speed == 2.5
assert segment.speed_variance == 0.0
assert segment.velocities == [2.5]
assert segment.accelerations == []
assert segment.accelerations == [] # No acceleration with only one velocity point


def test_decelerating_motion() -> None:
Expand All @@ -334,9 +336,10 @@ def test_decelerating_motion() -> None:
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics(points)
segment.calculate_velocity_metrics()

assert segment.distance == 9.0
assert segment.mean_speed == 3.0
Expand All @@ -360,12 +363,113 @@ def test_stationary_motion() -> None:
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics(points)
segment.calculate_velocity_metrics()

assert segment.distance == 0.0
assert segment.mean_speed == 0.0
assert segment.speed_variance == 0.0
assert segment.velocities == [0.0, 0.0]
assert segment.accelerations == [0.0]


def test_no_hesitations_uniform_motion() -> None:
"""Test with uniform motion where all velocities are equal."""
points = pd.DataFrame(
{
"x": [0, 1, 2, 3],
"y": [0, 0, 0, 0],
"seconds": [0, 1, 2, 3],
}
)
segment = models.LineSegment(
start_label="1",
end_label="2",
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics()
segment.detect_hesitations()

assert segment.hesitation_count == 0
assert segment.hesitation_duration == 0.0


def test_hesitation_at_start() -> None:
"""Test when the line starts with a hesitation."""
points = pd.DataFrame(
{
"x": [0, 0.1, 1, 2],
"y": [0, 0.1, 0, 0],
"seconds": [0, 1, 2, 3],
}
)
segment = models.LineSegment(
start_label="1",
end_label="2",
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics()
segment.detect_hesitations()

assert segment.hesitation_count == 1
assert segment.hesitation_duration == 1.0


def test_multiple_hesitations() -> None:
"""Test when there are multiple hesitation periods."""
points = pd.DataFrame(
{
"x": [0, 100, 100.1, 200, 200.1, 300, 400, 500, 600],
"y": [0, 0, 0, 0, 0, 0, 0, 0, 0],
"seconds": [0, 1, 2, 3, 4, 5, 6, 7, 8],
}
)
segment = models.LineSegment(
start_label="1",
end_label="2",
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics()
segment.detect_hesitations()

assert segment.hesitation_count == 2
assert segment.hesitation_duration == 2.0


def test_less_than_three_velocities() -> None:
"""Test early return when velocities length is less than 3."""
points = pd.DataFrame(
{
"x": [0, 1],
"y": [0, 0],
"seconds": [0, 1],
}
)
segment = models.LineSegment(
start_label="1",
end_label="2",
points=points,
is_error=False,
line_number=1,
ink_points=points, # Pre-assign ink_points for velocity calculation
)

segment.calculate_velocity_metrics()
segment.detect_hesitations()

assert segment.hesitation_count == 0
assert segment.hesitation_duration == 0.0