From 63ad217f832d8a0d56e78f20ec637f66854622ec Mon Sep 17 00:00:00 2001 From: cleong110 Date: Fri, 8 Mar 2024 11:42:08 -0500 Subject: [PATCH 01/13] CDL: minor doc typo fix --- docs/specs/v0.1.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/docs/specs/v0.1.md b/docs/specs/v0.1.md index b3ea36b..32b2be1 100644 --- a/docs/specs/v0.1.md +++ b/docs/specs/v0.1.md @@ -24,10 +24,16 @@ \[`unsigned short` Green] \[`unsigned short` Blue] + +[Testing comment syntax from https://stackoverflow.com/a/32190021]: # + # Body -\[`unsined short` FPS] -\[`unsined short` Number of frames] # THIS IS A PROBLEM -\[`unsined short` Number of people] +\[`unsigned short` FPS] + + +[THIS IS A PROBLEM]: # +\[`unsigned short` Number of frames] +\[`unsigned short` Number of people] ## For every frame #### For every person: From 6d18ed642f883ab128e783915193f27900eef975 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:39:31 -0500 Subject: [PATCH 02/13] Undoing some changes that got mixed in --- docs/specs/v0.1.md | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/docs/specs/v0.1.md b/docs/specs/v0.1.md index a75b0a2..2ee3aaa 100644 --- a/docs/specs/v0.1.md +++ b/docs/specs/v0.1.md @@ -26,12 +26,9 @@ # Body -\[`unsigned short` FPS] - - -[THIS IS A PROBLEM]: # -\[`unsigned short` Number of frames] -\[`unsigned short` Number of people] +\[`unsined short` FPS] +\[`unsined short` Number of frames] # THIS IS A PROBLEM +\[`unsined short` Number of people] ## For every frame #### For every person: From 28341e3958a703641540ea21b6d1289db8af77b9 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Tue, 11 Feb 2025 12:14:48 -0500 Subject: [PATCH 03/13] deepcopy for header and components also --- src/python/pose_format/pose.py | 2 +- src/python/pose_format/pose_header.py | 21 +++++++++++++++++++-- src/python/tests/pose_test.py | 16 ++++++++++++---- 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/python/pose_format/pose.py b/src/python/pose_format/pose.py index 5031f64..d0480f4 100644 --- a/src/python/pose_format/pose.py +++ b/src/python/pose_format/pose.py @@ -278,7 +278,7 @@ def get_components(self, components: List[str], points: Union[Dict[str, List[str def copy(self): - return self.__class__(self.header, self.body.copy()) + return self.__class__(self.header.copy(), self.body.copy()) def bbox(self): """ diff --git a/src/python/pose_format/pose_header.py b/src/python/pose_format/pose_header.py index 225668c..dde6f60 100644 --- a/src/python/pose_format/pose_header.py +++ b/src/python/pose_format/pose_header.py @@ -65,8 +65,15 @@ def __init__(self, name: str, points: List[str], limbs: List[Tuple[int, int]], c self.relative_limbs = self.get_relative_limbs() + def copy(self) -> 'PoseHeaderComponent': + return PoseHeaderComponent(name = self.name, + points = self.points, + limbs= self.limbs, + colors=self.colors, + point_format = self.format) + @staticmethod - def read(version: float, reader: BufferReader): + def read(version: float, reader: BufferReader) -> 'PoseHeaderComponent': """ Reads pose header dimensions from reader (BufferReader). @@ -182,8 +189,11 @@ def __init__(self, width: int, height: int, depth: int = 0, *args): self.height = math.ceil(height) self.depth = math.ceil(depth) + def copy(self) -> 'PoseHeaderDimensions': + return self.__class__(self.width, self.height, self.depth) + @staticmethod - def read(version: float, reader: BufferReader): + def read(version: float, reader: BufferReader) -> 'PoseHeaderDimensions': """ Reads and returns a PoseHeaderDimensions object from a buffer reader. @@ -293,6 +303,13 @@ def __init__(self, self.components = components self.is_bbox = is_bbox + def copy(self) -> 'PoseHeader': + return PoseHeader(version=self.version, + dimensions=self.dimensions.copy(), + components=[c.copy() for c in self.components], + is_bbox=self.is_bbox + ) + @staticmethod def read(reader: BufferReader) -> 'PoseHeader': """ diff --git a/src/python/tests/pose_test.py b/src/python/tests/pose_test.py index 897a0dc..55419c1 100644 --- a/src/python/tests/pose_test.py +++ b/src/python/tests/pose_test.py @@ -479,7 +479,7 @@ def create_pose_and_frame_dropout_uniform(example: tf.Tensor) -> tf.Tensor: def test_pose_tf_posebody_copy_creates_deepcopy(self): pose = _get_random_pose_object_with_tf_posebody(num_keypoints=5) - self.assertIsInstance(pose.body, TensorflowPoseBody) + self.assertIsInstance(pose.body, TensorflowPoseBody) self.assertIsInstance(pose.body.data, TensorflowMaskedTensor) pose_copy = pose.copy() @@ -488,7 +488,9 @@ def test_pose_tf_posebody_copy_creates_deepcopy(self): # Check that pose and pose_copy are not the same object self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") - + self.assertNotEqual(pose.header, pose_copy.header, "headers should be new objects as well") + self.assertNotEqual(pose.header.components, pose_copy.header.components, "components should be new objects as well") + # Ensure the data tensors are equal but independent self.assertTrue(tf.reduce_all(pose.body.data == pose_copy.body.data), "Copy's data should match original") @@ -499,6 +501,9 @@ def test_pose_tf_posebody_copy_creates_deepcopy(self): # Create another copy and ensure it matches the first copy pose = pose_copy.copy() + self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") + self.assertNotEqual(pose.header, pose_copy.header, "headers should be new objects as well") + self.assertNotEqual(pose.header.components, pose_copy.header.components, "Components should be new objects as well") self.assertTrue(tf.reduce_all(pose.body.data == pose_copy.body.data), "Copy's data should match original again") @@ -560,8 +565,9 @@ def test_pose_numpy_posebody_copy_creates_deepcopy(self): pose = _get_random_pose_object_with_numpy_posebody(num_keypoints=5, frames_min=3) pose_copy = pose.copy() - self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") + self.assertNotEqual(pose.header, pose_copy.header, "headers should be new objects as well") + self.assertNotEqual(pose.header.components, pose_copy.header.components, "components should be new objects as well") self.assertTrue(np.array_equal(pose.body.data, pose_copy.body.data), "Copy's data should match original") @@ -599,7 +605,9 @@ def test_pose_torch_posebody_copy_creates_deepcopy(self): self.assertIsInstance(pose_copy.body, TorchPoseBody) self.assertIsInstance(pose_copy.body.data, TorchMaskedTensor) - self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") + self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") + self.assertNotEqual(pose.header, pose_copy.header, "headers should be new objects as well") + self.assertNotEqual(pose.header.components, pose_copy.header.components, "components should be new objects as well") self.assertTrue(pose.body.data.tensor.equal(pose_copy.body.data.tensor), "Copy's data should match original") self.assertTrue(pose.body.data.mask.equal(pose_copy.body.data.mask), "Copy's mask should match original") From 11fe8eb4b421067cd358c9a174dc6d1f22787a2c Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Thu, 13 Feb 2025 16:14:18 -0500 Subject: [PATCH 04/13] pose_remove_legs utility for removing the leg points from mediapipe and openpose --- src/python/pose_format/utils/generic.py | 64 +++++++++++++++--- src/python/pose_format/utils/generic_test.py | 70 +++++++++++++++----- 2 files changed, 109 insertions(+), 25 deletions(-) diff --git a/src/python/pose_format/utils/generic.py b/src/python/pose_format/utils/generic.py index 940d43a..7da8630 100644 --- a/src/python/pose_format/utils/generic.py +++ b/src/python/pose_format/utils/generic.py @@ -1,8 +1,7 @@ -from pathlib import Path from typing import Tuple, Literal, List, Union import copy import numpy as np -from numpy import ma +import numpy.ma as ma from pose_format.pose import Pose from pose_format.numpy import NumPyPoseBody from pose_format.pose_header import PoseHeader, PoseHeaderDimensions, PoseHeaderComponent, PoseNormalizationInfo @@ -62,6 +61,43 @@ def normalize_pose_size(pose: Pose, target_width: int = 512): pose.header.dimensions.height = pose.header.dimensions.width = target_width +def pose_remove_legs(pose: Pose) ->Pose: + """Remove (not _hide_) legs. Also does the hip + """ + known_pose_format = detect_known_pose_format(pose) + + if known_pose_format == "holistic": + mediapipe_point_names = ["KNEE", "ANKLE", "HEEL", "HIP", "FOOT_INDEX"] + mediapipe_sides = ["LEFT", "RIGHT"] + point_names_to_remove = [ + side + "_" + name + for name in mediapipe_point_names + for side in mediapipe_sides + ] + points_to_remove_dict ={ + "POSE_LANDMARKS": point_names_to_remove, + "POSE_WORLD_LANDMARKS": point_names_to_remove, + } + + elif known_pose_format == 'openpose': + openpose_point_names = ["Hip", "Knee", "Ankle", "BigToe", "SmallToe", "Heel"] + openpose_sides = ["L", "R", "Mid"] + point_names_to_remove = [ + side + name + for name in openpose_point_names + for side in openpose_sides + ] + points_to_remove_dict = { + "pose_keypoints_2d": point_names_to_remove + } + else: + raise NotImplementedError( + f"Unsupported pose header schema {known_pose_format} for {pose_remove_legs.__name__}: {pose.header}" + ) + + pose = pose.remove_components([], points_to_remove_dict) + return pose + def pose_hide_legs(pose: Pose): known_pose_format = detect_known_pose_format(pose) if known_pose_format == "holistic": @@ -109,13 +145,17 @@ def hands_indexes(pose_header: PoseHeader)-> List[int]: known_pose_format = detect_known_pose_format(pose_header) if known_pose_format == "holistic": return [ + # pylint: disable=protected-access pose_header._get_point_index("LEFT_HAND_LANDMARKS", "MIDDLE_FINGER_MCP"), + # pylint: disable=protected-access pose_header._get_point_index("RIGHT_HAND_LANDMARKS", "MIDDLE_FINGER_MCP"), ] if known_pose_format == "openpose": return [ + # pylint: disable=protected-access pose_header._get_point_index("hand_left_keypoints_2d", "M_CMC"), + # pylint: disable=protected-access pose_header._get_point_index("hand_right_keypoints_2d", "M_CMC"), ] raise NotImplementedError( @@ -148,12 +188,12 @@ def hands_components(pose_header: PoseHeader)-> Tuple[Tuple[str, str], Tuple[str def normalize_component_3d(pose, component_name: str, plane: Tuple[str, str, str], line: Tuple[str, str]): hand_pose = pose.get_components([component_name]) plane_info = hand_pose.header.normalization_info( - p1=(component_name, plane[0]), - p2=(component_name, plane[1]), + p1=(component_name, plane[0]), + p2=(component_name, plane[1]), p3=(component_name, plane[2]) ) line_info = hand_pose.header.normalization_info( - p1=(component_name, line[0]), + p1=(component_name, line[0]), p2=(component_name, line[1]) ) @@ -176,10 +216,11 @@ def normalize_hands_3d(pose: Pose, left_hand=True, right_hand=True): def get_standard_components_for_known_format(known_pose_format: KnownPoseFormat) -> List[PoseHeaderComponent]: if known_pose_format == "holistic": try: + # pylint: disable=import-outside-toplevel import pose_format.utils.holistic as holistic_utils return holistic_utils.holistic_components() except ImportError as e: - raise e + raise e if known_pose_format == "openpose": return OpenPose_Components if known_pose_format == "openpose_135": @@ -191,7 +232,7 @@ def get_standard_components_for_known_format(known_pose_format: KnownPoseFormat) def fake_pose(num_frames: int, fps: int=25, components: Union[List[PoseHeaderComponent],None]=None)->Pose: if components is None: components = copy.deepcopy(OpenPose_Components) # fixes W0102, dangerous default value - + if components[0].format == "XYZC": dimensions = PoseHeaderDimensions(width=1, height=1, depth=1) elif components[0].format == "XYC": @@ -204,7 +245,6 @@ def fake_pose(num_frames: int, fps: int=25, components: Union[List[PoseHeaderCom data = np.random.randn(num_frames, 1, total_points, header.num_dims()) confidence = np.random.randn(num_frames, 1, total_points) masked_data = ma.masked_array(data) - body = NumPyPoseBody(fps=int(fps), data=masked_data, confidence=confidence) @@ -214,8 +254,10 @@ def fake_pose(num_frames: int, fps: int=25, components: Union[List[PoseHeaderCom def get_hand_wrist_index(pose: Pose, hand: str)-> int: known_pose_format = detect_known_pose_format(pose) if known_pose_format == "holistic": + # pylint: disable=protected-access return pose.header._get_point_index(f"{hand.upper()}_HAND_LANDMARKS", "WRIST") if known_pose_format == "openpose": + # pylint: disable=protected-access return pose.header._get_point_index(f"hand_{hand.lower()}_keypoints_2d", "BASE") raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {get_hand_wrist_index.__name__}: {pose.header}" @@ -225,8 +267,10 @@ def get_hand_wrist_index(pose: Pose, hand: str)-> int: def get_body_hand_wrist_index(pose: Pose, hand: str)-> int: known_pose_format = detect_known_pose_format(pose) if known_pose_format == "holistic": + # pylint: disable=protected-access return pose.header._get_point_index("POSE_LANDMARKS", f"{hand.upper()}_WRIST") if known_pose_format == "openpose": + # pylint: disable=protected-access return pose.header._get_point_index("pose_keypoints_2d", f"{hand.upper()[0]}Wrist") raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {get_body_hand_wrist_index.__name__}: {pose.header}" @@ -244,7 +288,7 @@ def correct_wrist(pose: Pose, hand: str) -> Pose: body_wrist_conf = pose.body.confidence[:, :, body_wrist_index] point_coordinate_count = wrist.shape[-1] - stacked_conf = np.stack([wrist_conf] * point_coordinate_count, axis=-1) + stacked_conf = np.stack([wrist_conf] * point_coordinate_count, axis=-1) new_wrist_data = ma.where(stacked_conf == 0, body_wrist, wrist) new_wrist_conf = ma.where(wrist_conf == 0, body_wrist_conf, wrist_conf) @@ -263,7 +307,7 @@ def reduce_holistic(pose: Pose) -> Pose: known_pose_format = detect_known_pose_format(pose) if known_pose_format != "holistic": return pose - + # pylint: disable=pointless-string-statement """ # from mediapipe.python.solutions.face_mesh_connections import FACEMESH_CONTOURS # points_set = set([p for p_tup in list(FACEMESH_CONTOURS) for p in p_tup]) diff --git a/src/python/pose_format/utils/generic_test.py b/src/python/pose_format/utils/generic_test.py index 461f55e..f396afb 100644 --- a/src/python/pose_format/utils/generic_test.py +++ b/src/python/pose_format/utils/generic_test.py @@ -10,6 +10,7 @@ get_standard_components_for_known_format, KnownPoseFormat, pose_hide_legs, + pose_remove_legs, pose_shoulders, hands_indexes, normalize_pose_size, @@ -60,7 +61,6 @@ def test_get_component_names(fake_poses: List[Pose], known_pose_format: KnownPos @pytest.mark.parametrize("fake_poses", list(get_args(KnownPoseFormat)), indirect=["fake_poses"]) def test_pose_hide_legs(fake_poses: List[Pose]): for pose in fake_poses: - orig_nonzeros_count = np.count_nonzero(pose.body.data) detected_format = detect_known_pose_format(pose) @@ -120,12 +120,12 @@ def test_get_hand_wrist_index(fake_poses: List[Pose]): detected_format = detect_known_pose_format(pose) for hand in ["LEFT", "RIGHT"]: if detected_format == "openpose_135": - with pytest.raises(NotImplementedError, match="Unsupported pose header schema"): - index = get_hand_wrist_index(pose, hand) + with pytest.raises(NotImplementedError, match="Unsupported pose header schema"): + _ = get_hand_wrist_index(pose, hand) else: - index = get_hand_wrist_index(pose, hand) + _ = get_hand_wrist_index(pose, hand) - # TODO: what are the expected values? + # TODO: what are the expected values? @pytest.mark.parametrize("fake_poses", TEST_POSE_FORMATS, indirect=["fake_poses"]) @@ -135,10 +135,10 @@ def test_get_body_hand_wrist_index(fake_poses: List[Pose]): detected_format = detect_known_pose_format(pose) if detected_format == "openpose_135": with pytest.raises(NotImplementedError, match="Unsupported pose header schema"): - index = get_body_hand_wrist_index(pose, hand) - # TODO: what are the expected values? - else: - index = get_body_hand_wrist_index(pose, hand) + _ = get_body_hand_wrist_index(pose, hand) + # TODO: what are the expected values? + else: + _ = get_body_hand_wrist_index(pose, hand) @@ -153,7 +153,7 @@ def test_correct_wrists(fake_poses: List[Pose]): else: corrected_pose = correct_wrists(pose) assert corrected_pose != pose - assert np.array_equal(corrected_pose.body.data, pose.body.data) is False + assert np.array_equal(corrected_pose.body.data, pose.body.data) is False @pytest.mark.parametrize("fake_poses", ["holistic"], indirect=["fake_poses"]) def test_remove_one_point_and_one_component(fake_poses: List[Pose]): @@ -182,7 +182,50 @@ def test_remove_one_point_and_one_component(fake_poses: List[Pose]): assert component_to_drop not in new_component_names - assert point_to_drop not in new_points_dict["POSE_LANDMARKS"] + assert point_to_drop not in new_points_dict["POSE_LANDMARKS"] + +@pytest.mark.parametrize("fake_poses", TEST_POSE_FORMATS, indirect=["fake_poses"]) +def test_pose_remove_legs(fake_poses: List[Pose]): + for pose in fake_poses: + known_pose_format = detect_known_pose_format(pose) + if known_pose_format == "holistic": + points_that_should_be_removed = ["LEFT_KNEE", "LEFT_HEEL", "LEFT_FOOT", "LEFT_TOE", "LEFT_FOOT_INDEX", + "RIGHT_KNEE", "RIGHT_HEEL", "RIGHT_FOOT", "RIGHT_TOE", "RIGHT_FOOT_INDEX",] + c_names = [c.name for c in pose.header.components] + assert "POSE_LANDMARKS" in c_names + pose_landmarks_index = c_names.index("POSE_LANDMARKS") + assert "LEFT_KNEE" in pose.header.components[pose_landmarks_index].points + + + pose_with_legs_removed = pose_remove_legs(pose) + assert pose_with_legs_removed != pose + new_c_names = [c.name for c in pose_with_legs_removed.header.components] + assert "POSE_LANDMARKS" in new_c_names + + for component in pose_with_legs_removed.header.components: + point_names = [point.upper() for point in component.points] + for point_name in point_names: + for point_that_should_be_hidden in points_that_should_be_removed: + assert point_that_should_be_hidden not in point_name, f"{component.name}: {point_names}" + + elif known_pose_format == "openpose": + c_names = [c.name for c in pose.header.components] + points_that_should_be_removed = ['LHip', 'RHip', 'MidHip', + 'LKnee', 'RKnee', + 'LAnkle', 'RAnkle', + 'LBigToe', 'RBigToe', + 'LSmallToe', 'RSmallToe', + 'LHeel', 'RHeel'] + component_index = c_names.index("pose_keypoints_2d") + pose_with_legs_removed = pose_remove_legs(pose) + + for point_name in points_that_should_be_removed: + assert point_name not in pose_with_legs_removed.header.components[component_index].points + assert point_name in pose.header.components[component_index].points + else: + with pytest.raises(NotImplementedError, match="Unsupported pose header schema"): + pose_remove_legs(pose) + @pytest.mark.parametrize("fake_poses", TEST_POSE_FORMATS, indirect=["fake_poses"]) @@ -204,7 +247,7 @@ def test_fake_pose(known_pose_format: KnownPoseFormat): for frame_count in [1, 10, 100]: for fps in [1, 15, 25, 100]: standard_components = get_standard_components_for_known_format(known_pose_format) - + pose = fake_pose(frame_count, fps=fps, components=standard_components) point_formats = [c.format for c in pose.header.components] data_dimension_expected = 0 @@ -215,7 +258,6 @@ def test_fake_pose(known_pose_format: KnownPoseFormat): assert point_format == point_formats[0] data_dimension_expected = len(point_formats[0]) - 1 - detected_format = detect_known_pose_format(pose) @@ -231,5 +273,3 @@ def test_fake_pose(known_pose_format: KnownPoseFormat): assert pose.body.data.shape == (frame_count, 1, pose.header.total_points(), data_dimension_expected) assert pose.body.data.shape[0] == frame_count assert pose.header.num_dims() == pose.body.data.shape[-1] - - poses = [fake_pose(25) for _ in range(5)] From 3e30e4d9fc1fec0766098da2485d5b541d3125a7 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 09:25:33 -0500 Subject: [PATCH 05/13] PR requested changes: removed copy() functions, added get_index --- src/python/pose_format/pose.py | 2 +- src/python/pose_format/pose_header.py | 31 ++++++------------- src/python/pose_format/utils/generic.py | 30 ++++++------------ .../pose_format/utils/pose_converter.py | 4 +-- src/python/tests/pose_test.py | 4 --- 5 files changed, 22 insertions(+), 49 deletions(-) diff --git a/src/python/pose_format/pose.py b/src/python/pose_format/pose.py index 45fd723..084c5fd 100644 --- a/src/python/pose_format/pose.py +++ b/src/python/pose_format/pose.py @@ -278,7 +278,7 @@ def get_components(self, components: List[str], points: Union[Dict[str, List[str def copy(self): - return self.__class__(self.header.copy(), self.body.copy()) + return self.__class__(self.header, self.body.copy()) def bbox(self): """ diff --git a/src/python/pose_format/pose_header.py b/src/python/pose_format/pose_header.py index dde6f60..8b31fbb 100644 --- a/src/python/pose_format/pose_header.py +++ b/src/python/pose_format/pose_header.py @@ -1,7 +1,7 @@ import hashlib import math import struct -from typing import BinaryIO, List, Tuple +from typing import BinaryIO, List, Tuple, Optional, Union from .utils.reader import BufferReader, ConstStructs @@ -21,7 +21,7 @@ class PoseNormalizationInfo: Third pose value. Defaults to None. """ - def __init__(self, p1: int, p2: int, p3: int = None): + def __init__(self, p1: int, p2: int, p3: Optional[int] = None): """Initialize a PoseNormalizationInfo instance.""" self.p1 = p1 self.p2 = p2 @@ -65,13 +65,6 @@ def __init__(self, name: str, points: List[str], limbs: List[Tuple[int, int]], c self.relative_limbs = self.get_relative_limbs() - def copy(self) -> 'PoseHeaderComponent': - return PoseHeaderComponent(name = self.name, - points = self.points, - limbs= self.limbs, - colors=self.colors, - point_format = self.format) - @staticmethod def read(version: float, reader: BufferReader) -> 'PoseHeaderComponent': """ @@ -189,9 +182,6 @@ def __init__(self, width: int, height: int, depth: int = 0, *args): self.height = math.ceil(height) self.depth = math.ceil(depth) - def copy(self) -> 'PoseHeaderDimensions': - return self.__class__(self.width, self.height, self.depth) - @staticmethod def read(version: float, reader: BufferReader) -> 'PoseHeaderDimensions': """ @@ -303,12 +293,6 @@ def __init__(self, self.components = components self.is_bbox = is_bbox - def copy(self) -> 'PoseHeader': - return PoseHeader(version=self.version, - dimensions=self.dimensions.copy(), - components=[c.copy() for c in self.components], - is_bbox=self.is_bbox - ) @staticmethod def read(reader: BufferReader) -> 'PoseHeader': @@ -393,9 +377,12 @@ def _get_point_index(self, component: str, point: str): raise ValueError("Couldn't find component") + def get_point_index(self, component: str, point: str)-> int: + return self._get_point_index(component, point) + def normalization_info(self, p1: Tuple[str, str], p2: Tuple[str, str], p3: Tuple[str, str] = None): """ - Normalizates info for given points. + Normalization info for given points. Parameters ---------- @@ -411,9 +398,9 @@ def normalization_info(self, p1: Tuple[str, str], p2: Tuple[str, str], p3: Tuple PoseNormalizationInfo Normalization information for the points. """ - return PoseNormalizationInfo(p1=self._get_point_index(*p1), - p2=self._get_point_index(*p2), - p3=None if p3 is None else self._get_point_index(*p3)) + return PoseNormalizationInfo(p1=self.get_point_index(*p1), + p2=self.get_point_index(*p2), + p3=None if p3 is None else self.get_point_index(*p3)) def bbox(self): """ diff --git a/src/python/pose_format/utils/generic.py b/src/python/pose_format/utils/generic.py index 7da8630..a537787 100644 --- a/src/python/pose_format/utils/generic.py +++ b/src/python/pose_format/utils/generic.py @@ -102,9 +102,8 @@ def pose_hide_legs(pose: Pose): known_pose_format = detect_known_pose_format(pose) if known_pose_format == "holistic": point_names = ["KNEE", "ANKLE", "HEEL", "FOOT_INDEX"] - # pylint: disable=protected-access points = [ - pose.header._get_point_index("POSE_LANDMARKS", side + "_" + n) + pose.header.get_point_index("POSE_LANDMARKS", side + "_" + n) for n in point_names for side in ["LEFT", "RIGHT"] ] @@ -112,9 +111,8 @@ def pose_hide_legs(pose: Pose): pose.body.confidence[:, :, points] = 0 elif known_pose_format == "openpose": point_names = ["Hip", "Knee", "Ankle", "BigToe", "SmallToe", "Heel"] - # pylint: disable=protected-access points = [ - pose.header._get_point_index("pose_keypoints_2d", side + n) for n in point_names for side in ["L", "R"] + pose.header.get_point_index("pose_keypoints_2d", side + n) for n in point_names for side in ["L", "R"] ] pose.body.data[:, :, points, :] = 0 pose.body.confidence[:, :, points] = 0 @@ -145,18 +143,14 @@ def hands_indexes(pose_header: PoseHeader)-> List[int]: known_pose_format = detect_known_pose_format(pose_header) if known_pose_format == "holistic": return [ - # pylint: disable=protected-access - pose_header._get_point_index("LEFT_HAND_LANDMARKS", "MIDDLE_FINGER_MCP"), - # pylint: disable=protected-access - pose_header._get_point_index("RIGHT_HAND_LANDMARKS", "MIDDLE_FINGER_MCP"), + pose_header.get_point_index("LEFT_HAND_LANDMARKS", "MIDDLE_FINGER_MCP"), + pose_header.get_point_index("RIGHT_HAND_LANDMARKS", "MIDDLE_FINGER_MCP"), ] if known_pose_format == "openpose": return [ - # pylint: disable=protected-access - pose_header._get_point_index("hand_left_keypoints_2d", "M_CMC"), - # pylint: disable=protected-access - pose_header._get_point_index("hand_right_keypoints_2d", "M_CMC"), + pose_header.get_point_index("hand_left_keypoints_2d", "M_CMC"), + pose_header.get_point_index("hand_right_keypoints_2d", "M_CMC"), ] raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {hands_indexes.__name__}: {pose_header}" @@ -254,11 +248,9 @@ def fake_pose(num_frames: int, fps: int=25, components: Union[List[PoseHeaderCom def get_hand_wrist_index(pose: Pose, hand: str)-> int: known_pose_format = detect_known_pose_format(pose) if known_pose_format == "holistic": - # pylint: disable=protected-access - return pose.header._get_point_index(f"{hand.upper()}_HAND_LANDMARKS", "WRIST") + return pose.header.get_point_index(f"{hand.upper()}_HAND_LANDMARKS", "WRIST") if known_pose_format == "openpose": - # pylint: disable=protected-access - return pose.header._get_point_index(f"hand_{hand.lower()}_keypoints_2d", "BASE") + return pose.header.get_point_index(f"hand_{hand.lower()}_keypoints_2d", "BASE") raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {get_hand_wrist_index.__name__}: {pose.header}" ) @@ -267,11 +259,9 @@ def get_hand_wrist_index(pose: Pose, hand: str)-> int: def get_body_hand_wrist_index(pose: Pose, hand: str)-> int: known_pose_format = detect_known_pose_format(pose) if known_pose_format == "holistic": - # pylint: disable=protected-access - return pose.header._get_point_index("POSE_LANDMARKS", f"{hand.upper()}_WRIST") + return pose.header.get_point_index("POSE_LANDMARKS", f"{hand.upper()}_WRIST") if known_pose_format == "openpose": - # pylint: disable=protected-access - return pose.header._get_point_index("pose_keypoints_2d", f"{hand.upper()[0]}Wrist") + return pose.header.get_point_index("pose_keypoints_2d", f"{hand.upper()[0]}Wrist") raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {get_body_hand_wrist_index.__name__}: {pose.header}" ) diff --git a/src/python/pose_format/utils/pose_converter.py b/src/python/pose_format/utils/pose_converter.py index 04397f1..6912a3c 100644 --- a/src/python/pose_format/utils/pose_converter.py +++ b/src/python/pose_format/utils/pose_converter.py @@ -222,8 +222,8 @@ def convert_pose(pose: Pose, pose_components: List[PoseHeaderComponent]) -> Pose for (c1, p1), (c2, p2) in mapping.items(): p2 = tuple([p2]) if isinstance(p2, str) else p2 try: - p2s = [pose.header._get_point_index(c2, p) for p in list(p2)] - p1_index = pose_header._get_point_index(c1, p1) + p2s = [pose.header.get_point_index(c2, p) for p in list(p2)] + p1_index = pose_header.get_point_index(c1, p1) data[:, :, p1_index, :dims] = pose.body.data[:, :, p2s, :dims].mean(axis=2) conf[:, :, p1_index] = pose.body.confidence[:, :, p2s].mean(axis=2) except Exception as e: diff --git a/src/python/tests/pose_test.py b/src/python/tests/pose_test.py index fa30cbc..0dfb801 100644 --- a/src/python/tests/pose_test.py +++ b/src/python/tests/pose_test.py @@ -608,8 +608,6 @@ def test_pose_tf_posebody_copy_creates_deepcopy(self): # Create another copy and ensure it matches the first copy pose = pose_copy.copy() self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") - self.assertNotEqual(pose.header, pose_copy.header, "headers should be new objects as well") - self.assertNotEqual(pose.header.components, pose_copy.header.components, "Components should be new objects as well") self.assertTrue(tf.reduce_all(pose.body.data == pose_copy.body.data), "Copy's data should match original again") @@ -680,8 +678,6 @@ def test_pose_numpy_posebody_copy_creates_deepcopy(self): pose_copy = pose.copy() self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") - self.assertNotEqual(pose.header, pose_copy.header, "headers should be new objects as well") - self.assertNotEqual(pose.header.components, pose_copy.header.components, "components should be new objects as well") self.assertTrue(np.array_equal(pose.body.data, pose_copy.body.data), "Copy's data should match original") From 10043ad2210f6db09610780bdd12c5fd38f02280 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 10:01:17 -0500 Subject: [PATCH 06/13] combine hide_legs and remove_legs, and fix error --- src/python/pose_format/pose_header.py | 8 +- src/python/pose_format/utils/conftest.py | 2 +- .../pose_format/utils/directorybackup.py | 192 ++++++++++++++++++ src/python/pose_format/utils/generic.py | 76 +++---- src/python/pose_format/utils/generic_test.py | 10 +- 5 files changed, 233 insertions(+), 55 deletions(-) create mode 100644 src/python/pose_format/utils/directorybackup.py diff --git a/src/python/pose_format/pose_header.py b/src/python/pose_format/pose_header.py index 8b31fbb..54db997 100644 --- a/src/python/pose_format/pose_header.py +++ b/src/python/pose_format/pose_header.py @@ -377,8 +377,12 @@ def _get_point_index(self, component: str, point: str): raise ValueError("Couldn't find component") - def get_point_index(self, component: str, point: str)-> int: - return self._get_point_index(component, point) + def get_point_index(self, component: str, point: str)-> Union[int, None]: + try: + return self._get_point_index(component, point) + except ValueError: + # if component or point doesn't exist. + return None def normalization_info(self, p1: Tuple[str, str], p2: Tuple[str, str], p3: Tuple[str, str] = None): """ diff --git a/src/python/pose_format/utils/conftest.py b/src/python/pose_format/utils/conftest.py index 4f4ae30..3da50ae 100644 --- a/src/python/pose_format/utils/conftest.py +++ b/src/python/pose_format/utils/conftest.py @@ -20,4 +20,4 @@ def fake_poses(request) -> List[Pose]: for i, pose in enumerate(fake_poses_list): for component in pose.header.components: component.name = f"unknown_component_{i}_formerly_{component.name}" - return copy.deepcopy(fake_poses_list) + return [pose.copy() for pose in fake_poses_list] diff --git a/src/python/pose_format/utils/directorybackup.py b/src/python/pose_format/utils/directorybackup.py new file mode 100644 index 0000000..a105e53 --- /dev/null +++ b/src/python/pose_format/utils/directorybackup.py @@ -0,0 +1,192 @@ +import os +import argparse +import logging +from typing import List +from pathlib import Path +from functools import partial +import psutil +from tqdm.contrib.concurrent import process_map +from pose_format.bin.pose_estimation import pose_video, parse_additional_config + + + + + + + +# Note: untested other than .mp4. Support for .webm may have issues: https://github.com/sign-language-processing/pose/pull/126 +SUPPORTED_VIDEO_FORMATS = [".mp4", ".mov", ".avi", ".mkv", ".flv", ".wmv", ".webm"] + + +def find_videos_with_missing_pose_files( + directory: Path, + video_suffixes: List[str] = None, + recursive: bool = False, + keep_video_suffixes: bool = False, +) -> List[Path]: + """ + Finds videos with missing .pose files. + + Parameters + ---------- + directory: Path, + Directory to search for videos in. + video_suffixes: List[str], optional + Suffixes to look for, e.g. [".mp4", ".webm"]. If None, will use _SUPPORTED_VIDEO_FORMATS + recursive: bool, optional + Whether to look for video files recursively, or just the top-level. Defaults to false. + keep_video_suffixes: bool, optional + If true, when checking will append .pose suffix (e.g. foo.mp4->foo.mp4.pose, foo.webm->foo.webm.pose), + If false, will replace it (foo.mp4 becomes foo.pose, and foo.webm ALSO becomes foo.pose). + Default is false, which can cause name collisions. + + Returns + ------- + List[Path] + List of video paths without corresponding .pose files. + """ + + # Prevents the common gotcha with mutable default arg lists: + # https://docs.python-guide.org/writing/gotchas/#mutable-default-arguments + if video_suffixes is None: + video_suffixes = SUPPORTED_VIDEO_FORMATS + + glob_method = getattr(directory, "rglob" if recursive else "glob") + all_files = list(glob_method(f"*")) + video_files = [path for path in all_files if path.suffix in video_suffixes] + pose_files = {path for path in all_files if path.suffix == ".pose"} + + videos_with_missing_pose_files = [] + + for vid_path in video_files: + corresponding_pose = get_corresponding_pose_path(video_path=vid_path, keep_video_suffixes=keep_video_suffixes) + if corresponding_pose not in pose_files: + videos_with_missing_pose_files.append(vid_path) + + return videos_with_missing_pose_files + + +def get_corresponding_pose_path(video_path: Path, keep_video_suffixes: bool = False) -> Path: + """ + Given a video path, and whether to keep the suffix, returns the expected corresponding path with .pose extension. + + Parameters + ---------- + video_path : Path + Path to a video file + keep_video_suffixes : bool, optional + Whether to keep suffix (e.g. foo.mp4 -> foo.mp4.pose) + or replace (foo.mp4->foo.pose). Defaults to replace. + + Returns + ------- + Path + pathlib Path + """ + if keep_video_suffixes: + return video_path.with_name(f"{video_path.name}.pose") + return video_path.with_suffix(".pose") + + +def process_video(keep_video_suffixes: bool, pose_format: str, additional_config: dict, vid_path: Path) -> bool: + print(f'Estimating {vid_path} on CPU {psutil.Process().cpu_num()}') + + try: + pose_path = get_corresponding_pose_path(video_path=vid_path, keep_video_suffixes=keep_video_suffixes) + if pose_path.is_file(): + print(f"Skipping {vid_path}, corresponding .pose file already created.") + else: + # pose_video function expects string, and passes it unchanged to cv2.VideoCapture(input_path) + # if you give cv2.VideoCapture(input_path) a Path it crashes on older versions. + # https://github.com/opencv/opencv/issues/15731 + pose_video(str(vid_path.resolve()), str(pose_path.resolve()), pose_format, additional_config, progress=False) + return True + + except ValueError as e: + print(f"ValueError on {vid_path}") + logging.exception(e) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-f", + "--format", + choices=["mediapipe"], + default="mediapipe", + type=str, + help="type of pose estimation to use", + ) + parser.add_argument( + "-d", + "--directory", + type=Path, + required=True, + help="Directory to search for videos in", + ) + parser.add_argument( + "-r", + "--recursive", + action="store_true", + help="Whether to search for videos recursively", + ) + parser.add_argument( + "--keep-video-suffixes", + action="store_true", + help="Whether to drop the video extension (output for foo.mp4 becomes foo.pose, and foo.webm ALSO becomes foo.pose) or append to it (foo.mp4 becomes foo.mp4.pose, foo.webm output is foo.webm.pose). If there are multiple videos with the same basename but different extensions, this will create a .pose file for each. Otherwise only the first video will be posed.", + ) + parser.add_argument( + "--video-suffixes", + type=str, + choices=SUPPORTED_VIDEO_FORMATS, + default=SUPPORTED_VIDEO_FORMATS, + help="Video extensions to search for. Defaults to searching for all supported.", + ) + parser.add_argument( + "--num-workers", + type=int, + default=1, + help="Number of multiprocessing workers.", + required=False + ) + parser.add_argument( + "--additional-config", + type=str, + help="additional configuration for the pose estimator", + ) + args = parser.parse_args() + + videos_with_missing_pose_files = find_videos_with_missing_pose_files( + args.directory, + video_suffixes=args.video_suffixes, + recursive=args.recursive, + keep_video_suffixes=args.keep_video_suffixes, + ) + + print(f"Found {len(videos_with_missing_pose_files)} videos missing pose files.") + + pose_files_that_will_be_created = {get_corresponding_pose_path(vid_path, args.keep_video_suffixes) for vid_path in videos_with_missing_pose_files} + + if len(pose_files_that_will_be_created) < len(videos_with_missing_pose_files): + continue_input = input( + f"With current naming strategy (without --keep-video-suffixes), name collisions will result in only {len(pose_files_that_will_be_created)} .pose files being created. Continue? [y/n]" + ) + if continue_input.lower() != "y": + print(f"Exiting. To keep video suffixes and avoid collisions, use --keep-video-suffixes") + exit() + + additional_config = parse_additional_config(args.additional_config) + + pose_with_no_errors_count = 0 + + if args.num_workers == 1: + print('Process sequentially ...') + else: + print(f'Multiprocessing with {args.num_workers} workers on {len(os.sched_getaffinity(0))} available CPUs ...') + + func = partial(process_video, args.keep_video_suffixes, args.format, additional_config) + for success in process_map(func, videos_with_missing_pose_files, max_workers=args.num_workers): + if success: + pose_with_no_errors_count += 1 + + print(f"Successfully created pose files for {pose_with_no_errors_count}/{len(videos_with_missing_pose_files)} video files") diff --git a/src/python/pose_format/utils/generic.py b/src/python/pose_format/utils/generic.py index a537787..ad88e9c 100644 --- a/src/python/pose_format/utils/generic.py +++ b/src/python/pose_format/utils/generic.py @@ -61,66 +61,50 @@ def normalize_pose_size(pose: Pose, target_width: int = 512): pose.header.dimensions.height = pose.header.dimensions.width = target_width -def pose_remove_legs(pose: Pose) ->Pose: - """Remove (not _hide_) legs. Also does the hip +def pose_hide_legs(pose: Pose, remove: bool = False) -> Pose: + """ + Hide or remove leg components from a pose. + + If `remove` is True, the leg components are removed; otherwise, they are hidden (zeroed out). """ known_pose_format = detect_known_pose_format(pose) - + if known_pose_format == "holistic": - mediapipe_point_names = ["KNEE", "ANKLE", "HEEL", "HIP", "FOOT_INDEX"] - mediapipe_sides = ["LEFT", "RIGHT"] - point_names_to_remove = [ - side + "_" + name - for name in mediapipe_point_names - for side in mediapipe_sides - ] - points_to_remove_dict ={ + point_names = ["KNEE", "ANKLE", "HEEL", "FOOT_INDEX", "HIP"] + sides = ["LEFT", "RIGHT"] + point_names_to_remove = [f"{side}_{name}" for side in sides for name in point_names] + points_to_remove_dict = { "POSE_LANDMARKS": point_names_to_remove, "POSE_WORLD_LANDMARKS": point_names_to_remove, } - elif known_pose_format == 'openpose': - openpose_point_names = ["Hip", "Knee", "Ankle", "BigToe", "SmallToe", "Heel"] - openpose_sides = ["L", "R", "Mid"] - point_names_to_remove = [ - side + name - for name in openpose_point_names - for side in openpose_sides - ] - points_to_remove_dict = { - "pose_keypoints_2d": point_names_to_remove - } - else: - raise NotImplementedError( - f"Unsupported pose header schema {known_pose_format} for {pose_remove_legs.__name__}: {pose.header}" - ) - - pose = pose.remove_components([], points_to_remove_dict) - return pose - -def pose_hide_legs(pose: Pose): - known_pose_format = detect_known_pose_format(pose) - if known_pose_format == "holistic": - point_names = ["KNEE", "ANKLE", "HEEL", "FOOT_INDEX"] - points = [ - pose.header.get_point_index("POSE_LANDMARKS", side + "_" + n) - for n in point_names - for side in ["LEFT", "RIGHT"] - ] - pose.body.data[:, :, points, :] = 0 - pose.body.confidence[:, :, points] = 0 elif known_pose_format == "openpose": point_names = ["Hip", "Knee", "Ankle", "BigToe", "SmallToe", "Heel"] - points = [ - pose.header.get_point_index("pose_keypoints_2d", side + n) for n in point_names for side in ["L", "R"] - ] - pose.body.data[:, :, points, :] = 0 - pose.body.confidence[:, :, points] = 0 + sides = ["L", "R", "Mid"] + point_names_to_remove = [f"{side}{name}" for side in sides for name in point_names] + points_to_remove_dict = {"pose_keypoints_2d": point_names_to_remove} + else: raise NotImplementedError( f"Unsupported pose header schema {known_pose_format} for {pose_hide_legs.__name__}: {pose.header}" ) + if remove: + return pose.remove_components([], points_to_remove_dict) + + # Hide the points instead of removing them + # Some of the generated point_names_to_remove don't exist, e.g. MidHip, so get_point_index gives None + points = [] + for name in point_names_to_remove: + point = pose.header.get_point_index(list(points_to_remove_dict.keys())[0], name) + if point is not None: # point not found + points.append(point) + + pose.body.data[:, :, points, :] = 0 + pose.body.confidence[:, :, points] = 0 + + return pose + def pose_shoulders(pose_header: PoseHeader) -> Tuple[Tuple[str, str], Tuple[str, str]]: known_pose_format = detect_known_pose_format(pose_header) diff --git a/src/python/pose_format/utils/generic_test.py b/src/python/pose_format/utils/generic_test.py index f396afb..68db3ce 100644 --- a/src/python/pose_format/utils/generic_test.py +++ b/src/python/pose_format/utils/generic_test.py @@ -10,7 +10,6 @@ get_standard_components_for_known_format, KnownPoseFormat, pose_hide_legs, - pose_remove_legs, pose_shoulders, hands_indexes, normalize_pose_size, @@ -67,7 +66,6 @@ def test_pose_hide_legs(fake_poses: List[Pose]): if detected_format == "openpose_135": with pytest.raises(NotImplementedError, match="Unsupported pose header schema"): pose_hide_legs(pose) - return else: pose_hide_legs(pose) new_nonzeros_count = np.count_nonzero(pose.body.data) @@ -197,7 +195,7 @@ def test_pose_remove_legs(fake_poses: List[Pose]): assert "LEFT_KNEE" in pose.header.components[pose_landmarks_index].points - pose_with_legs_removed = pose_remove_legs(pose) + pose_with_legs_removed = pose_hide_legs(pose, remove=True) assert pose_with_legs_removed != pose new_c_names = [c.name for c in pose_with_legs_removed.header.components] assert "POSE_LANDMARKS" in new_c_names @@ -217,14 +215,14 @@ def test_pose_remove_legs(fake_poses: List[Pose]): 'LSmallToe', 'RSmallToe', 'LHeel', 'RHeel'] component_index = c_names.index("pose_keypoints_2d") - pose_with_legs_removed = pose_remove_legs(pose) + pose_with_legs_removed = pose_hide_legs(pose, remove=True) for point_name in points_that_should_be_removed: - assert point_name not in pose_with_legs_removed.header.components[component_index].points + assert point_name not in pose_with_legs_removed.header.components[component_index].points, f"{pose_with_legs_removed.header.components[component_index].name},{pose_with_legs_removed.header.components[component_index].points}" assert point_name in pose.header.components[component_index].points else: with pytest.raises(NotImplementedError, match="Unsupported pose header schema"): - pose_remove_legs(pose) + pose_hide_legs(pose, remove=True) From 3a09b7c30af88744e891edaadd55b4beaf139172 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:10:38 -0500 Subject: [PATCH 07/13] Remove unintended file --- .../pose_format/utils/directorybackup.py | 192 ------------------ 1 file changed, 192 deletions(-) delete mode 100644 src/python/pose_format/utils/directorybackup.py diff --git a/src/python/pose_format/utils/directorybackup.py b/src/python/pose_format/utils/directorybackup.py deleted file mode 100644 index a105e53..0000000 --- a/src/python/pose_format/utils/directorybackup.py +++ /dev/null @@ -1,192 +0,0 @@ -import os -import argparse -import logging -from typing import List -from pathlib import Path -from functools import partial -import psutil -from tqdm.contrib.concurrent import process_map -from pose_format.bin.pose_estimation import pose_video, parse_additional_config - - - - - - - -# Note: untested other than .mp4. Support for .webm may have issues: https://github.com/sign-language-processing/pose/pull/126 -SUPPORTED_VIDEO_FORMATS = [".mp4", ".mov", ".avi", ".mkv", ".flv", ".wmv", ".webm"] - - -def find_videos_with_missing_pose_files( - directory: Path, - video_suffixes: List[str] = None, - recursive: bool = False, - keep_video_suffixes: bool = False, -) -> List[Path]: - """ - Finds videos with missing .pose files. - - Parameters - ---------- - directory: Path, - Directory to search for videos in. - video_suffixes: List[str], optional - Suffixes to look for, e.g. [".mp4", ".webm"]. If None, will use _SUPPORTED_VIDEO_FORMATS - recursive: bool, optional - Whether to look for video files recursively, or just the top-level. Defaults to false. - keep_video_suffixes: bool, optional - If true, when checking will append .pose suffix (e.g. foo.mp4->foo.mp4.pose, foo.webm->foo.webm.pose), - If false, will replace it (foo.mp4 becomes foo.pose, and foo.webm ALSO becomes foo.pose). - Default is false, which can cause name collisions. - - Returns - ------- - List[Path] - List of video paths without corresponding .pose files. - """ - - # Prevents the common gotcha with mutable default arg lists: - # https://docs.python-guide.org/writing/gotchas/#mutable-default-arguments - if video_suffixes is None: - video_suffixes = SUPPORTED_VIDEO_FORMATS - - glob_method = getattr(directory, "rglob" if recursive else "glob") - all_files = list(glob_method(f"*")) - video_files = [path for path in all_files if path.suffix in video_suffixes] - pose_files = {path for path in all_files if path.suffix == ".pose"} - - videos_with_missing_pose_files = [] - - for vid_path in video_files: - corresponding_pose = get_corresponding_pose_path(video_path=vid_path, keep_video_suffixes=keep_video_suffixes) - if corresponding_pose not in pose_files: - videos_with_missing_pose_files.append(vid_path) - - return videos_with_missing_pose_files - - -def get_corresponding_pose_path(video_path: Path, keep_video_suffixes: bool = False) -> Path: - """ - Given a video path, and whether to keep the suffix, returns the expected corresponding path with .pose extension. - - Parameters - ---------- - video_path : Path - Path to a video file - keep_video_suffixes : bool, optional - Whether to keep suffix (e.g. foo.mp4 -> foo.mp4.pose) - or replace (foo.mp4->foo.pose). Defaults to replace. - - Returns - ------- - Path - pathlib Path - """ - if keep_video_suffixes: - return video_path.with_name(f"{video_path.name}.pose") - return video_path.with_suffix(".pose") - - -def process_video(keep_video_suffixes: bool, pose_format: str, additional_config: dict, vid_path: Path) -> bool: - print(f'Estimating {vid_path} on CPU {psutil.Process().cpu_num()}') - - try: - pose_path = get_corresponding_pose_path(video_path=vid_path, keep_video_suffixes=keep_video_suffixes) - if pose_path.is_file(): - print(f"Skipping {vid_path}, corresponding .pose file already created.") - else: - # pose_video function expects string, and passes it unchanged to cv2.VideoCapture(input_path) - # if you give cv2.VideoCapture(input_path) a Path it crashes on older versions. - # https://github.com/opencv/opencv/issues/15731 - pose_video(str(vid_path.resolve()), str(pose_path.resolve()), pose_format, additional_config, progress=False) - return True - - except ValueError as e: - print(f"ValueError on {vid_path}") - logging.exception(e) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "-f", - "--format", - choices=["mediapipe"], - default="mediapipe", - type=str, - help="type of pose estimation to use", - ) - parser.add_argument( - "-d", - "--directory", - type=Path, - required=True, - help="Directory to search for videos in", - ) - parser.add_argument( - "-r", - "--recursive", - action="store_true", - help="Whether to search for videos recursively", - ) - parser.add_argument( - "--keep-video-suffixes", - action="store_true", - help="Whether to drop the video extension (output for foo.mp4 becomes foo.pose, and foo.webm ALSO becomes foo.pose) or append to it (foo.mp4 becomes foo.mp4.pose, foo.webm output is foo.webm.pose). If there are multiple videos with the same basename but different extensions, this will create a .pose file for each. Otherwise only the first video will be posed.", - ) - parser.add_argument( - "--video-suffixes", - type=str, - choices=SUPPORTED_VIDEO_FORMATS, - default=SUPPORTED_VIDEO_FORMATS, - help="Video extensions to search for. Defaults to searching for all supported.", - ) - parser.add_argument( - "--num-workers", - type=int, - default=1, - help="Number of multiprocessing workers.", - required=False - ) - parser.add_argument( - "--additional-config", - type=str, - help="additional configuration for the pose estimator", - ) - args = parser.parse_args() - - videos_with_missing_pose_files = find_videos_with_missing_pose_files( - args.directory, - video_suffixes=args.video_suffixes, - recursive=args.recursive, - keep_video_suffixes=args.keep_video_suffixes, - ) - - print(f"Found {len(videos_with_missing_pose_files)} videos missing pose files.") - - pose_files_that_will_be_created = {get_corresponding_pose_path(vid_path, args.keep_video_suffixes) for vid_path in videos_with_missing_pose_files} - - if len(pose_files_that_will_be_created) < len(videos_with_missing_pose_files): - continue_input = input( - f"With current naming strategy (without --keep-video-suffixes), name collisions will result in only {len(pose_files_that_will_be_created)} .pose files being created. Continue? [y/n]" - ) - if continue_input.lower() != "y": - print(f"Exiting. To keep video suffixes and avoid collisions, use --keep-video-suffixes") - exit() - - additional_config = parse_additional_config(args.additional_config) - - pose_with_no_errors_count = 0 - - if args.num_workers == 1: - print('Process sequentially ...') - else: - print(f'Multiprocessing with {args.num_workers} workers on {len(os.sched_getaffinity(0))} available CPUs ...') - - func = partial(process_video, args.keep_video_suffixes, args.format, additional_config) - for success in process_map(func, videos_with_missing_pose_files, max_workers=args.num_workers): - if success: - pose_with_no_errors_count += 1 - - print(f"Successfully created pose files for {pose_with_no_errors_count}/{len(videos_with_missing_pose_files)} video files") From ef87828a7144262257d73f566af471f752cbfd28 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:43:17 -0500 Subject: [PATCH 08/13] update hide_legs to not crash when trying to hide invalid points --- src/python/pose_format/utils/generic.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/python/pose_format/utils/generic.py b/src/python/pose_format/utils/generic.py index ad88e9c..c2c4294 100644 --- a/src/python/pose_format/utils/generic.py +++ b/src/python/pose_format/utils/generic.py @@ -7,6 +7,7 @@ from pose_format.pose_header import PoseHeader, PoseHeaderDimensions, PoseHeaderComponent, PoseNormalizationInfo from pose_format.utils.normalization_3d import PoseNormalizer from pose_format.utils.openpose import OpenPose_Components +from pose_format.utils.openpose import BODY_POINTS as OPENPOSE_BODY_POINTS from pose_format.utils.openpose_135 import OpenPose_Components as OpenPose135_Components # from pose_format.utils.holistic import holistic_components @@ -79,9 +80,10 @@ def pose_hide_legs(pose: Pose, remove: bool = False) -> Pose: } elif known_pose_format == "openpose": - point_names = ["Hip", "Knee", "Ankle", "BigToe", "SmallToe", "Heel"] - sides = ["L", "R", "Mid"] - point_names_to_remove = [f"{side}{name}" for side in sides for name in point_names] + words_to_look_for = ["Hip", "Knee", "Ankle", "BigToe", "SmallToe", "Heel"] + point_names_to_remove = [point for point in OPENPOSE_BODY_POINTS if any(word in point for word in words_to_look_for)] + + # if any of the items in point_ points_to_remove_dict = {"pose_keypoints_2d": point_names_to_remove} else: @@ -93,12 +95,14 @@ def pose_hide_legs(pose: Pose, remove: bool = False) -> Pose: return pose.remove_components([], points_to_remove_dict) # Hide the points instead of removing them - # Some of the generated point_names_to_remove don't exist, e.g. MidHip, so get_point_index gives None points = [] for name in point_names_to_remove: - point = pose.header.get_point_index(list(points_to_remove_dict.keys())[0], name) - if point is not None: # point not found + try: + point = pose.header.get_point_index(list(points_to_remove_dict.keys())[0], name) points.append(point) + except ValueError: # point not found, maybe removed earlier in other preprocessing steps + pass + pose.body.data[:, :, points, :] = 0 pose.body.confidence[:, :, points] = 0 From b95abc9378ac6c736b4bb60b6a783a869b327723 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:51:06 -0500 Subject: [PATCH 09/13] Add some more tests --- src/python/pose_format/utils/generic_test.py | 4 +++ src/python/tests/pose_test.py | 28 +++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/python/pose_format/utils/generic_test.py b/src/python/pose_format/utils/generic_test.py index 68db3ce..1353eb1 100644 --- a/src/python/pose_format/utils/generic_test.py +++ b/src/python/pose_format/utils/generic_test.py @@ -60,6 +60,7 @@ def test_get_component_names(fake_poses: List[Pose], known_pose_format: KnownPos @pytest.mark.parametrize("fake_poses", list(get_args(KnownPoseFormat)), indirect=["fake_poses"]) def test_pose_hide_legs(fake_poses: List[Pose]): for pose in fake_poses: + pose_copy = pose.copy() orig_nonzeros_count = np.count_nonzero(pose.body.data) detected_format = detect_known_pose_format(pose) @@ -71,6 +72,9 @@ def test_pose_hide_legs(fake_poses: List[Pose]): new_nonzeros_count = np.count_nonzero(pose.body.data) assert orig_nonzeros_count > new_nonzeros_count + assert len(pose_copy.header.components) == len(pose.header.components) + for c_orig, c_copy in zip(pose.header.components, pose_copy.header.components): + assert len(c_orig.points) == len(c_copy.points) @pytest.mark.parametrize("fake_poses", TEST_POSE_FORMATS, indirect=["fake_poses"]) diff --git a/src/python/tests/pose_test.py b/src/python/tests/pose_test.py index 0dfb801..17753ca 100644 --- a/src/python/tests/pose_test.py +++ b/src/python/tests/pose_test.py @@ -367,6 +367,10 @@ def test_pose_remove_components(self): self.assertIn(component_to_remove, [c.name for c in pose_copy.header.components]) pose_copy = pose_copy.remove_components(component_to_remove) self.assertNotIn(component_to_remove, [c.name for c in pose_copy.header.components]) + self.assertEqual(pose_copy.header.components[0].name, "1") + # quickly check to make sure other components/points weren't removed + self.assertIn("1_a", pose_copy.header.components[0].points) + self.assertEqual(pose_copy.header.components[0].points, pose.header.components[1].points) # Remove a point only @@ -375,9 +379,13 @@ def test_pose_remove_components(self): self.assertIn(point_to_remove, pose_copy.header.components[0].points) pose_copy = pose_copy.remove_components([], {point_to_remove[0]:[point_to_remove]}) self.assertNotIn(point_to_remove, pose_copy.header.components[0].points) + # quickly check to make sure other components/points weren't removed + self.assertIn("1_a", pose_copy.header.components[1].points) + self.assertEqual(pose_copy.header.components[1].points, pose.header.components[1].points) # Can we remove two things at once + pose_copy = pose.copy() component_to_remove = "1" point_to_remove = "2_a" component_to_remove_point_from = "2" @@ -388,6 +396,8 @@ def test_pose_remove_components(self): pose_copy = pose_copy.remove_components([component_to_remove], {component_to_remove_point_from:[point_to_remove]}) self.assertNotIn(component_to_remove, [c.name for c in pose_copy.header.components]) self.assertIn(component_to_remove_point_from, [c.name for c in pose_copy.header.components]) # this should still be around + self.assertIn("0_a", pose_copy.header.components[0].points) + self.assertEqual(pose_copy.header.components[0].points, pose.header.components[0].points) # should be unaffected # can we remove a component and a point FROM that component without crashing component_to_remove = "0" @@ -397,6 +407,7 @@ def test_pose_remove_components(self): pose_copy = pose_copy.remove_components([component_to_remove], {component_to_remove:[point_to_remove]}) self.assertNotIn(component_to_remove, [c.name for c in pose_copy.header.components]) self.assertNotIn(point_to_remove, pose_copy.header.components[0].points) + self.assertEqual(pose_copy.header.components[0].points, pose.header.components[1].points) # should be unaffected # can we "remove" a component that doesn't exist without crashing @@ -405,6 +416,10 @@ def test_pose_remove_components(self): initial_count = len(pose_copy.header.components) pose_copy = pose_copy.remove_components([component_to_remove]) self.assertEqual(initial_count, len(pose_copy.header.components)) + for c_orig, c_copy in zip(pose.header.components, pose_copy.header.components): + self.assertNotEqual(c_copy, c_orig) # should be a new object... + self.assertEqual(c_copy.name, c_orig.name) # with the same name + self.assertEqual(c_copy.points, c_orig.points) # and the same points @@ -416,6 +431,10 @@ def test_pose_remove_components(self): self.assertNotIn(point_to_remove, pose_copy.header.components[2].points) pose_copy = pose_copy.remove_components([], {component_to_remove_point_from:[point_to_remove]}) self.assertNotIn(point_to_remove, pose_copy.header.components[2].points) + for c_orig, c_copy in zip(pose.header.components, pose_copy.header.components): + self.assertNotEqual(c_copy, c_orig) # should be a new object... + self.assertEqual(c_copy.name, c_orig.name) # with the same name + self.assertEqual(c_copy.points, c_orig.points) # and the same points # can we "remove" an empty list of points @@ -426,7 +445,10 @@ def test_pose_remove_components(self): pose_copy = pose_copy.remove_components([], {component_to_remove_point_from:[]}) self.assertEqual(initial_component_count, len(pose_copy.header.components)) self.assertEqual(len(pose_copy.header.components[2].points), initial_point_count) - + for c_orig, c_copy in zip(pose.header.components, pose_copy.header.components): + self.assertNotEqual(c_copy, c_orig) # should be a new object... + self.assertEqual(c_copy.name, c_orig.name) # with the same name + self.assertEqual(c_copy.points, c_orig.points) # and the same points # can we remove a point from a component that doesn't exist point_to_remove = "2_x" @@ -435,6 +457,10 @@ def test_pose_remove_components(self): self.assertNotIn(point_to_remove, pose_copy.header.components[2].points) pose_copy = pose_copy.remove_components([], {component_to_remove_point_from:[point_to_remove]}) self.assertNotIn(point_to_remove, pose_copy.header.components[2].points) + for c_orig, c_copy in zip(pose.header.components, pose_copy.header.components): + self.assertNotEqual(c_copy, c_orig) # should be a new object... + self.assertEqual(c_copy.name, c_orig.name) # with the same name + self.assertEqual(c_copy.points, c_orig.points) # and the same points From a5861052bd34cfdd55b1ba98363e353488c8e139 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:51:42 -0500 Subject: [PATCH 10/13] pose_header.get_point_indexRemove try/catch --- src/python/pose_format/pose_header.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/python/pose_format/pose_header.py b/src/python/pose_format/pose_header.py index 54db997..fb9f7a1 100644 --- a/src/python/pose_format/pose_header.py +++ b/src/python/pose_format/pose_header.py @@ -377,12 +377,18 @@ def _get_point_index(self, component: str, point: str): raise ValueError("Couldn't find component") - def get_point_index(self, component: str, point: str)-> Union[int, None]: - try: - return self._get_point_index(component, point) - except ValueError: - # if component or point doesn't exist. - return None + def get_point_index(self, component: str, point: str) -> int: + """ + Returns the index of a given point within the pose. + + Args: + component (str): The name of the component containing the point. + point (str): The name of the point whose index is to be retrieved. + + Raises: + ValueError: If the specified component or point is not found. + """ + return self._get_point_index(component, point) def normalization_info(self, p1: Tuple[str, str], p2: Tuple[str, str], p3: Tuple[str, str] = None): """ From 15b063a184731c6b4ba13068905045a858426e3c Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:06:53 -0500 Subject: [PATCH 11/13] iterate over points_to_remove_dict in pose_hide_legs --- src/python/pose_format/utils/generic.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/python/pose_format/utils/generic.py b/src/python/pose_format/utils/generic.py index c2c4294..41169fc 100644 --- a/src/python/pose_format/utils/generic.py +++ b/src/python/pose_format/utils/generic.py @@ -95,17 +95,18 @@ def pose_hide_legs(pose: Pose, remove: bool = False) -> Pose: return pose.remove_components([], points_to_remove_dict) # Hide the points instead of removing them - points = [] - for name in point_names_to_remove: - try: - point = pose.header.get_point_index(list(points_to_remove_dict.keys())[0], name) - points.append(point) - except ValueError: # point not found, maybe removed earlier in other preprocessing steps - pass - - - pose.body.data[:, :, points, :] = 0 - pose.body.confidence[:, :, points] = 0 + point_indices = [] + for component, points in points_to_remove_dict.items(): + for point_name in points: + try: + point_index = pose.header.get_point_index(component, point_name) + point_indices.append(point_index) + except ValueError: # point not found, maybe removed earlier in other preprocessing steps + pass + + + pose.body.data[:, :, point_indices, :] = 0 + pose.body.confidence[:, :, point_indices] = 0 return pose From a9030dbd95f04d67e74f696b5382b67294c9ed28 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:18:21 -0500 Subject: [PATCH 12/13] Adding a few more test updates --- src/python/pose_format/utils/generic_test.py | 11 +++++++---- src/python/tests/pose_test.py | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/python/pose_format/utils/generic_test.py b/src/python/pose_format/utils/generic_test.py index 1353eb1..e1b5912 100644 --- a/src/python/pose_format/utils/generic_test.py +++ b/src/python/pose_format/utils/generic_test.py @@ -66,15 +66,18 @@ def test_pose_hide_legs(fake_poses: List[Pose]): detected_format = detect_known_pose_format(pose) if detected_format == "openpose_135": with pytest.raises(NotImplementedError, match="Unsupported pose header schema"): - pose_hide_legs(pose) + pose = pose_hide_legs(pose) else: - pose_hide_legs(pose) + pose = pose_hide_legs(pose) new_nonzeros_count = np.count_nonzero(pose.body.data) assert orig_nonzeros_count > new_nonzeros_count assert len(pose_copy.header.components) == len(pose.header.components) for c_orig, c_copy in zip(pose.header.components, pose_copy.header.components): - assert len(c_orig.points) == len(c_copy.points) + assert len(c_orig.points) == len(c_copy.points) + # what if we remove the legs before hiding them first? It should not crash. + pose = pose_hide_legs(pose, remove=True) + pose = pose_hide_legs(pose, remove=False) @pytest.mark.parametrize("fake_poses", TEST_POSE_FORMATS, indirect=["fake_poses"]) @@ -226,7 +229,7 @@ def test_pose_remove_legs(fake_poses: List[Pose]): assert point_name in pose.header.components[component_index].points else: with pytest.raises(NotImplementedError, match="Unsupported pose header schema"): - pose_hide_legs(pose, remove=True) + pose = pose_hide_legs(pose, remove=True) diff --git a/src/python/tests/pose_test.py b/src/python/tests/pose_test.py index 17753ca..019b0d1 100644 --- a/src/python/tests/pose_test.py +++ b/src/python/tests/pose_test.py @@ -347,6 +347,23 @@ def test_pose_object_should_be_callable(self): """ assert callable(Pose) + def test_get_index(self): + pose = _get_random_pose_object_with_numpy_posebody(num_keypoints=5) + expected_index = 0 + self.assertEqual(0, pose.header.get_point_index("0", "0_a")) + for component in pose.header.components: + for point in component.points: + self.assertEqual(expected_index, pose.header.get_point_index(component.name, point)) + expected_index +=1 + + with self.assertRaises(ValueError): + pose.header.get_point_index("component that doesn't exist", "") + + with self.assertRaises(ValueError): + pose.header.get_point_index("0", "point that doesn't exist") + + + def test_pose_remove_components(self): pose = _get_random_pose_object_with_numpy_posebody(num_keypoints=5) assert pose.body.data.shape[-2] == 5 From fbcc474d2cd0340617d0c63766018ad6ed2958c1 Mon Sep 17 00:00:00 2001 From: Colin Leong <122366389+cleong110@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:39:57 -0500 Subject: [PATCH 13/13] a few style fixes for generic utils --- src/python/pose_format/utils/generic.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/python/pose_format/utils/generic.py b/src/python/pose_format/utils/generic.py index 41169fc..653d9ae 100644 --- a/src/python/pose_format/utils/generic.py +++ b/src/python/pose_format/utils/generic.py @@ -69,7 +69,7 @@ def pose_hide_legs(pose: Pose, remove: bool = False) -> Pose: If `remove` is True, the leg components are removed; otherwise, they are hidden (zeroed out). """ known_pose_format = detect_known_pose_format(pose) - + if known_pose_format == "holistic": point_names = ["KNEE", "ANKLE", "HEEL", "FOOT_INDEX", "HIP"] sides = ["LEFT", "RIGHT"] @@ -81,7 +81,8 @@ def pose_hide_legs(pose: Pose, remove: bool = False) -> Pose: elif known_pose_format == "openpose": words_to_look_for = ["Hip", "Knee", "Ankle", "BigToe", "SmallToe", "Heel"] - point_names_to_remove = [point for point in OPENPOSE_BODY_POINTS if any(word in point for word in words_to_look_for)] + point_names_to_remove = [point for point in OPENPOSE_BODY_POINTS + if any(word in point for word in words_to_look_for)] # if any of the items in point_ points_to_remove_dict = {"pose_keypoints_2d": point_names_to_remove}