No module named 'hailo_rpi_common'

Code:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import threading
import time
from queue import Queue, Empty

from hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from pose_estimation_pipeline import GStreamerPoseEstimationApp
# Import your libraries up here as usual


# Inside this function is where you place the rest of your code as usual
def custom_processing_thread(pose_estimator):
    """
    Poll the pose estimator forever and print what it reports

    After an initial two-second pause, each pass prints the latest
    'left_wrist' coordinates and the shoulder-elbow-wrist angle of the
    left arm, then sleeps for 0.1 seconds. The loop never returns.

    :param pose_estimator: Object providing get_body_part_coordinates() and
        calculate_body_part_angle()
    """
    # Give the HAT time to fire up and start detecting (helpful, not mandatory)
    time.sleep(2)

    while True:
        # Latest position of one specific keypoint
        wrist_position = pose_estimator.get_body_part_coordinates('left_wrist')
        print(wrist_position)

        # Angle defined by three keypoints; the middle name is the vertex
        elbow_angle = pose_estimator.calculate_body_part_angle(
            'left_shoulder', 'left_elbow', 'left_wrist'
        )
        print(elbow_angle)

        time.sleep(0.1)


# The rest of the code starts here and handles the operation of the hat and all other necessary calculations
# The hat should update all of its detection data 30 times a second.
class PoseDataManager:
    """Lock-protected holder for the most recent detection and its frame size."""

    def __init__(self):
        """
        Start with nothing recorded and create the lock guarding the fields
        """
        self.latest_detection = None
        self.latest_width = None
        self.latest_height = None
        self.latest_detection_lock = threading.Lock()

    def update_detection(self, detection, width, height):
        """
        Replace the stored detection and frame size under the lock

        :param detection: Hailo detection object
        :param width: Frame width
        :param height: Frame height
        """
        with self.latest_detection_lock:
            # All three fields change together so readers never see a mix
            (
                self.latest_detection,
                self.latest_width,
                self.latest_height,
            ) = (detection, width, height)

    def get_latest_detection(self):
        """
        Read the stored detection and frame size under the lock

        :return: Tuple of (detection, width, height); all three are None
            until update_detection() has been called
        """
        with self.latest_detection_lock:
            snapshot = (self.latest_detection, self.latest_width, self.latest_height)
        return snapshot

class PoseEstimator:
    """
    Read keypoint positions and joint angles from the latest pose detection

    Detections are pulled from a shared PoseDataManager on every call.
    """

    def __init__(self, pose_data_manager):
        """
        Initialize PoseEstimator with a PoseDataManager

        :param pose_data_manager: Shared data management object
        """
        self.pose_data_manager = pose_data_manager
        self.keypoints = self._get_keypoints()

    def _get_keypoints(self):
        """Get the COCO keypoints correspondence map."""
        return {
            'nose': 0,
            'left_eye': 1,
            'right_eye': 2,
            'left_ear': 3,
            'right_ear': 4,
            'left_shoulder': 5,
            'right_shoulder': 6,
            'left_elbow': 7,
            'right_elbow': 8,
            'left_wrist': 9,
            'right_wrist': 10,
            'left_hip': 11,
            'right_hip': 12,
            'left_knee': 13,
            'right_knee': 14,
            'left_ankle': 15,
            'right_ankle': 16,
        }

    def _get_usable_detection(self):
        """
        Fetch the latest detection once from the data manager

        :return: The detection, or None when the detection or the frame
            width/height has not been recorded yet
        """
        detection, width, height = self.pose_data_manager.get_latest_detection()
        if detection is None or width is None or height is None:
            return None
        return detection

    def _read_keypoint(self, detection, body_part, significant_figures=4):
        """
        Extract one keypoint from an already-fetched detection

        :param detection: Hailo detection object
        :param body_part: Name of the body part (e.g., 'left_eye')
        :param significant_figures: Number of decimal places to round to
        :return: Tuple of (x, y) clamped to [0, 1], or None if the detection
            carries no landmarks
        :raises KeyError: If body_part is not a known keypoint name
        """
        landmarks = detection.get_objects_typed(hailo.HAILO_LANDMARKS)
        if len(landmarks) == 0:
            return None

        points = landmarks[0].get_points()
        point = points[self.keypoints[body_part]]

        # The point values are used as-is, only clamped and rounded.
        # NOTE(review): Hailo's reference pose example maps landmark points
        # through the detection bbox before use, so these values may be
        # relative to the person's bounding box rather than the frame - confirm.
        norm_x = round(max(0, min(1, point.x())), significant_figures)
        norm_y = round(max(0, min(1, point.y())), significant_figures)
        return (norm_x, norm_y)

    @staticmethod
    def _angle_at_vertex(point_a, point_b, point_c):
        """
        Angle at point_b between the rays towards point_a and point_c

        :return: Angle in degrees in the range 0 to 360
        """
        a = np.array(point_a)
        b = np.array(point_b)
        c = np.array(point_c)

        ba = a - b
        bc = c - b

        # arctan2 of (cross, dot) keeps the sign, giving the full circle
        angle = np.degrees(np.arctan2(np.linalg.det([ba, bc]), np.dot(ba, bc)))
        if angle < 0:
            angle += 360
        return angle

    def get_body_part_coordinates(self, body_part, significant_figures=4):
        """
        Get normalized coordinates for a specific body part from latest detection

        :param body_part: Name of the body part (e.g., 'left_eye')
        :param significant_figures: Number of decimal places to round to
        :return: Tuple of normalized (x, y) coordinates or None
        """
        detection = self._get_usable_detection()
        if detection is None:
            return None
        return self._read_keypoint(detection, body_part, significant_figures)

    def calculate_body_part_angle(self, point_a_name, point_b_name, point_c_name):
        """
        Calculate angle between three body parts directly by name, returning an angle in the full 0 to 360 degree range.

        :param point_a_name: First body part name (e.g., 'left_shoulder')
        :param point_b_name: Vertex body part name (e.g., 'left_elbow')
        :param point_c_name: Third body part name (e.g., 'left_wrist')
        :return: Angle in degrees or None if coordinates can't be retrieved
        """
        # Fetch the detection once: the pipeline callback may store a newer
        # detection at any moment, and the three keypoints must all come
        # from the same one for the angle to be meaningful
        detection = self._get_usable_detection()
        if detection is None:
            return None

        coordinates = [
            self._read_keypoint(detection, name)
            for name in (point_a_name, point_b_name, point_c_name)
        ]
        if any(point is None for point in coordinates):
            return None

        return self._angle_at_vertex(*coordinates)


class user_app_callback_class(app_callback_class):
    """Callback user-data object that also carries the shared PoseDataManager."""

    def __init__(self, pose_data_manager):
        """
        Run the base class setup, then attach the shared data manager

        :param pose_data_manager: PoseDataManager that app_callback stores
            detections into
        """
        super().__init__()
        self.pose_data_manager = pose_data_manager

def app_callback(pad, info, user_data):
    """
    Pad-probe callback: store the first "person" detection found in the buffer

    :param pad: GStreamer pad the probe fired on
    :param info: Probe info the buffer is read from
    :param user_data: Object exposing a pose_data_manager attribute
    :return: Gst.PadProbeReturn.OK in every case
    """
    buffer = info.get_buffer()
    if buffer is not None:
        # Frame size comes from the pad's current caps; the format is unused
        _frame_format, frame_width, frame_height = get_caps_from_pad(pad)

        roi = hailo.get_roi_from_buffer(buffer)
        people = (
            candidate
            for candidate in roi.get_objects_typed(hailo.HAILO_DETECTION)
            if candidate.get_label() == "person"
        )
        first_person = next(people, None)

        # The shared data is only touched when a person was found
        if first_person is not None:
            user_data.pose_data_manager.update_detection(first_person, frame_width, frame_height)

    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # One PoseDataManager is shared: the pipeline callback writes detections
    # into it and the pose estimator reads them back out
    pose_data_manager = PoseDataManager()
    pose_estimator = PoseEstimator(pose_data_manager)
    user_data = user_app_callback_class(pose_data_manager)

    # The user code loop runs in a daemon thread alongside the pipeline
    processing_thread = threading.Thread(
        target=custom_processing_thread,
        kwargs={"pose_estimator": pose_estimator},
        daemon=True,
    )
    processing_thread.start()

    # Build the GStreamer pipeline and run it
    app = GStreamerPoseEstimationApp(app_callback, user_data)
    app.run()

Error:

>>> %Run -c $EDITOR_CONTENT
Traceback (most recent call last):
  File "<string>", line 13, in <module>
ModuleNotFoundError: No module named 'hailo_rpi_common'
>>>

Hi @darrin273365, welcome to the forums!

Would you be able to tell us a little more about this error you are encountering? Could you share a link to where you found this code and let us know what program you are using to edit it?

This error seems to indicate you haven’t imported the module ‘hailo_rpi_common’ correctly. If you let us know more about your setup we can help you import this correctly and get your code working again!

Hello Samuel this is the code I downloaded off your website

import sys

import gi

gi.require_version('Gst', '1.0')

from gi.repository import Gst, GLib, GObject

import os

import argparse

import multiprocessing

import numpy as np

import setproctitle

import cv2

import time

import signal

import subprocess

# Try to import hailo python module

try:
    import hailo
except ImportError:
    # Stop right away with a hint instead of failing later on first use
    sys.exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")

# -----------------------------------------------------------------------------------------------

# User-defined class to be used in the callback function

# -----------------------------------------------------------------------------------------------

# A sample class to be used in the callback function

# This example allows to:

# 1. Count the number of frames

# 2. Setup a multiprocessing queue to pass the frame to the main thread

# Additional variables and functions can be added to this class as needed

class app_callback_class:
    """
    Sample user-data object for the pipeline callback.

    Counts frames and holds a small multiprocessing queue for passing
    frames out of the callback.
    """

    def __init__(self):
        self.frame_count = 0
        self.use_frame = False
        # Bounded queue: at most three frames are held at once
        self.frame_queue = multiprocessing.Queue(maxsize=3)
        self.running = True

    def increment(self):
        """Add one to the frame counter."""
        self.frame_count += 1

    def get_count(self):
        """Return the number of frames counted so far."""
        return self.frame_count

    def set_frame(self, frame):
        """Queue a frame; the frame is dropped when the queue reports full."""
        if not self.frame_queue.full():
            self.frame_queue.put(frame)

    def get_frame(self):
        """Return the next queued frame, or None when the queue reports empty."""
        if not self.frame_queue.empty():
            return self.frame_queue.get()
        else:
            return None

def dummy_callback(pad, info, user_data):
    """
    A minimal dummy callback function that returns immediately.

    Args:
        pad: The GStreamer pad
        info: The probe info
        user_data: User-defined data passed to the callback

    Returns:
        Gst.PadProbeReturn.OK
    """
    return Gst.PadProbeReturn.OK

# -----------------------------------------------------------------------------------------------

# Common functions

# -----------------------------------------------------------------------------------------------

def detect_hailo_arch():
    """
    Detect the Hailo device architecture by running `hailortcli fw-control identify`.

    Returns:
        "hailo8l" or "hailo8" when the "Device Architecture" line of the
        command output names that architecture; None when the command
        fails, the line is not found, or any exception is raised.
    """
    try:
        # Run the hailortcli command to get device information
        result = subprocess.run(['hailortcli', 'fw-control', 'identify'], capture_output=True, text=True)

        # Check if the command was successful
        if result.returncode != 0:
            print(f"Error running hailortcli: {result.stderr}")
            return None

        # Search for the "Device Architecture" line in the output
        for line in result.stdout.split('\n'):
            if "Device Architecture" in line:
                # "HAILO8L" is tested first because "HAILO8" is a prefix of it
                if "HAILO8L" in line:
                    return "hailo8l"
                elif "HAILO8" in line:
                    return "hailo8"

        print("Could not determine Hailo architecture from device information.")
        return None
    except Exception as e:
        print(f"An error occurred while detecting Hailo architecture: {e}")
        return None

def get_caps_from_pad(pad: "Gst.Pad"):
    """
    Read the format and frame size from a pad's current caps.

    Args:
        pad: The GStreamer pad to inspect

    Returns:
        Tuple (format, width, height). All three are None when the pad has
        no current caps or the caps have no first structure, so callers can
        always unpack the result into three names.
    """
    caps = pad.get_current_caps()
    if caps:
        # We can now extract information from the caps
        structure = caps.get_structure(0)
        if structure:
            # Extracting some common properties
            video_format = structure.get_value('format')
            width = structure.get_value('width')
            height = structure.get_value('height')
            return video_format, width, height

    # Every failure path ends here with the same three-value shape
    return None, None, None

# This function is used to display the user data frame

def display_user_data_frame(user_data: app_callback_class):
    """
    Show frames taken from user_data in an OpenCV window until it stops running.

    Args:
        user_data: Object whose `running` flag controls the loop and whose
            get_frame() supplies the frames to display
    """
    while user_data.running:
        frame = user_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)
        # Called on every pass, whether or not a frame was shown
        cv2.waitKey(1)
    cv2.destroyAllWindows()

def get_default_parser():
    """
    Build the command-line argument parser shared by the Hailo example apps.

    Returns:
        argparse.ArgumentParser with the --input, --use-frame, --show-fps,
        --arch, --hef-path, --disable-sync and --dump-dot options defined.
    """
    parser = argparse.ArgumentParser(description="Hailo App Help")

    # The default input is a video path built relative to this file's folder
    current_path = os.path.dirname(os.path.abspath(__file__))
    default_video_source = os.path.join(current_path, '../resources/detection0.mp4')

    parser.add_argument(
        "--input", "-i", type=str, default=default_video_source,
        help=(
            "Input source. Can be a file, USB or RPi camera (CSI camera module). "
            "For RPi camera use '-i rpi' (Still in Beta). "
            "Defaults to example video resources/detection0.mp4"
        ),
    )
    parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")
    parser.add_argument("--show-fps", "-f", action="store_true", help="Print FPS on sink")
    parser.add_argument(
        "--arch",
        default=None,
        choices=['hailo8', 'hailo8l'],
        help="Specify the Hailo architecture (hailo8 or hailo8l). Default is None , app will run check.",
    )
    parser.add_argument(
        "--hef-path",
        default=None,
        help="Path to HEF file",
    )
    parser.add_argument(
        "--disable-sync", action="store_true",
        help="Disables display sink sync, will run as fast as possible. Relevant when using file source."
    )
    parser.add_argument("--dump-dot", action="store_true", help="Dump the pipeline graph to a dot file pipeline.dot")
    return parser

it doesn’t recognize hailo_rpi_common

import gi

gi.require_version('Gst', '1.0')

from gi.repository import Gst, GLib

import os

import numpy as np

import cv2

import hailo

import threading

import time

from queue import Queue, Empty

from hailo_rpi_common import (

get_caps_from_pad,

get_numpy_from_buffer,

app_callback_class,

)

from pose_estimation_pipeline import GStreamerPoseEstimationApp

# Import your libraries up here as usual

# Inside this function is where you place the rest of your code as usual

def custom_processing_thread(pose_estimator):
    """
    Poll the pose estimator forever and print what it reports

    After an initial two-second pause, each pass prints the latest
    'left_wrist' coordinates and the shoulder-elbow-wrist angle of the
    left arm, then sleeps for 0.1 seconds. The loop never returns.

    :param pose_estimator: Object providing get_body_part_coordinates() and
        calculate_body_part_angle()
    """
    # Give the HAT time to fire up and start detecting (helpful, not mandatory)
    time.sleep(2)

    while True:
        # We can call this function to get the latest position of a specific keypoint
        position = pose_estimator.get_body_part_coordinates('left_wrist')
        print(position)

        # Another function, but this time we input 3 different keypoints and get the angle between them
        angle = pose_estimator.calculate_body_part_angle('left_shoulder', 'left_elbow', 'left_wrist')
        print(angle)

        time.sleep(0.1)

# The rest of the code starts here and handles the operation of the hat and all other necessary calculations

# The hat should update all of its detection data 30 times a second.

class PoseDataManager:
    """Lock-protected holder for the most recent detection and its frame size."""

    def __init__(self):
        """
        Manages pose estimation data across threads
        Allows safe access to the latest detection data
        """
        self.latest_detection_lock = threading.Lock()
        self.latest_detection = None
        self.latest_width = None
        self.latest_height = None

    def update_detection(self, detection, width, height):
        """
        Update the latest detection data thread-safely

        :param detection: Hailo detection object
        :param width: Frame width
        :param height: Frame height
        """
        # All three fields change under one lock hold so readers never see a mix
        with self.latest_detection_lock:
            self.latest_detection = detection
            self.latest_width = width
            self.latest_height = height

    def get_latest_detection(self):
        """
        Retrieve the latest detection data thread-safely

        :return: Tuple of (detection, width, height) or (None, None, None)
        """
        with self.latest_detection_lock:
            return (
                self.latest_detection,
                self.latest_width,
                self.latest_height,
            )

class PoseEstimator:
    """
    Read keypoint positions and joint angles from the latest pose detection

    Detections are pulled from a shared PoseDataManager on every call.
    """

    def __init__(self, pose_data_manager):
        """
        Initialize PoseEstimator with a PoseDataManager

        :param pose_data_manager: Shared data management object
        """
        self.pose_data_manager = pose_data_manager
        self.keypoints = self._get_keypoints()

    def _get_keypoints(self):
        """Get the COCO keypoints correspondence map."""
        return {
            'nose': 0,
            'left_eye': 1,
            'right_eye': 2,
            'left_ear': 3,
            'right_ear': 4,
            'left_shoulder': 5,
            'right_shoulder': 6,
            'left_elbow': 7,
            'right_elbow': 8,
            'left_wrist': 9,
            'right_wrist': 10,
            'left_hip': 11,
            'right_hip': 12,
            'left_knee': 13,
            'right_knee': 14,
            'left_ankle': 15,
            'right_ankle': 16,
        }

    def _get_usable_detection(self):
        """
        Fetch the latest detection once from the data manager

        :return: The detection, or None when the detection or the frame
            width/height has not been recorded yet
        """
        detection, width, height = self.pose_data_manager.get_latest_detection()
        if detection is None or width is None or height is None:
            return None
        return detection

    def _read_keypoint(self, detection, body_part, significant_figures=4):
        """
        Extract one keypoint from an already-fetched detection

        :param detection: Hailo detection object
        :param body_part: Name of the body part (e.g., 'left_eye')
        :param significant_figures: Number of decimal places to round to
        :return: Tuple of (x, y) clamped to [0, 1], or None if the detection
            carries no landmarks
        :raises KeyError: If body_part is not a known keypoint name
        """
        landmarks = detection.get_objects_typed(hailo.HAILO_LANDMARKS)
        if len(landmarks) == 0:
            return None

        points = landmarks[0].get_points()
        point = points[self.keypoints[body_part]]

        # The point values are used as-is, only clamped and rounded.
        # NOTE(review): Hailo's reference pose example maps landmark points
        # through the detection bbox before use, so these values may be
        # relative to the person's bounding box rather than the frame - confirm.
        norm_x = round(max(0, min(1, point.x())), significant_figures)
        norm_y = round(max(0, min(1, point.y())), significant_figures)
        return (norm_x, norm_y)

    @staticmethod
    def _angle_at_vertex(point_a, point_b, point_c):
        """
        Angle at point_b between the rays towards point_a and point_c

        :return: Angle in degrees in the range 0 to 360
        """
        a = np.array(point_a)
        b = np.array(point_b)
        c = np.array(point_c)

        ba = a - b
        bc = c - b

        # arctan2 of (cross, dot) keeps the sign, giving the full circle
        angle = np.degrees(np.arctan2(np.linalg.det([ba, bc]), np.dot(ba, bc)))
        if angle < 0:
            angle += 360
        return angle

    def get_body_part_coordinates(self, body_part, significant_figures=4):
        """
        Get normalized coordinates for a specific body part from latest detection

        :param body_part: Name of the body part (e.g., 'left_eye')
        :param significant_figures: Number of decimal places to round to
        :return: Tuple of normalized (x, y) coordinates or None
        """
        detection = self._get_usable_detection()
        if detection is None:
            return None
        return self._read_keypoint(detection, body_part, significant_figures)

    def calculate_body_part_angle(self, point_a_name, point_b_name, point_c_name):
        """
        Calculate angle between three body parts directly by name, returning an angle in the full 0 to 360 degree range.

        :param point_a_name: First body part name (e.g., 'left_shoulder')
        :param point_b_name: Vertex body part name (e.g., 'left_elbow')
        :param point_c_name: Third body part name (e.g., 'left_wrist')
        :return: Angle in degrees or None if coordinates can't be retrieved
        """
        # Fetch the detection once: the pipeline callback may store a newer
        # detection at any moment, and the three keypoints must all come
        # from the same one for the angle to be meaningful
        detection = self._get_usable_detection()
        if detection is None:
            return None

        coordinates = [
            self._read_keypoint(detection, name)
            for name in (point_a_name, point_b_name, point_c_name)
        ]
        if any(point is None for point in coordinates):
            return None

        return self._angle_at_vertex(*coordinates)

class user_app_callback_class(app_callback_class):
    """Callback user-data object that also carries the shared PoseDataManager."""

    def __init__(self, pose_data_manager):
        """
        Initialize with a PoseDataManager

        :param pose_data_manager: Shared data management object
        """
        super().__init__()
        self.pose_data_manager = pose_data_manager

def app_callback(pad, info, user_data):
    """
    Pad-probe callback: store the first "person" detection found in the buffer

    :param pad: GStreamer pad the probe fired on
    :param info: Probe info the buffer is read from
    :param user_data: Object exposing a pose_data_manager attribute
    :return: Gst.PadProbeReturn.OK in every case
    """
    # Get the GstBuffer from the probe info
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Get the caps from the pad (the format value is not used here)
    _frame_format, width, height = get_caps_from_pad(pad)

    # Get the detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Find the person detection
    person_detection = None
    for detection in detections:
        if detection.get_label() == "person":
            person_detection = detection
            break

    # If a person is detected, update the shared data
    if person_detection is not None:
        user_data.pose_data_manager.update_detection(person_detection, width, height)

    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # Create PoseDataManager first; it is shared by the callback and the estimator
    pose_data_manager = PoseDataManager()

    # Create an instance of the user app callback class with pose_data_manager
    user_data = user_app_callback_class(pose_data_manager)

    # Create pose estimator
    pose_estimator = PoseEstimator(pose_data_manager)

    # Start the custom processing thread
    processing_thread = threading.Thread(
        target=custom_processing_thread,
        args=(pose_estimator,),
        daemon=True,
    )
    processing_thread.start()

    # Run the GStreamer pipeline
    app = GStreamerPoseEstimationApp(app_callback, user_data)
    app.run()

>>> %Run -c $EDITOR_CONTENT

Traceback (most recent call last):

File "<string>", line 13, in <module>

ModuleNotFoundError: No module named 'hailo_rpi_common'

Hey @darrin273365,

Thanks for that. We have lots of tutorials and code available on our website. Would you be able to let me know specifically where you found this code? A link to the tutorial or product page would be ideal so I can run through the setup steps to figure out what is causing this error.

The code itself is not super useful at the moment but if you could let me know:

  • What parts you are using for this project (product page links would be ideal)
  • Send a link to the tutorial you are following or at least where you have found this code

Hope this helps! :slight_smile:

/home/dw/hailo-rpi5-examples/hailo-rpi5-examples/basic_pipelines

/home/dw/hailo-rpi5-examples/basic_pipelines

It's in 2 different folders; would that confuse the code?

Thanks for getting back to me. The hailo_rpi_common file is in 2 folders; maybe the folder I was using had different code. I changed over to the other one and it seems to recognise it now.

Hey @darrin273365,

It’s possible that having the required files split between two folders may be causing some kind of issue here.

I’m still unsure as to what you are trying to achieve with this code so I can’t say for sure what is contributing to this problem. Would you be able to let me know:

  • What parts you are using for this project (product page links would be ideal)
  • Send a link to the tutorial you are following or at least where you have found this code

Hey @darrin273365,

This AI HAT code is used for a very specific set up and can be a little finicky. I think the code may need to be in the same folder and it also needs to be run out of the virtual environment that we set up in the guide. If you are just trying to take the code and run it separately you will run into some issues.

To get the AI hat running you will need to set everything up as outlined in the guide - including how we download this Python pipeline code and how we run it out of the command line (you will run into virtual environment issues running it out of Thonny).

If you run into some more issues with this, give us a little information about which guide you are following (either object detection or pose estimation), and the steps you have taken.

Hope this helps!

1 Like

Thanks, I managed to get Hailo working. Is there any way of running Hailo in the terminal and getting it to control the stepper motors in Thonny?

Hey Darrin,

In the written guide we have some example code controlling hardware with the detection results. In that demo code, we control a servo but there is plenty of information in that section on how to implement your own custom code. If you have existing code that controls the stepper motor, you should be able to paste it in there.

In this section (the video also demonstrates it well), we open and edit the code in Thonny, then save it and run it from the command line. Unfortunately, it is very difficult to run it from inside Thonny because of how the Hailo software handles the virtual environment.

Best of luck!

thanks

3 Likes

Morning, have you used the global shutter camera CE09593 with the Arducam ethernet kit AC-U6248?
The camera works with the Pi 5, but when I use the Arducam ethernet extension kit it doesn’t.

from picamera2 import Picamera2

from libcamera import controls

import time

# Create the Picamera2 object

picam2 = Picamera2()

# Start the camera with the preview enabled

picam2.start(show_preview=True)

# Pause for two seconds to allow the camera to warm up; nothing runs after this

time.sleep(2)


%Run goodcam.py

[0:00:41.463302029] [1706] INFO Camera camera_manager.cpp:325 libcamera v0.3.2+99-1230f78d

[0:00:41.472963196] [1711] INFO RPI pisp.cpp:695 libpisp version v1.0.7 28196ed6edcf 29-08-2024 (16:42:16)

[0:00:41.487755603] [1711] INFO RPI pisp.cpp:1154 Registered camera /base/axi/pcie@120000/rp1/i2c@80000/imx296@1a to CFE device /dev/media2 and ISP device /dev/media0 using PiSP variant BCM2712_C0

[0:00:41.490898085] [1706] INFO Camera camera.cpp:1197 configuring streams: (0) 640x480-XBGR8888 (1) 1456x1088-BGGR_PISP_COMP1

[0:00:41.491007418] [1711] INFO RPI pisp.cpp:1450 Sensor: /base/axi/pcie@120000/rp1/i2c@80000/imx296@1a - Selected sensor format: 1456x1088-SBGGR10_1X10 - Selected CFE format: 1456x1088-PC1B

[0:00:47.562364081] [1711] WARN V4L2 v4l2_videodevice.cpp:2127 /dev/video4[20:cap]: Dequeue timer of 1000000.00us has expired!

[0:00:47.562401081] [1711] ERROR RPI pipeline_base.cpp:1366 Camera frontend has timed out!

[0:00:47.562407563] [1711] ERROR RPI pipeline_base.cpp:1367 Please check that your camera sensor connector is attached securely.

[0:00:47.562412914] [1711] ERROR RPI pipeline_base.cpp:1368 Alternatively, try another cable and/or sensor.

Hi @darrin273365

If you haven’t already, I would recommend checking that the ethernet cable you’re using isn’t damaged and works as intended. Otherwise, you could look at increasing the dequeue timer to more than 10 seconds.

Hello Dan do you know how to increase dequeue timer?

Hey @darrin273365,

Changing the dequeue timer looks like a bit of an involved process and is unlikely to fix this issue given the default time should already be more than enough for a valid connection to work.

It doesn’t look like that cable extension kit you are using has been tested for that specific camera so it could be a compatibility issue. Do you have any way to test the cable kit with a camera that has confirmed compatibility?

Yes, it works with the Camera Module 3 and the HQ Camera. It states on the Arducam website that it is compatible, but I’m starting to think it might not be. It says to do this, and it corrupted the SD card.

For Raspberry Pi Bookworm users running on Pi 5, please execute:

sudo nano /boot/firmware/config.txt 
#Find the line: camera_auto_detect=1, update it to:
camera_auto_detect=0
#Find the line: [all], add the following item under it:
dtoverlay=imx296
#Save and reboot.

hey @darrin273365,

Interesting behaviour. I will pull these items out of stock and replicate your project tomorrow to see if I can help you make some progress.

I will update you tomorrow afternoon!

thanks for that

1 Like

Hey @darrin273365,

Sorry for the delay but I have an update for you.

I have tested your setup and can consistently get the same error when using known good components. This issue is also present when using a Pi Cam 3.

I have reached out to the supplier with this issue and will keep you updated when they get back to me.

Thanks for your patience!

1 Like