Open
Description
Have I written custom code (as opposed to using a stock example script provided in MediaPipe)
Yes
OS Platform and Distribution
macOS 14.6.1
MediaPipe Tasks SDK version
mediapipe 0.10.14
Task name (e.g. Image classification, Gesture recognition etc.)
Hand landmark detection, custom object detection from model maker
Programming Language and version (e.g. C++, Python, Java)
Python
Describe the actual behavior
Memory usage peaked at 30 GB when processing a 1-minute 1080x1920 @ 30 fps video. When running a 4-minute video, memory usage reached 90 GB and my computer crashed.
Describe the expected behaviour
Less memory used
Standalone code/steps you may have used to try to get what you need
from video_mode import HandDetector
from visualize_utils import draw_landmarks_on_image
import cv2 as cv
import os  # imported here because only this script body needs it (output directory creation)

# Driver script: read a video, run hand-landmark detection on every frame,
# write an annotated copy of the video and a CSV log of the landmarks.

videoPath = input("Enter the path of the video file: ")
videoOutputPath = "outputs/sample.mp4"
csvOutputPath = "outputs/times.csv"  # not used below; kept so the name stays defined
handLogPath = "outputs/hand_landmarks.csv"

cap = cv.VideoCapture(videoPath)  # arg: name of the video file or device index
if not cap.isOpened():
    # Fail early instead of silently producing an empty video and an empty log.
    raise SystemExit(f"Could not open video: {videoPath}")

# Create the output directories up front so a missing folder does not make
# the final save fail after the whole video has already been processed.
for path in (videoOutputPath, handLogPath):
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)

# for saving the annotated video
fps = cap.get(cv.CAP_PROP_FPS)
width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
frame_size = (width, height)
fourcc = cv.VideoWriter_fourcc(*'mp4v')
out_vid = cv.VideoWriter(videoOutputPath, fourcc, fps, frame_size)

# Use a distinct name for the instance so the imported class is not shadowed.
hand_detector = HandDetector("hand_landmarker.task")
# box_detector = BoxDetector("transfer_learning/exported_model/model.tflite")

# for saving the detection results
hand_detector.start_logging(handLogPath)
# box_detector.start_logging("outputs/box_detections.csv")

try:
    # Start the processing loop
    while cap.isOpened():
        # Capture frame-by-frame
        ok, frame = cap.read()
        if not ok:
            print("Can't receive frame (stream end?). Exiting ...")
            break

        # Only query the position once we know a frame was actually decoded.
        time_stamp = int(cap.get(cv.CAP_PROP_POS_MSEC))

        # Detect the box from the input image.
        # found, box_detection = box_detector.detect(frame, time_stamp)
        # if found:
        #     # save the detection results and visualize the bounding box
        #     box_detector.append(time_stamp, box_detection)
        #     output_frame = visualize_box(frame, box_detection)
        # else:
        #     # guarantee that output_frame is defined
        #     output_frame = frame

        # Detect hand landmarks from the input image.
        found, current_detection = hand_detector.detect(frame, time_stamp)
        if found:
            # save the detection results and visualize the landmarks
            hand_detector.append(time_stamp, current_detection)
            output_frame = draw_landmarks_on_image(frame, current_detection)
        else:
            output_frame = frame

        # output the annotated video
        out_vid.write(output_frame)

        # Display the resulting frame
        # cv.imshow('frame', output_frame)
        # if cv.waitKey(1) & 0xFF == ord('q'):
        #     break
finally:
    # Release the capture and the writer, and save whatever was logged,
    # even when processing stops with an exception.
    cap.release()
    out_vid.release()
    # cv.destroyAllWindows()
    hand_detector.close_log()
    # box_detector.close_log()

print(f"Detection results saved to {handLogPath}")
Below is the imported `video_mode` module:
import mediapipe as mp
import cv2 as cv
import pandas as pd
import csv
# Short aliases for the MediaPipe Tasks names used by the detectors below.
BaseOptions = mp.tasks.BaseOptions
HandLandmarker = mp.tasks.vision.HandLandmarker
HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions
HandLandmarkerResult = mp.tasks.vision.HandLandmarkerResult
VisionRunningMode = mp.tasks.vision.RunningMode
# hand detector detect 21 hand landmarks
class HandDetector:
    """Run the MediaPipe Hand Landmarker on video frames and log the results.

    Typical use: construct with a model path, call ``start_logging`` once,
    then for every frame call ``detect`` and, when a hand was found,
    ``append``; finally call ``close_log`` to write the CSV file.
    """

    # CSV header: timestamp, handedness label, then the 21 hand landmark names.
    columnNames = ['Timestamp', 'Handedness', 'Wrist', 'Thumb_CMC', 'Thumb_MCP', 'Thumb_IP', 'Thumb_Tip', 'Index_MCP', 'Index_PIP', 'Index_DIP', 'Index_Tip', 'Middle_MCP', 'Middle_PIP', 'Middle_DIP', 'Middle_Tip', 'Ring_MCP', 'Ring_PIP', 'Ring_DIP', 'Ring_Tip', 'Pinky_MCP', 'Pinky_PIP', 'Pinky_DIP', 'Pinky_Tip']

    def __init__(self, modelPath):
        """Create a hand landmarker in VIDEO running mode on the GPU delegate.

        Args:
            modelPath: path to the hand landmarker model asset.
        """
        options = HandLandmarkerOptions(
            base_options=BaseOptions(model_asset_path=modelPath, delegate=BaseOptions.Delegate.GPU),
            running_mode=VisionRunningMode.VIDEO,
            # The maximum number of hands detected by the hand landmark detector.
            num_hands=1,
            # Minimum confidence for the palm detection model to count a hand
            # detection as successful.
            min_hand_detection_confidence=0.5,
            # Minimum hand presence score from the landmark model; below it,
            # the palm detection model is triggered again in video/stream mode.
            min_hand_presence_confidence=0.5,
            # Minimum confidence (bounding box IoU between consecutive frames)
            # for tracking to count as successful; otherwise detection reruns.
            min_tracking_confidence=0.5,
        )
        self.landmarker = HandLandmarker.create_from_options(options)

    def detect(self, frame, timestamp) -> "tuple[bool, HandLandmarkerResult]":
        """Detect hand landmarks in one BGR frame.

        Args:
            frame: image as delivered by OpenCV (converted here from BGR to RGBA).
            timestamp: frame timestamp passed through to ``detect_for_video``.

        Returns:
            ``(found, result)`` where ``found`` is True when the result's
            ``handedness`` list is non-empty.
        """
        # mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
        # for GPU delegate, the image format should be SRGBA for apple metal
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGBA, data=cv.cvtColor(frame, cv.COLOR_BGR2RGBA))
        detection_result = self.landmarker.detect_for_video(mp_image, timestamp)
        # bool() is False when no hand was reported
        return (bool(detection_result.handedness), detection_result)

    def start_logging(self, savePath):
        """Start a fresh, empty log that ``close_log`` will write to ``savePath``."""
        # Rows are buffered in a plain list; growing a DataFrame with
        # pd.concat on every frame copies the whole log each time.
        self._rows = []
        self.savePath = savePath

    @property
    def dataFrame(self):
        """All rows logged so far as a DataFrame with ``columnNames`` columns.

        Built on demand from the buffered rows; read-only.
        """
        return pd.DataFrame(self._rows, columns=self.columnNames)

    def append(self, time, detection_result):
        """Buffer one log row per detected hand.

        Each row holds ``time``, the handedness category name, and one
        ``(x, y, z)`` tuple per landmark of that hand.
        """
        for hand_idx, hand in enumerate(detection_result.handedness):
            landmarks = detection_result.hand_landmarks[hand_idx]
            row = [time, hand[0].category_name]
            row.extend((landmark.x, landmark.y, landmark.z) for landmark in landmarks)
            self._rows.append(row)

    def close_log(self):
        """Write the logged rows to ``savePath`` as CSV (no index column)."""
        print(f"Saving the dataFrame to {self.savePath}")
        self.dataFrame.to_csv(self.savePath, index=False)
# Short aliases for the MediaPipe object-detection names used by BoxDetector.
ObjectDetector = mp.tasks.vision.ObjectDetector
ObjectDetectorOptions = mp.tasks.vision.ObjectDetectorOptions
# NOTE(review): this aliases the `Detection` container, but BoxDetector.append
# uses it to annotate an object that exposes `.detections`; presumably the
# result container was intended - confirm against the MediaPipe API.
ObjectDetectionResult = mp.tasks.components.containers.Detection
# box detector detect front of the box and the divider in between
class BoxDetector:
    """Run a MediaPipe ObjectDetector on video frames and stream results to CSV.

    Call ``start_logging`` before ``append``; ``close_log`` closes the file.
    """

    def __init__(self, modelPath):
        """Create an object detector (VIDEO mode, GPU delegate, at most 2 results).

        Args:
            modelPath: path to the object detection model asset.
        """
        base = BaseOptions(model_asset_path=modelPath, delegate=BaseOptions.Delegate.GPU)
        detector_options = ObjectDetectorOptions(
            base_options=base,
            max_results=2,
            running_mode=VisionRunningMode.VIDEO,
        )
        self.detector = ObjectDetector.create_from_options(detector_options)

    def detect(self, frame, timestamp):
        """Detect objects in one BGR frame.

        Returns:
            ``(found, result)`` where ``found`` is True when the result's
            ``detections`` list is non-empty.
        """
        # OpenCV delivers BGR; the MediaPipe image is built from an RGBA copy.
        rgba_pixels = cv.cvtColor(frame, cv.COLOR_BGR2RGBA)
        image = mp.Image(image_format=mp.ImageFormat.SRGBA, data=rgba_pixels)
        # Run object detection on the converted image.
        result = self.detector.detect_for_video(image, timestamp)
        found = bool(result.detections)
        return (found, result)

    def start_logging(self, savePath):
        """Open ``savePath`` for writing and emit the CSV header row."""
        header = ['Timestamp', 'Detection ID', 'Category Name', 'Score', 'Bounding Box Origin X', 'Bounding Box Origin Y', 'Bounding Box Width', 'Bounding Box Height']
        self.savePath = savePath
        self.file = open(self.savePath, 'w', newline='')
        self.writer = csv.writer(self.file)
        self.writer.writerow(header)

    def append(self, time, detection_result: ObjectDetectionResult):
        """Write one CSV row per (detection, category) pair of the result."""
        self.writer.writerows(
            [
                time,
                det_id,
                cat.category_name,
                cat.score,
                det.bounding_box.origin_x,
                det.bounding_box.origin_y,
                det.bounding_box.width,
                det.bounding_box.height,
            ]
            for det_id, det in enumerate(detection_result.detections)
            for cat in det.categories
        )

    def close_log(self):
        """Close the CSV file and report where the data was saved."""
        self.file.close()
        print(f"Saved the data to {self.savePath}")
### Other info / Complete Logs
_No response_