WIP: Pointing Gesture Recognition (and Gesture Classification) #254

Draft · wants to merge 16 commits into base: devel

@@ -0,0 +1,45 @@
cmake_minimum_required(VERSION 2.8.3)
project(mdr_recognize_gesture_action)

find_package(catkin REQUIRED COMPONENTS
rospy
roslint
actionlib
actionlib_msgs
genmsg
message_generation
geometry_msgs
)

roslint_python()
catkin_python_setup()

add_action_files(DIRECTORY ros/action
FILES
RecognizeGesture.action
)

generate_messages(
DEPENDENCIES
actionlib_msgs
geometry_msgs
)

catkin_package(
CATKIN_DEPENDS
rospy
actionlib
actionlib_msgs
message_runtime
geometry_msgs
)

include_directories(
${catkin_INCLUDE_DIRS}
)

install(PROGRAMS
ros/scripts/recognize_gesture_action
ros/scripts/recognize_gesture_action_client_test
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}/scripts
)
@@ -0,0 +1,54 @@
# mdr_recognize_gesture_action

*(Work In Progress)*

An action for recognizing bodily gestures, which involves two stages:
* Gesture classification: classifying an observed gesture according to a set of known gesture examples
* Gesture processing: reasoning about the classified gesture; for example, determining the object being pointed to after observing a pointing gesture.

The gesture classification procedure is inspired by the approach presented [in this paper](https://arxiv.org/abs/1906.12171). As a person performs an example of a gesture, the 3D positions of their joints (or keypoints) are captured from RGB camera frames using the `openpose` library. The resulting pose array data is then pre-processed and compared to a set of known (labeled) gesture examples, and classified according to the class of the most similar example (1NN classifier). The similarity of the observed gesture to all examples is determined using the Dynamic Time Warping (DTW) algorithm (provided by the [fastdtw](https://github.com/slaypni/fastdtw) library). Note that the input to the classifier can be either a camera stream or a video file.
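
To make the comparison concrete, the following is a minimal sketch of the 1NN/DTW idea (illustrative only, not the package's actual API; `classify_1nn_dtw`, `sample`, and `examples` are hypothetical names, and the pose arrays are assumed to be preprocessed already):

```python
import numpy as np
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean

def classify_1nn_dtw(sample, examples):
    """Classify `sample` by its nearest example under a DTW-based distance.

    sample: array of shape (num_timesteps, num_keypoints, num_spatial_dims).
    examples: list of (label, pose_array) pairs with arrays of the same layout.
    """
    best_label, best_distance = None, np.inf
    for label, example in examples:
        # DTW aligns sequences of different lengths; compare each spatial
        # dimension separately and combine the per-dimension distances.
        per_dim = [fastdtw(sample[:, :, d], example[:, :, d], dist=euclidean)[0]
                   for d in range(sample.shape[2])]
        distance = np.linalg.norm(per_dim)
        if distance < best_distance:
            best_label, best_distance = label, distance
    return best_label, best_distance
```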

The gesture processing step depends on the classified gesture. Currently, only processing of the pointing gesture is considered: it involves inferring the pointing direction and using the output of an object classifier (*WIP*) to find the class of the object being pointed to. This relies on a simple projection of the line connecting the elbow and wrist joints, and on determining which bounding box that line intersects (with simple heuristic rules for cases with multiple hypotheses).
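
As a rough illustration of this projection step (a sketch under assumptions, not the package's implementation; `pointed_box`, its parameters, and the use of 2D image coordinates are all hypothetical):

```python
import numpy as np

def pointed_box(elbow, wrist, boxes, num_steps=100, step_size=10.0):
    """Return the index of the first bounding box hit by the elbow->wrist ray.

    elbow, wrist: (x, y) image coordinates of the pointing arm's joints.
    boxes: list of axis-aligned boxes given as (x_min, y_min, x_max, y_max).
    """
    direction = np.asarray(wrist, dtype=float) - np.asarray(elbow, dtype=float)
    direction /= np.linalg.norm(direction)

    point = np.asarray(wrist, dtype=float)
    for _ in range(num_steps):
        point = point + step_size * direction  # march along the pointing ray
        for i, (x_min, y_min, x_max, y_max) in enumerate(boxes):
            if x_min <= point[0] <= x_max and y_min <= point[1] <= y_max:
                return i  # first box hit; multiple hits would need the heuristic rules
    return None
```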

### Scripts:
* ``save_gesture_video``: records a video of a gesture for a given duration
* ``save_gesture_pose_data``: extracts and saves a pose array of a gesture from a camera or video file; pose arrays are of shape *(num_timesteps, num_keypoints, num_spatial_dims)*, and are saved in `npy` format
* ``gesture_classifier_demo``: runs the gesture classifier on data from a camera or video file, and prints the inferred class
* ``pointing_gesture_demo``: demonstrates the inference of pointing direction by highlighting the bounding box of the object being pointed to in a camera stream

## Action definition

### Goal:

* ``string gesture_type``: Optional type of gesture; passing this in bypasses the gesture classification step


### Result:

* ``bool success``

### Feedback:

* ``string current_state``
* ``string message``
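
For reference, a hedged example of calling the action from Python with `actionlib` (the server name `recognize_gesture_server` is an assumption; the goal field follows the definition above):

```python
#!/usr/bin/env python
import rospy
import actionlib
from mdr_recognize_gesture_action.msg import RecognizeGestureAction, RecognizeGestureGoal

if __name__ == '__main__':
    rospy.init_node('recognize_gesture_client_example')
    # Server name is an assumption; check the launch file for the actual name.
    client = actionlib.SimpleActionClient('recognize_gesture_server', RecognizeGestureAction)
    client.wait_for_server()

    goal = RecognizeGestureGoal()
    goal.gesture_type = 'pointing'  # optional; setting it bypasses gesture classification
    client.send_goal(goal)
    client.wait_for_result(rospy.Duration(60.0))
    print(client.get_result())
```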

## Launch file parameters

### Action server
* ``gesture_type``: The type of gesture (default: 'pointing')

## Action execution summary

TODO


## Dependencies

* ``numpy``
* ``openpose``
* ``cv2``
* ``fastdtw``
* ``pytorch``
* ``pyftsm``
* ``actionlib``
* ``geometry_msgs``
@@ -0,0 +1,169 @@
#!/usr/bin/env python

from __future__ import print_function
import os
import time
import copy

import cv2
import numpy as np

from fastdtw import fastdtw
from openpose import pyopenpose as op
from scipy.spatial.distance import euclidean
from scipy.ndimage import median_filter, gaussian_filter1d


class GestureClassifier(object):
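    # 1NN gesture classifier: observed openpose keypoint sequences are compared to a set
    # of labeled example gestures using DTW (fastdtw), and the class of the closest
    # example is returned.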
def __init__(self, gesture_distance_threshold=None,
model_path='/home/lucy/.models/openpose_models/',
gesture_types=['waving', 'nodding', 'shaking_head', 'go_away', 'come_closer', 'pointing'],
known_example_data_dir='data/gesture_examples/',
debug=False):
self.gesture_distance_threshold = gesture_distance_threshold
self.known_example_data_dir = known_example_data_dir
self.debug = debug

self.gesture_example_data_dict = dict(zip(gesture_types, [[] for _ in range(len(gesture_types))]))
self.gesture_example_pc_indices_dict = copy.deepcopy(self.gesture_example_data_dict) # pc = principal component i.e. most varying component
self.gesture_distances_dict = copy.deepcopy(self.gesture_example_data_dict)
self.video_device = None

self._load_known_example_data()

self.opWrapper = op.WrapperPython()
self.opWrapper.configure({"model_folder" : model_path})
self.opWrapper.start()

def capture_gesture_data(self, device_id=0, capture_duration=5., save_data=False, filename=None):
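        # Open the video source: an integer device id selects a local camera, while a
        # non-numeric value (e.g. a video file path) is passed to OpenCV as-is.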
try:
self.video_device = cv2.VideoCapture(int(device_id))
except ValueError:
self.video_device = cv2.VideoCapture(device_id)

if self.video_device.isOpened():
print('Successfully opened video device: {}'.format(device_id))
else:
print('Could not open video device!')

end_time = time.time() + capture_duration
full_poses = []

print('Starting gesture recording...')
while(time.time() < end_time):
ret, frame = self.video_device.read()

datum = op.Datum()
datum.cvInputData = frame
self.opWrapper.emplaceAndPop([datum])

if datum.poseKeypoints.shape == ():
if self.debug:
                    print('Body probably too close to adequately estimate pose. '
                          'Please move away from the camera.')

else:
if datum.poseKeypoints.shape[0] > 1:
datum.poseKeypoints = datum.poseKeypoints[0, :, :][np.newaxis]
# Note: the following still includes hip joints; this will be removed soon:
full_poses.append(datum.poseKeypoints[:, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 12, 15, 16, 17, 18], :])

print('Finished recording.')
self.video_device.release()

full_pose_array = np.vstack(full_poses)
if save_data:
print('Saving recorded pose data to npy binary files...')
if filename is None:
filename = 'saved_gesture_pose_data'
np.save(os.path.join('data/gesture_examples/', filename), full_pose_array)

return full_pose_array

def classify_gesture(self, sample_pose_array):
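        # Drop the hip joints (indices 8-10 of the reduced keypoint set) so that only
        # head and upper-body keypoints are used for classification.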
sample_pose_array = np.delete(sample_pose_array, [8, 9, 10], axis=1)
sample_pose_array = self._preprocess_data(sample_pose_array)
sample_most_varying_indices = self._get_n_most_varying_dim_indices(sample_pose_array)

most_likely_class, min_distance = self._get_nearest_neighbour_class(sample_pose_array, sample_most_varying_indices)

if self.debug:
print(self.gesture_distances_dict)

if self.gesture_distance_threshold is not None:
if min_distance < self.gesture_distance_threshold:
return most_likely_class
else:
return None
else:
return most_likely_class

def _load_known_example_data(self):
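        # Example files are labeled via their filenames: the first known gesture type
        # that appears as a substring of a filename determines that example's class.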
filenames = [filename for filename in os.listdir(self.known_example_data_dir) if filename.endswith('npy')]

for filename in filenames:
for gesture_type in self.gesture_example_data_dict.keys():
if gesture_type in filename:
break
else:
print('WARNING: Loaded file does not match any known gesture type!!')
pose_array = self._preprocess_data(np.load(os.path.join(self.known_example_data_dir, filename)))
pose_array = np.delete(pose_array, [8, 9, 10], axis=1)
self.gesture_example_data_dict[gesture_type].append(pose_array)
self.gesture_example_pc_indices_dict[gesture_type].append(self._get_n_most_varying_dim_indices(pose_array))

def _get_nearest_neighbour_class(self, sample_data, sample_most_varying_indices):
min_distance = np.inf
most_likely_class = None

for gesture_type in self.gesture_example_data_dict.keys():
for example_num in range(len(self.gesture_example_data_dict[gesture_type])):
distance = self._get_gesture_distance(sample_data,
self.gesture_example_data_dict[gesture_type][example_num],
sample_most_varying_indices,
self.gesture_example_pc_indices_dict[gesture_type][example_num])
self.gesture_distances_dict[gesture_type].append(distance)

if distance < min_distance:
min_distance = distance
most_likely_class = gesture_type

return most_likely_class, min_distance

def _perform_shoulder_scaling(self, pose_array):
abs_shoulder_distances = np.linalg.norm(pose_array[:, 2, :] - pose_array[:, 5, :], axis=1)
return pose_array / abs_shoulder_distances[np.newaxis][np.newaxis].T

def _perform_neck_scaling(self, pose_array):
neck_array = pose_array[:, 1, :].reshape(pose_array[:, 1, :].shape[0], 1, pose_array[:, 1, :].shape[1])
return pose_array - neck_array

def _normalize_mean(self, pose_array):
return pose_array - np.mean(pose_array, axis=0)[np.newaxis]

def _preprocess_data(self, pose_array):
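        # Normalization pipeline: express keypoints relative to the neck, scale by the
        # shoulder distance, median-filter outliers, subtract the mean over time, and
        # smooth each keypoint trajectory with a 1D Gaussian filter along the time axis.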
pose_array = self._perform_neck_scaling(pose_array)
pose_array = self._perform_shoulder_scaling(pose_array)
pose_array = median_filter(pose_array, size=2)
pose_array = self._normalize_mean(pose_array)
pose_array = gaussian_filter1d(pose_array, axis=0, sigma=1)

return pose_array

def _get_n_most_varying_dim_indices(self, pose_array, n=5):
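        # For each spatial dimension, return the indices of the n keypoints whose
        # coordinates vary the most over time; these focus the DTW comparison on the
        # most informative joints.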
list_of_indices = []
for spatial_dim in range(pose_array.shape[2]):
most_varying_indices = np.flip(np.argsort(np.var(pose_array, axis=0)[:, spatial_dim]))[:n]
list_of_indices.append(set(most_varying_indices))

return list_of_indices

def _get_gesture_distance(self, pose_array_1, pose_array_2, array_1_indices, array_2_indices):
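        # Compute a DTW distance per spatial dimension, restricted to the union of the
        # two gestures' most-varying keypoints, and combine the per-dimension distances
        # with an L2 norm.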
distances = []
for dim in range(3):
dist, _ = fastdtw(pose_array_1[:, list(array_1_indices[dim] | array_2_indices[dim]), dim],
pose_array_2[:, list(array_1_indices[dim] | array_2_indices[dim]), dim],
dist=euclidean)
distances.append(dist)

return np.linalg.norm(distances)