Skip to content

[ENH] Add Swale Scoring Model #1365

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions aeon/distances/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
"shape_dtw_pairwise_distance",
"sbd_distance",
"sbd_pairwise_distance",
"swale_distance",
"swale_pairwise_distance",
"mp_distance",
"mp_pairwise_distance",
"mindist_paa_sax_distance",
Expand Down Expand Up @@ -136,6 +138,8 @@
soft_dtw_cost_matrix,
soft_dtw_distance,
soft_dtw_pairwise_distance,
swale_distance,
swale_pairwise_distance,
twe_alignment_path,
twe_cost_matrix,
twe_distance,
Expand Down
12 changes: 12 additions & 0 deletions aeon/distances/_distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
soft_dtw_cost_matrix,
soft_dtw_distance,
soft_dtw_pairwise_distance,
swale_distance,
swale_pairwise_distance,
twe_alignment_path,
twe_cost_matrix,
twe_distance,
Expand Down Expand Up @@ -462,6 +464,7 @@ def get_distance_function(method: Union[str, DistanceFunction]) -> DistanceFunct
'manhattan' distances.manhattan_distance
'minkowski' distances.minkowski_distance
'sbd' distances.sbd_distance
'swale distances.swale_distance
'shift_scale' distances.shift_scale_invariant_distance
'soft_dtw' distances.soft_dtw_distance
=============== ========================================
Expand Down Expand Up @@ -521,6 +524,7 @@ def get_pairwise_distance_function(
'manhattan' distances.manhattan_pairwise_distance
'minkowski' distances.minkowski_pairwise_distance
'sbd' distances.sbd_pairwise_distance
'swale' disrances.swale_pairwise_distance
'shift_scale' distances.shift_scale_invariant_pairwise_distance
'soft_dtw' distances.soft_dtw_pairwise_distance
=============== ========================================
Expand Down Expand Up @@ -894,6 +898,14 @@ class DistanceType(Enum):
"symmetric": True,
"unequal_support": True,
},
{
"name": "swale",
"distance": swale_distance,
"pairwise_distance": swale_pairwise_distance,
"type": DistanceType.ELASTIC,
"symmetric": True,
"unequal_support": True,
},
]

DISTANCES_DICT = {d["name"]: d for d in DISTANCES}
Expand Down
3 changes: 3 additions & 0 deletions aeon/distances/elastic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
"soft_dtw_pairwise_distance",
"soft_dtw_alignment_path",
"soft_dtw_cost_matrix",
"swale_distance",
"swale_pairwise_distance",
]

from aeon.distances.elastic._adtw import (
Expand Down Expand Up @@ -107,6 +109,7 @@
soft_dtw_distance,
soft_dtw_pairwise_distance,
)
from aeon.distances.elastic._swale import swale_distance, swale_pairwise_distance
from aeon.distances.elastic._twe import (
twe_alignment_path,
twe_cost_matrix,
Expand Down
202 changes: 202 additions & 0 deletions aeon/distances/elastic/_swale.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
__maintainer__ = []

import numpy as np
from numba import njit

from aeon.distances._utils import reshape_pairwise_to_multiple


@njit(cache=True, fastmath=True)
def swale_distance(
x: np.ndarray,
y: np.ndarray,
gapc: float = 1.0,
rewardm: float = 1.0,
epsilon: float = 1.0,
) -> float:
"""
Calculate the Swale distance between two time series.

Parameters
----------
x : numpy.ndarray
First time series.
y : numpy.ndarray
Second time series.
gapc : float
Gap cost.
rewardm : float
Reward for match.
epsilon : float
Epsilon value.

Returns
-------
float
The Swale distance between the time series.

Raises
------
ValueError
If x and y have different dimensions.

Examples
--------
>>> time_series1 = np.array([1, 2, 3, 4, 5, 6, 7])
>>> time_series2 = np.array([1, 2, 3, 4])
>>> print(swale_distance(time_series1, time_series2, 1, 2, 1))
11
"""
if x.ndim == 1 and y.ndim == 1:
return _univariate_swale_distance(x, y, gapc, rewardm, epsilon)
if x.ndim == 2 and y.ndim == 2:
return _swale_distance(x, y, gapc, rewardm, epsilon)
raise ValueError("x and y must be 1D or 2D")


@njit(cache=True, fastmath=True)
def _swale_distance(
x: np.ndarray,
y: np.ndarray,
gapc: float = 1.0,
rewardm: float = 1.0,
epsilon: float = 1.0,
) -> float:
n_series1 = x.shape[0]
n_series2 = y.shape[0]

distance = 0.0
min_val = min(n_series1, n_series2)
for i in range(min_val):
distance += _univariate_swale_distance(x[i], y[i], gapc, rewardm, epsilon)

for j in range(min_val, max(n_series1, n_series2)):
if n_series1 > n_series2:
distance += _univariate_swale_distance(
x[j], np.array((), dtype=np.float64), gapc, rewardm, epsilon
)
else:
distance += _univariate_swale_distance(
np.array((), dtype=np.float64), y[j], gapc, rewardm, epsilon
)

return float(distance)


@njit(cache=True, fastmath=True)
def _univariate_swale_distance(
x: np.ndarray,
y: np.ndarray,
gapc: float = 1.0,
rewardm: float = 1.0,
epsilon: float = 1.0,
) -> float:
m = float(len(x))
n = float(len(y))

if m == 0:
return n * gapc
elif n == 0:
return m * gapc
elif abs(x[0] - y[0]) <= epsilon:
return rewardm + _univariate_swale_distance(
x[1:], y[1:], gapc=gapc, rewardm=rewardm, epsilon=epsilon
)
else:
option1 = gapc + _univariate_swale_distance(
x[1:], y, gapc=gapc, rewardm=rewardm, epsilon=epsilon
)
option2 = gapc + _univariate_swale_distance(
x, y[1:], gapc=gapc, rewardm=rewardm, epsilon=epsilon
)
return max(option1, option2)


@njit(cache=True, fastmath=True)
def _swale_pairwise_distance(
x: np.ndarray, gapc: float = 1.0, rewardm: float = 1.0, epsilon: float = 1.0
) -> np.ndarray:
n_cases = x.shape[0]
distances = np.zeros((n_cases, n_cases))

for i in range(n_cases):
for j in range(i + 1, n_cases):
distances[i, j] = swale_distance(
x[i], x[j], gapc=gapc, rewardm=rewardm, epsilon=epsilon
)
distances[j, i] = distances[i, j]

return distances


@njit(cache=True, fastmath=True)
def swale_pairwise_distance(
x: np.ndarray,
y: np.ndarray,
gapc: float = 1.0,
rewardm: float = 1.0,
epsilon: float = 1.0,
) -> np.ndarray:
"""
Calculate pairwise Swale distances between two sets of time series.

Parameters
----------
x : numpy.ndarray
First set of time series.
y : numpy.ndarray
Second set of time series.
gapc : float
Gap cost.
rewardm : float
Reward for match.
epsilon : float
Epsilon value.

Returns
-------
numpy.ndarray
Pairwise Swale distances between the two sets of time series.

Examples
--------
>>> X = np.array([[[1, 2, 3]], [[4, 5, 6]], [[7, 8, 9]]])
>>> y_univariate = np.array([[11, 12, 13], [14, 15, 16], [17, 18, 19]])
>>> print(swale_pairwise_distance(X, y_univariate, 1, 2, 1))
[[12.]
[12.]
[12.]]
>>> print(swale_pairwise_distance(X, None, 1, 2, 1))
[[0. 6. 6.]
[6. 0. 6.]
[6. 6. 0.]]
"""
if y is None:
if x.ndim == 3:
return _swale_pairwise_distance(x, gapc, rewardm, epsilon)
elif x.ndim == 2:
_x = x.reshape((x.shape[0], 1, x.shape[1]))
return _swale_pairwise_distance(_x, gapc, rewardm, epsilon)
raise ValueError("X must be 2D or 3D array")

_x, _y = reshape_pairwise_to_multiple(x, y)
return _swale_from_multiple_to_multiple_distance(_x, _y, gapc, rewardm, epsilon)


@njit(cache=True, fastmath=True)
def _swale_from_multiple_to_multiple_distance(
x: np.ndarray,
y: np.ndarray,
gapc: float = 1.0,
rewardm: float = 1.0,
epsilon: float = 1.0,
) -> np.ndarray:
n_cases = x.shape[0]
m_cases = y.shape[0]
distances = np.zeros((n_cases, m_cases))

for i in range(n_cases):
for j in range(m_cases):
distances[i, j] = swale_distance(x[i], y[j], gapc, rewardm, epsilon)

return distances
8 changes: 8 additions & 0 deletions aeon/distances/elastic/tests/test_distance_correctness.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
twe_distance,
wddtw_distance,
wdtw_distance,
swale_distance,
)

distances = [
Expand All @@ -32,6 +33,7 @@
"ddtw",
"wddtw",
"twe",
"swale",
]

distance_parameters = {
Expand All @@ -44,6 +46,7 @@
"ddtw": [0.0, 0.1, 1.0], # window
"twe": [0.0, 0.1, 1.0], # window
"msm": [0.0, 0.2, 3.0], # parameter c
"swale": [1, 1, 1],
}
unit_test_distances = {
"euclidean": 619.7959,
Expand All @@ -58,6 +61,7 @@
"twe": [4536.0, 3192.0220, 3030.036000000001],
"msm_ind": [1515.0, 1517.8000000000004, 1557.0], # msm with independent distance
"msm_dep": [1897.0, 1898.6000000000001, 1921.0], # msm with dependent distance
"swale": [2.0, 15.0, 129.0, 12.0, 114.0],
}
basic_motions_distances = {
"euclidean": 27.51835240,
Expand Down Expand Up @@ -173,3 +177,7 @@ def test_univariate_correctness():
)
assert_almost_equal(d, unit_test_distances["msm_dep"][j], 4)
assert d == d2
d = swale_distance(cases1[0], cases2[0], c=distance_parameters["swale"][j])
d2 = swale_distance(cases1[1], cases2[1], c=distance_parameters["swale"][j])
assert_almost_equal(d, unit_test_distances["msm_ind"][j], 4)
assert d == d2
Loading