Code:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import threading
import time
from queue import Queue, Empty
from hailo_rpi_common import (
get_caps_from_pad,
get_numpy_from_buffer,
app_callback_class,
)
from pose_estimation_pipeline import GStreamerPoseEstimationApp
# Import your libraries up here as usual
# Inside this function is where you place the rest of your code as usual
def custom_processing_thread(pose_estimator):
# This sleep gives enough time for the HAT to fire up and start detecting - important but not ma mandatory
time.sleep(2)
while True:
# We can call this function to get the latest position of a specific keypoint
position = pose_estimator.get_body_part_coordinates('left_wrist')
print(position)
# Another function but this time we input 3 different keypoints and get the angle between then
angle = pose_estimator.calculate_body_part_angle('left_shoulder', 'left_elbow', 'left_wrist')
print(angle)
time.sleep(0.1)
# The rest of the code starts here and handles the operation of the hat and all other neccesary calculations
# The hat should update all of its detection data 30 times a second.
class PoseDataManager:
def __init__(self):
"""
Manages pose estimation data across threads
Allows safe access to the latest detection data
"""
self.latest_detection_lock = threading.Lock()
self.latest_detection = None
self.latest_width = None
self.latest_height = None
def update_detection(self, detection, width, height):
"""
Update the latest detection data thread-safely
:param detection: Hailo detection object
:param width: Frame width
:param height: Frame height
"""
with self.latest_detection_lock:
self.latest_detection = detection
self.latest_width = width
self.latest_height = height
def get_latest_detection(self):
"""
Retrieve the latest detection data thread-safely
:return: Tuple of (detection, width, height) or (None, None, None)
"""
with self.latest_detection_lock:
return (
self.latest_detection,
self.latest_width,
self.latest_height
)
class PoseEstimator:
def __init__(self, pose_data_manager):
"""
Initialize PoseEstimator with a PoseDataManager
:param pose_data_manager: Shared data management object
"""
self.pose_data_manager = pose_data_manager
self.keypoints = self._get_keypoints()
def _get_keypoints(self):
"""Get the COCO keypoints correspondence map."""
return {
'nose': 0,
'left_eye': 1,
'right_eye': 2,
'left_ear': 3,
'right_ear': 4,
'left_shoulder': 5,
'right_shoulder': 6,
'left_elbow': 7,
'right_elbow': 8,
'left_wrist': 9,
'right_wrist': 10,
'left_hip': 11,
'right_hip': 12,
'left_knee': 13,
'right_knee': 14,
'left_ankle': 15,
'right_ankle': 16,
}
def get_body_part_coordinates(self, body_part, significant_figures=4):
"""
Get normalized coordinates for a specific body part from latest detection
:param body_part: Name of the body part (e.g., 'left_eye')
:param significant_figures: Number of decimal places to round to
:return: Tuple of normalized (x, y) coordinates or None
"""
# Get latest detection
detection, width, height = self.pose_data_manager.get_latest_detection()
if detection is None or width is None or height is None:
return None
# If no landmarks, return None
landmarks = detection.get_objects_typed(hailo.HAILO_LANDMARKS)
if len(landmarks) == 0:
return None
# Get bbox and points
bbox = detection.get_bbox()
points = landmarks[0].get_points()
# Get the specific keypoint
keypoint_index = self.keypoints[body_part]
point = points[keypoint_index]
# Directly use the normalized coordinates from the point
# Clamp the values between 0 and 1, then round to specified significant figures
norm_x = round(max(0, min(1, point.x())), significant_figures)
norm_y = round(max(0, min(1, point.y())), significant_figures)
return (norm_x, norm_y)
def calculate_body_part_angle(self, point_a_name, point_b_name, point_c_name):
"""
Calculate angle between three body parts directly by name, returning an angle in the full 0 to 360 degree range.
:param point_a_name: First body part name (e.g., 'left_shoulder')
:param point_b_name: Vertex body part name (e.g., 'left_elbow')
:param point_c_name: Third body part name (e.g., 'left_wrist')
:return: Angle in degrees or None if coordinates can't be retrieved
"""
# Get coordinates for each body part
point_a = self.get_body_part_coordinates(point_a_name)
point_b = self.get_body_part_coordinates(point_b_name)
point_c = self.get_body_part_coordinates(point_c_name)
# Check if any coordinates are None
if any(point is None for point in [point_a, point_b, point_c]):
return None
# Convert to numpy arrays
a = np.array(point_a)
b = np.array(point_b)
c = np.array(point_c)
# Calculate vectors
ba = a - b
bc = c - b
# Calculate angle using arctan2 for full 360-degree range
angle = np.degrees(np.arctan2(np.linalg.det([ba, bc]), np.dot(ba, bc)))
# Ensure the angle is between 0 and 360 degrees
if angle < 0:
angle += 360
return angle
class user_app_callback_class(app_callback_class):
def __init__(self, pose_data_manager):
"""
Initialize with a PoseDataManager
:param pose_data_manager: Shared data management object
"""
super().__init__()
self.pose_data_manager = pose_data_manager
def app_callback(pad, info, user_data):
# Get the GstBuffer from the probe info
buffer = info.get_buffer()
if buffer is None:
return Gst.PadProbeReturn.OK
# Get the caps from the pad
format, width, height = get_caps_from_pad(pad)
# Get the detections from the buffer
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
# Find the person detection
person_detection = None
for detection in detections:
if detection.get_label() == "person":
person_detection = detection
break
# If a person is detected, update the shared data
if person_detection is not None:
user_data.pose_data_manager.update_detection(person_detection, width, height)
return Gst.PadProbeReturn.OK
if __name__ == "__main__":
# Create PoseDataManager first
pose_data_manager = PoseDataManager()
# Create an instance of the user app callback class with pose_data_manager
user_data = user_app_callback_class(pose_data_manager)
# Create pose estimator
pose_estimator = PoseEstimator(pose_data_manager)
# Start the custom processing thread
processing_thread = threading.Thread(
target=custom_processing_thread,
args=(pose_estimator,),
daemon=True
)
processing_thread.start()
# Run the GStreamer pipeline
app = GStreamerPoseEstimationApp(app_callback, user_data)
app.run()
Error:
>>> %Run -c $EDITOR_CONTENT
Traceback (most recent call last):
File "<string>", line 13, in <module>
ModuleNotFoundError: No module named 'hailo_rpi_common'
>>>