Skip to content

Pose distance metrics from Ham2Pose #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0c7bb56
CDL: copying code by @j22melody, as requested.
cleong110 Nov 18, 2024
db9056e
Merge branch 'main' into ham2pose_metrics
cleong110 Jan 6, 2025
e5f703f
CDL: add new req for fastdtw
cleong110 Jan 6, 2025
a7ca062
Start ndtw_mje, add common functions for pose preprocessing, etc
cleong110 Jan 6, 2025
2dee6ce
Edit the name of a test function to avoid potential collisions
cleong110 Jan 6, 2025
04472e1
Stubbed test file
cleong110 Jan 6, 2025
1f5767d
A bit of pylint cleanup
cleong110 Jan 6, 2025
07225cb
Preprocessing for poses, and some type annotations, and a bit of refa…
cleong110 Jan 7, 2025
e931966
adding tests for local pose_utils
cleong110 Jan 8, 2025
7bc5371
some gitignore updates
cleong110 Jan 8, 2025
4369938
Fixing a few type issues
cleong110 Jan 8, 2025
bdf5d73
adding test data
cleong110 Jan 9, 2025
99e27f9
remove instead of hide legs in pose_utils
cleong110 Jan 9, 2025
ec09e3c
Take out temp test code
cleong110 Jan 9, 2025
66361ca
fix forgetting to assign in preprocess_pose
cleong110 Jan 9, 2025
389abe2
some minor fixes in tests
cleong110 Jan 9, 2025
8344d54
euclidean, not l2
cleong110 Jan 9, 2025
e9e8cc1
Transitioning to pytest from unittest
cleong110 Jan 10, 2025
4334fb8
Trying to figure out pytest
cleong110 Jan 10, 2025
92ec8e0
Caught another L2
cleong110 Jan 15, 2025
251ad26
basic scoring script
cleong110 Jan 15, 2025
0449eba
implement ape_metric
cleong110 Jan 15, 2025
daedc1e
Very WIP, pushing code for the day
cleong110 Jan 15, 2025
72cec6d
Starting the move to PoseProcessors
cleong110 Jan 17, 2025
74e016e
adding in the set_masked_values_to_zero
cleong110 Jan 27, 2025
d2d1759
Pushing all changes as-is
cleong110 Jan 27, 2025
d8dd461
Cleaning u and implementing separate DTW and Distance Metrics
cleong110 Jan 27, 2025
3517a76
I can build the basic Ham2Pose metrics!
cleong110 Jan 28, 2025
688fcee
remove unused file
cleong110 Jan 30, 2025
57f6932
remove unused alignment_strategy
cleong110 Jan 30, 2025
a636406
Various pylint changes
cleong110 Jan 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
.idea/
build/
pose_evaluation.egg-info/
**/__pycache__/
**/__pycache__/
.coverage
.vscode/
coverage.lcov
52 changes: 52 additions & 0 deletions pose_evaluation/evaluation/score_poses_with_all_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import argparse
from pathlib import Path
from pose_format import Pose

from pose_evaluation.metrics import ape_metric, distance_metric, ndtw_mje_metric
from pose_evaluation.utils.pose_utils import load_pose_file, preprocess_pose, get_component_names_and_points_dict

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Load files and run score_all")
parser.add_argument("pose_dir", type=Path, help="Path to the directory containing SignCLIP .npy files")

args = parser.parse_args()

pose_files = args.pose_dir.glob("*.pose")
poses = [load_pose_file(pose_file) for pose_file in pose_files]
print(f"Loaded {len(poses)} poses from {args.pose_dir}")
original_component_names, original_points_dict = get_component_names_and_points_dict(poses[0])

preprocessed_poses = [preprocess_pose(pose) for pose in poses]
preprocessed_component_names, preprocessed_points_dict = get_component_names_and_points_dict(preprocessed_poses[0])

print(len(original_component_names))
print(len(preprocessed_component_names))
preprocessed_poses_not_normalized = [preprocess_pose(pose, normalize_poses=False) for pose in poses]



only_reduced_poses = [pose.get_components(preprocessed_component_names, preprocessed_points_dict) for pose in poses]




# metric = ndtw_mje.DynamicTimeWarpingMeanJointErrorMetric()
print(f"Reduced poses to {len(only_reduced_poses[0].header.components)} components and {only_reduced_poses[0].header.total_points()} points")
print(only_reduced_poses[0].body.data.shape) # (93, 1, 560, 3) for example
print(only_reduced_poses[0].body.points_perspective().shape) # (560, 1, 93, 3)

for metric_class in distance_metric.DistanceMetric.__subclasses__():
metric = metric_class()
print(metric)

# metric = ape_metric.AveragePositionErrorMetric()


# print(metric.score_all(poses, preprocessed_poses))
# print(metric.score_all(only_reduced_poses, preprocessed_poses))
print(metric.score_all(preprocessed_poses, preprocessed_poses))
# print(metric.score_all(only_reduced_poses, only_reduced_poses))




2 changes: 1 addition & 1 deletion pose_evaluation/metrics/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
temp/
tests
32 changes: 32 additions & 0 deletions pose_evaluation/metrics/aggregate_distances_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Literal
from pose_evaluation.metrics.base import SignatureMixin
# from numpy.typing import ArrayLike
import numpy as np

AggregationStrategy = Literal["mean", "sum", "max"]

class DistancesAggregator(SignatureMixin):

# aggregation_strategy: AggregationStrategy

def __init__(self, aggregation_strategy:AggregationStrategy) -> None:
self.aggregation_strategy = f"{aggregation_strategy}"

def aggregate(self, distances):
if self.aggregation_strategy == "sum":
return np.sum(distances)
if self.aggregation_strategy == "mean":
return np.mean(distances)
if self.aggregation_strategy == "max":
return np.max(distances)

def __str__(self):
return f"{self.aggregation_strategy}"

def __repr__(self):
return f"{self.aggregation_strategy}"

def get_signature(self) -> str:
return f"{self}"


45 changes: 45 additions & 0 deletions pose_evaluation/metrics/alignment_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@

class TrajectoryAligner(PoseProcessor):
def __init__(self, alignment_strategy:str) -> None:
self.name = alignment_strategy

def align(self, hyp_trajectory:np.ma.MaskedArray, ref_trajectory:np.ma.MaskedArray) -> Tuple[np.ma.MaskedArray,np.ma.MaskedArray]:
raise NotImplementedError


def get_signature(self) -> str:
return f"alignment_strategy:{self.name}"

class DTW_Aligner(TrajectoryAligner):
def __init__(self) -> None:
super().__init__(alignment_strategy="dynamic_time_warping")

def align(self, hyp_trajectory:np.ma.MaskedArray, ref_trajectory:np.ma.MaskedArray) -> Tuple[np.ma.MaskedArray,np.ma.MaskedArray]:
x = hyp_trajectory
y = ref_trajectory
_, path = fastdtw(x.data, y.data) # Use the raw data for DTW computation

# Initialize lists for aligned data and masks
aligned_x_data = []
aligned_y_data = []

aligned_x_mask = []
aligned_y_mask = []

# Loop through the DTW path
for xi, yi in path:
# Append aligned data
aligned_x_data.append(x.data[xi])
aligned_y_data.append(y.data[yi])

# Append aligned masks (directly use .mask)
aligned_x_mask.append(x.mask[xi])
aligned_y_mask.append(y.mask[yi])

# Create aligned masked arrays
aligned_x = np.ma.array(aligned_x_data, mask=aligned_x_mask)
aligned_y = np.ma.array(aligned_y_data, mask=aligned_y_mask)
return aligned_x, aligned_y



63 changes: 63 additions & 0 deletions pose_evaluation/metrics/ape_metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import numpy as np
from pose_format import Pose
from pose_evaluation.metrics.distance_metric import DistanceMetric, ValidDistanceKinds




class AveragePositionErrorMetric(DistanceMetric):
def __init__(self,
spatial_distance_kind: ValidDistanceKinds = "euclidean",
normalize_poses: bool = True,
reduce_poses_to_common_points: bool = True,
zero_pad_shorter_sequence: bool = True,
remove_legs: bool = True,
remove_world_landmarks: bool = True,
conf_threshold_to_drop_points: None | int = None):
super().__init__(spatial_distance_kind, normalize_poses, reduce_poses_to_common_points, zero_pad_shorter_sequence, remove_legs, remove_world_landmarks, conf_threshold_to_drop_points)

# def score(self, hypothesis: Pose, reference: Pose) -> float:

# hyp_points = hypothesis.body.points_perspective()
# ref_points = reference.body.points_perspective()


# if hyp_points.shape[0] != ref_points.shape[0] or hyp_points.shape[-1] != ref_points.shape[-1]:
# raise ValueError(
# f"Shapes of hyp ({hyp_points.shape}) and ref ({ref_points.shape}) unequal. Not supported by {self.name}"
# )

# point_count = hyp_points.shape[0]
# total_error = 0
# for hyp_point_data, ref_point_data in zip(hyp_points, ref_points):
# # shape is people, frames, xyz
# # NOTE: assumes only one person! # TODO: pytest test checking this.
# assert hyp_point_data.shape[0] == 1, f"{self} metric expects only one person. Hyp shape given: {hyp_point_data.shape}"
# assert ref_point_data.shape[0] == 1, f"{self} metric expects only one person. Reference shape given: {ref_point_data.shape}"
# hyp_point_trajectory = hyp_point_data[0]
# ref_point_trajectory = ref_point_data[0]
# joint_trajectory_error = self.average_position_error(hyp_point_trajectory, ref_point_trajectory)
# total_error += joint_trajectory_error

# average_position_error = total_error/point_count
# return average_position_error

def trajectory_pair_distance_function(self, hyp_trajectory, ref_trajectory) -> float:
assert len(hyp_trajectory) == len(ref_trajectory)
return self.average_position_error(hyp_trajectory, ref_trajectory)

def average_position_error(self, trajectory1, trajectory2):
# point_coordinate_count = trajectory1.shape[-1]
# if len(trajectory1) < len(trajectory2):
# diff = len(trajectory2) - len(trajectory1)
# trajectory1 = np.concatenate((trajectory1, np.zeros((diff, point_coordinate_count))))
# elif len(trajectory2) < len(trajectory1):
# trajectory2 = np.concatenate((trajectory2, np.zeros((len(trajectory1) - len(trajectory2), point_coordinate_count))))
pose1_mask = np.ma.getmask(trajectory1)
pose2_mask = np.ma.getmask(trajectory2)
trajectory1[pose1_mask] = 0
trajectory1[pose2_mask] = 0
trajectory2[pose1_mask] = 0
trajectory2[pose2_mask] = 0
sq_error = np.power(trajectory1 - trajectory2, 2).sum(-1)
return np.sqrt(sq_error).mean()
55 changes: 55 additions & 0 deletions pose_evaluation/metrics/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,59 @@
# pylint: disable=undefined-variable
from tqdm import tqdm
from typing import Any

class MetricSignature:
"""Represents reproducibility signatures for metrics. Inspired by sacreBLEU
"""
def __init__(self, args: dict):

self._abbreviated = {
"name":"n",
"higher_is_better":"hb"
}

self.signature_info = {
"name": args.get("name", None),
"higher_is_better": args.get("higher_is_better", None)
}

def update(self, key: str, value: Any):
self.signature_info[key] = value

def format(self, short: bool = False) -> str:
pairs = []
keys = list(self.signature_info.keys())
for name in keys:
value = self.signature_info[name]
if value is not None:
# Check for nested signature objects
if hasattr(value, "get_signature"):
# Wrap nested signatures in brackets
nested_signature = value.get_signature().format(short=short)
value = f"{{{nested_signature}}}"
if isinstance(value, bool):
# Replace True/False with yes/no
value = "yes" if value else "no"
final_name = self._abbreviated[name] if short else name
pairs.append(f"{final_name}:{value}")

return "|".join(pairs)

def __str__(self):
return self.format()

def __repr__(self):
return self.format()


class SignatureMixin:
def get_signature(self) -> str:
raise NotImplementedError("Components must implement `get_signature`.")

class BaseMetric[T]:
"""Base class for all metrics."""
# Each metric should define its Signature class' name here
_SIGNATURE_TYPE = MetricSignature

def __init__(self, name: str, higher_is_better: bool = True):
self.name = name
Expand Down Expand Up @@ -38,3 +88,8 @@ def score_all(self, hypotheses: list[T], references: list[T], progress_bar=True)

def __str__(self):
return self.name

def get_signature(self) -> MetricSignature:
return self._SIGNATURE_TYPE(self.__dict__)


77 changes: 75 additions & 2 deletions pose_evaluation/metrics/base_pose_metric.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,78 @@
from pose_format import Pose
from typing import Literal, Iterable, Tuple, List, cast, get_args, Callable, Union, TYPE_CHECKING
from pose_evaluation.metrics.pose_processors import PoseProcessor, NormalizePosesProcessor, RemoveLegsPosesProcessor, HideLowConfProcessor, ZeroPadShorterPosesProcessor, ReducePosesToCommonComponentsProcessor, RemoveWorldLandmarksProcessor, RemoveComponentsProcessor

from pose_evaluation.metrics.base import BaseMetric, MetricSignature

MismatchedComponentsStrategy = Literal["reduce"]
class PoseMetricSignature(MetricSignature):

def __init__(self, args: dict):
super().__init__(args)

self._abbreviated.update(
{
"pose_preprocessers":"pre"
}
)


pose_preprocessors = args.get('pose_preprocessers', None)
prep_string = ""
if pose_preprocessors is not None:
prep_string = "{" + "|".join([f"{prep}" for prep in pose_preprocessors]) + "}"


self.signature_info.update(
{
'pose_preprocessers': prep_string if pose_preprocessors else None
}
)




class PoseMetric(BaseMetric[Pose]):

_SIGNATURE_TYPE = PoseMetricSignature

def __init__(self, name: str="PoseMetric", higher_is_better: bool = True,
pose_preprocessors: Union[None, List[PoseProcessor]] = None,
normalize_poses = True,
reduce_poses_to_common_points = True,
remove_legs = True,
remove_world_landmarks = True,
):

super().__init__(name, higher_is_better)
if pose_preprocessors is None:
self.pose_preprocessers = [
]
else:
self.pose_preprocessers = pose_preprocessors



if normalize_poses:
self.pose_preprocessers.append(NormalizePosesProcessor())
if reduce_poses_to_common_points:
self.pose_preprocessers.append(ReducePosesToCommonComponentsProcessor())
if remove_legs:
self.pose_preprocessers.append(RemoveLegsPosesProcessor())
if remove_world_landmarks:
self.pose_preprocessers.append(RemoveWorldLandmarksProcessor())



def score(self, hypothesis: Pose, reference: Pose) -> float:
hypothesis, reference = self.preprocess_poses([hypothesis, reference])
return self.score(hypothesis, reference)

def preprocess_poses(self, poses:List[Pose])->List[Pose]:
for preprocessor in self.pose_preprocessers:
preprocessor = cast(PoseProcessor, preprocessor)
poses = preprocessor.process_poses(poses)
return poses
# self.set_coordinate_point_distance_function(coordinate_point_distance_kind)

from pose_evaluation.metrics.base import BaseMetric

PoseMetric = BaseMetric[Pose]
Loading
Loading