Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pandas>=2.2.3",
"pydantic>=2.11.1"
"pydantic>=2.11.1",
"scipy>=1.15.2"
]

[dependency-groups]
Expand Down
12 changes: 6 additions & 6 deletions src/graphomotor/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ class Spiral(BaseModel):
"""A class representing a spiral drawing, encapsulating both raw data and metadata.

Attributes:
data: DataFrame containing drawing data with required columns
(line_number, x, y, UTC_Timestamp, seconds)
data: DataFrame containing drawing data with required columns (line_number, x,
y, UTC_Timestamp, seconds).
metadata: Dictionary containing metadata about the spiral:
- id: Unique identifier for the participant
- hand: Hand used ('Dom' for dominant, 'NonDom' for non-dominant)
- task: Task name
- start_time: Start time of drawing
- id: Unique identifier for the participant,
- hand: Hand used ('Dom' for dominant, 'NonDom' for non-dominant),
- task: Task name,
- start_time: Start time of drawing.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)
Expand Down
1 change: 1 addition & 0 deletions src/graphomotor/features/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
""".. include:: ../../README.md""" # noqa: D415
120 changes: 120 additions & 0 deletions src/graphomotor/features/distance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Feature extraction module for distance-based metrics in spiral drawing data."""

import numpy as np
from scipy import stats
from scipy.spatial import distance

from graphomotor.core import models


def _segment_data(data: np.ndarray, start_prop: float, end_prop: float) -> np.ndarray:
"""Extract segment of data based on given proportion range.

Args:
Comment thread
alperkent marked this conversation as resolved.
data: Data to segment.
start_prop: Start proportion, [0-1).
end_prop: End proportion, (0-1].

Returns:
Segmented data.
"""
if not (0 <= start_prop < end_prop <= 1):
raise ValueError(
"Proportions must be between 0 and 1, "
"and start_prop must be less than end_prop"
)
num_samples = len(data)
start_idx = int(start_prop * num_samples)
end_idx = int(end_prop * num_samples)
return data[start_idx:end_idx]


def calculate_hausdorff_metrics(
spiral: models.Spiral, reference_spiral: np.ndarray
) -> dict:
"""Calculate Hausdorff distance metrics for a spiral object.

This function computes multiple features based on the Hausdorff distance between a
drawn spiral and a reference (ideal) spiral, as described in the [1]. Implementation
Comment thread
alperkent marked this conversation as resolved.
Outdated
is based on the original R script provided with the publication. The Hausdorff
distance measures the maximum distance of a set to the nearest point in the other
set. This metric and its derivatives capture various aspects of the spatial
relationship between the drawn and reference spirals. Calculated features include:
- hausdorff_distance_maximum: The maximum of the directed Hausdorff distances
between the data points and the reference data points,
- hausdorff_distance_sum: The sum of the directed Hausdorff distances,
- hausdorff_distance_sum_per_second: The sum of the directed Hausdorff distances
divided by the total drawing duration,
- hausdorff_distance_interquartile_range: The interquartile range of the
directed Hausdorff distances,
- hausdorff_distance_start_segment_maximum_normalized: The maximum of the
directed Hausdorff distances between the beginning segment (0% to 25%) of
data points and the beginning segment of reference data points divided by
the number of data points in the beginning segment,
- hausdorff_distance_end_segment_maximum_normalized: The maximum of the directed
Hausdorff distances in the ending segment (75% to 100%) of data points and
the ending segment of reference data points divided by the number of data
points in the ending segment,
- hausdorff_distance_middle_segment_maximum: The maximum of the directed
Hausdorff distances in the middle segment (15% to 85%) of data points and
the ending segment of reference data points (this metric is not divided by
the number of data points in the middle segment unlike previous ones),
- hausdorff_distance_middle_segment_maximum_per_second: The maximum of the
directed Hausdorff distances in the middle segment divided by the total
drawing duration.

Args:
spiral: Spiral object with drawing data.
reference_spiral: Reference spiral data for comparison.

Returns:
Dictionary containing Hausdorff distance-based features.

References:
[1] Messan, Komi S et al. “Assessment of Smartphone-Based Spiral Tracing in
Multiple Sclerosis Reveals Intra-Individual Reproducibility as a Major
Determinant of the Clinical Utility of the Digital Test.” Frontiers in
medical technology vol. 3 714682. 1 Feb. 2022, doi:10.3389/fmedt.2021.714682
"""
spiral_data = np.column_stack((spiral.data["x"].values, spiral.data["y"].values))

total_duration = spiral.data["seconds"].iloc[-1]

start_segment_data = _segment_data(spiral_data, 0.0, 0.25)
end_segment_data = _segment_data(spiral_data, 0.75, 1.0)
mid_segment_data = _segment_data(spiral_data, 0.15, 0.85)

start_segment_ref = _segment_data(reference_spiral, 0.0, 0.25)
end_segment_ref = _segment_data(reference_spiral, 0.75, 1.0)
mid_segment_ref = _segment_data(reference_spiral, 0.15, 0.85)

haus_dist = [
Comment thread
alperkent marked this conversation as resolved.
distance.directed_hausdorff(spiral_data, reference_spiral)[0],
distance.directed_hausdorff(reference_spiral, spiral_data)[0],
]
haus_dist_start = [
distance.directed_hausdorff(start_segment_data, start_segment_ref)[0],
distance.directed_hausdorff(start_segment_ref, start_segment_data)[0],
]
haus_dist_end = [
distance.directed_hausdorff(end_segment_data, end_segment_ref)[0],
distance.directed_hausdorff(end_segment_ref, end_segment_data)[0],
]
haus_dist_mid = [
distance.directed_hausdorff(mid_segment_data, mid_segment_ref)[0],
distance.directed_hausdorff(mid_segment_ref, mid_segment_data)[0],
]

return {
"hausdorff_distance_maximum": np.max(haus_dist),
"hausdorff_distance_sum": np.sum(haus_dist),
"hausdorff_distance_sum_per_second": np.sum(haus_dist) / total_duration,
"hausdorff_distance_interquartile_range": stats.iqr(haus_dist),
"hausdorff_distance_start_segment_maximum_normalized": np.max(haus_dist_start)
/ len(start_segment_data),
"hausdorff_distance_end_segment_maximum_normalized": np.max(haus_dist_end)
/ len(end_segment_data),
"hausdorff_distance_middle_segment_maximum": np.max(haus_dist_mid),
"hausdorff_distance_middle_segment_maximum_per_second": np.max(haus_dist_mid)
/ total_duration,
}
109 changes: 109 additions & 0 deletions src/graphomotor/utils/reference_spiral.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Generate a reference spiral with equidistant points along its arc length."""

import numpy as np
from scipy import integrate, optimize

_SPIRAL_CENTER_X = 50
_SPIRAL_CENTER_Y = 50
_SPIRAL_INITIAL_RADIUS = 0
_SPIRAL_GROWTH_RATE = 1.075
_SPIRAL_TOTAL_ROTATION = 8 * np.pi
_SPIRAL_NUM_POINTS = 10000


def _spiral_arc_length_integrand(t: float) -> float:
"""Calculate the differential arc length at angle t for an Archimedean spiral.

Args:
t: Angle parameter.

Returns:
Differential arc length value.
"""
r_t = _SPIRAL_INITIAL_RADIUS + _SPIRAL_GROWTH_RATE * t
return np.sqrt(r_t**2 + _SPIRAL_GROWTH_RATE**2)


def _calculate_arc_length(theta: float) -> float:
"""Calculate the arc length of the spiral from 0 to theta.

Args:
theta: The angle in radians.

Returns:
The arc length of the spiral from 0 to theta.
"""
return integrate.quad(lambda t: _spiral_arc_length_integrand(t), 0, theta)[0]


def _arc_length_difference(theta: float, target_arc_length: float) -> float:
"""Function to find the root for a given arc length.

Args:
theta: Angle to evaluate.
target_arc_length: Target arc length value.

Returns:
Difference between calculated and target arc length.
"""
return _calculate_arc_length(theta) - target_arc_length


def _find_theta_for_arc_length(target_arc_length: float) -> float:
"""Find the theta value for a given arc length.

Args:
target_arc_length: Target arc length.

Returns:
Angle theta corresponding to the arc length.
"""
solution = optimize.root_scalar(
lambda theta: _arc_length_difference(theta, target_arc_length),
bracket=[0, _SPIRAL_TOTAL_ROTATION],
)
return solution.root


def generate_reference_spiral() -> np.ndarray:
"""Generate a reference spiral with equidistant points along its arc length.

This function creates an Archimedean spiral with points distributed at equal arc
length intervals. The generated spiral serves as a standardized reference template
for feature extraction algorithms that compare user-drawn spirals with an ideal
form.

The algorithm works by:
1. Computing the total arc length for the entire spiral (0 to 8π),
Comment thread
alperkent marked this conversation as resolved.
Outdated
2. Creating equidistant target arc length values,
3. For each target arc length, finding the corresponding theta value that
produces that arc length using numerical root finding,
4. Converting these theta values to Cartesian coordinates.

Mathematical formulas used:
- Spiral equation: r(θ) = a + b·θ
- Arc length differential: ds = √(r(θ)² + b²) dθ
- Arc length from 0 to θ: s(θ) = ∫₀ᶿ √(r(t)² + b²) dt
- Cartesian coordinates: x = cx + r·cos(θ), y = cy + r·sin(θ)

Parameters used:
- Center coordinates: (50, 50)
Comment thread
alperkent marked this conversation as resolved.
Outdated
- Initial radius (a): 0
- Growth rate (b): 1.075
- Total rotation: 4 complete revolutions (θ from 0 to 8π)
- Number of points: 10,000

Returns:
Array with shape (10000, 2) containing Cartesian coordinates of the spiral.
"""
total_arc_length = _calculate_arc_length(_SPIRAL_TOTAL_ROTATION)

arc_length_values = np.linspace(0, total_arc_length, _SPIRAL_NUM_POINTS)

theta_values = np.array([_find_theta_for_arc_length(s) for s in arc_length_values])

r_values = _SPIRAL_INITIAL_RADIUS + _SPIRAL_GROWTH_RATE * theta_values
x_values = _SPIRAL_CENTER_X + r_values * np.cos(theta_values)
y_values = _SPIRAL_CENTER_Y + r_values * np.sin(theta_values)

return np.column_stack((x_values, y_values))
22 changes: 22 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import datetime
import pathlib

import numpy as np
import pandas as pd
import pytest

from graphomotor.core import models
from graphomotor.utils import reference_spiral


@pytest.fixture
def sample_data() -> pathlib.Path:
Expand Down Expand Up @@ -35,3 +39,21 @@ def valid_spiral_metadata() -> dict[str, str | datetime.datetime]:
tz=datetime.timezone.utc,
),
}


@pytest.fixture
def valid_spiral(
valid_spiral_data: pd.DataFrame,
valid_spiral_metadata: dict[str, str | datetime.datetime],
) -> models.Spiral:
"""Create a valid Spiral object."""
return models.Spiral(
data=valid_spiral_data,
metadata=valid_spiral_metadata,
)


@pytest.fixture
def ref_spiral() -> np.ndarray:
"""Create a reference spiral for testing."""
return reference_spiral.generate_reference_spiral()
52 changes: 52 additions & 0 deletions tests/unit/test_distance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Test cases for distance.py functions."""

import numpy as np
import pytest

from graphomotor.core import models
from graphomotor.features import distance


def test_segment_data_valid() -> None:
"""Test that the data is segmented correctly."""
data = np.array([[i, i] for i in range(100)])

segment = distance._segment_data(data, 0.1, 0.3)
assert len(segment) == 20
assert segment[0][0] == 10
assert segment[-1][0] == 29


def test_segment_data_invalid() -> None:
"""Test that invalid percentages raise a ValueError."""
data = np.array([[i, i] for i in range(100)])

with pytest.raises(
ValueError,
match=(
"Proportions must be between 0 and 1, "
"and start_prop must be less than end_prop"
),
):
distance._segment_data(data, 0.6, 0.5)


def test_calculate_hausdorff_metrics(
valid_spiral: models.Spiral, ref_spiral: np.ndarray
) -> None:
"""Test that each Hausdorff metric is calculated."""
metrics = distance.calculate_hausdorff_metrics(valid_spiral, ref_spiral)

expected_metrics = {
"hausdorff_distance_maximum",
"hausdorff_distance_sum",
"hausdorff_distance_sum_per_second",
"hausdorff_distance_interquartile_range",
"hausdorff_distance_start_segment_maximum_normalized",
"hausdorff_distance_end_segment_maximum_normalized",
"hausdorff_distance_middle_segment_maximum",
"hausdorff_distance_middle_segment_maximum_per_second",
}

assert set(metrics.keys()) == expected_metrics
assert all(isinstance(value, float) for value in metrics.values())
17 changes: 17 additions & 0 deletions tests/unit/test_reference_spiral.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Test cases for reference_spiral.py functions."""

import numpy as np

from graphomotor.utils import reference_spiral


def test_generate_reference_spiral() -> None:
"""Test the generation of a reference spiral."""
spiral = reference_spiral.generate_reference_spiral()
assert isinstance(spiral, np.ndarray)
assert spiral.shape == (10000, 2)
assert np.array_equal(spiral[0], [50, 50])
assert np.allclose(spiral[-1], [50 + 1.075 * 8 * np.pi, 50], atol=1e-8)

distances = np.linalg.norm(np.diff(spiral, axis=0), axis=1)
Comment thread
alperkent marked this conversation as resolved.
Outdated
assert np.allclose(distances, distances[0], atol=1e-4)
Loading