No module named 'hailo_rpi_common'


import os
import threading
import time
from queue import Queue, Empty

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import numpy as np
import cv2
import hailo

from hailo_rpi_common import (
    get_caps_from_pad,
    app_callback_class,
)
from pose_estimation_pipeline import GStreamerPoseEstimationApp
# Import your libraries up here as usual

# Inside this function is where you place the rest of your code as usual
def custom_processing_thread(pose_estimator, startup_delay=2.0, poll_interval=0.1):
    """Poll the pose estimator forever; place your own processing code in the loop.

    Never returns, so it is meant to be run in a background thread.

    :param pose_estimator: Object providing ``get_body_part_coordinates`` and
        ``calculate_body_part_angle``
    :param startup_delay: Seconds to wait before the first poll
    :param poll_interval: Seconds to sleep between polls
    """
    # This sleep gives enough time for the HAT to fire up and start detecting - important but not mandatory
    time.sleep(startup_delay)
    while True:
        # We can call this function to get the latest position of a specific keypoint
        position = pose_estimator.get_body_part_coordinates('left_wrist')
        # Another function but this time we input 3 different keypoints and get the angle between them
        angle = pose_estimator.calculate_body_part_angle('left_shoulder', 'left_elbow', 'left_wrist')
        # `position` and `angle` are placeholders for your own code to use.
        # Pause between polls so this loop does not busy-spin and hog the CPU.
        time.sleep(poll_interval)

# The rest of the code starts here and handles the operation of the HAT and all other necessary calculations.
# The HAT should update all of its detection data 30 times a second.
class PoseDataManager:
    """Manages pose estimation data across threads.

    Allows safe access to the latest detection data.
    """

    def __init__(self):
        """Start with no detection stored."""
        # Guards the three fields below so they are always written and read together
        self.latest_detection_lock = threading.Lock()
        self.latest_detection = None
        self.latest_width = None
        self.latest_height = None

    def update_detection(self, detection, width, height):
        """Update the latest detection data thread-safely.

        :param detection: Hailo detection object
        :param width: Frame width
        :param height: Frame height
        """
        with self.latest_detection_lock:
            self.latest_detection = detection
            self.latest_width = width
            self.latest_height = height

    def get_latest_detection(self):
        """Retrieve the latest detection data thread-safely.

        :return: Tuple of (detection, width, height) or (None, None, None)
        """
        with self.latest_detection_lock:
            return (
                self.latest_detection,
                self.latest_width,
                self.latest_height,
            )

class PoseEstimator:
    """Reads keypoint positions and joint angles from the latest shared detection."""

    def __init__(self, pose_data_manager):
        """Initialize PoseEstimator with a PoseDataManager.

        :param pose_data_manager: Shared data management object
        """
        self.pose_data_manager = pose_data_manager
        self.keypoints = self._get_keypoints()

    def _get_keypoints(self):
        """Get the COCO keypoints correspondence map."""
        return {
            'nose': 0,
            'left_eye': 1,
            'right_eye': 2,
            'left_ear': 3,
            'right_ear': 4,
            'left_shoulder': 5,
            'right_shoulder': 6,
            'left_elbow': 7,
            'right_elbow': 8,
            'left_wrist': 9,
            'right_wrist': 10,
            'left_hip': 11,
            'right_hip': 12,
            'left_knee': 13,
            'right_knee': 14,
            'left_ankle': 15,
            'right_ankle': 16,
        }

    def get_body_part_coordinates(self, body_part, significant_figures=4):
        """Get normalized coordinates for a specific body part from latest detection.

        :param body_part: Name of the body part (e.g., 'left_eye')
        :param significant_figures: Number of decimal places to round to
        :return: Tuple of normalized (x, y) coordinates or None
        :raises KeyError: If ``body_part`` is not a known keypoint name
        """
        # Get latest detection
        detection, width, height = self.pose_data_manager.get_latest_detection()
        if detection is None or width is None or height is None:
            return None
        # If no landmarks, return None
        landmarks = detection.get_objects_typed(hailo.HAILO_LANDMARKS)
        if len(landmarks) == 0:
            return None
        points = landmarks[0].get_points()
        # Get the specific keypoint
        keypoint_index = self.keypoints[body_part]
        point = points[keypoint_index]
        # Directly use the normalized coordinates from the point
        # Clamp the values between 0 and 1, then round to the requested number of places
        norm_x = round(max(0, min(1, point.x())), significant_figures)
        norm_y = round(max(0, min(1, point.y())), significant_figures)
        return (norm_x, norm_y)

    def calculate_body_part_angle(self, point_a_name, point_b_name, point_c_name):
        """Calculate angle between three body parts directly by name.

        The angle is measured at the vertex (point b) and returned in the
        full 0 to 360 degree range.

        :param point_a_name: First body part name (e.g., 'left_shoulder')
        :param point_b_name: Vertex body part name (e.g., 'left_elbow')
        :param point_c_name: Third body part name (e.g., 'left_wrist')
        :return: Angle in degrees or None if coordinates can't be retrieved
        """
        # Get coordinates for each body part
        point_a = self.get_body_part_coordinates(point_a_name)
        point_b = self.get_body_part_coordinates(point_b_name)
        point_c = self.get_body_part_coordinates(point_c_name)
        # Check if any coordinates are None
        if any(point is None for point in (point_a, point_b, point_c)):
            return None
        # Convert to numpy arrays
        a = np.array(point_a)
        b = np.array(point_b)
        c = np.array(point_c)
        # Vectors from the vertex b out to a and to c
        ba = a - b
        bc = c - b
        # arctan2(cross, dot) gives the signed angle from ba to bc,
        # which is what allows a full 360-degree range
        angle = np.degrees(np.arctan2(np.linalg.det([ba, bc]), np.dot(ba, bc)))
        # Ensure the angle is between 0 and 360 degrees
        if angle < 0:
            angle += 360
        return angle

class user_app_callback_class(app_callback_class):
    """Callback user-data object that carries the shared PoseDataManager."""

    def __init__(self, pose_data_manager):
        """Initialize with a PoseDataManager.

        :param pose_data_manager: Shared data management object
        """
        # Run the base initializer first so whatever state app_callback_class
        # sets up exists on this instance too
        super().__init__()
        self.pose_data_manager = pose_data_manager

def app_callback(pad, info, user_data):
    """Pad-probe callback that publishes the person detection from a buffer.

    :param pad: Pad the probe is attached to; its caps give the frame size
    :param info: Probe info from which the buffer is obtained
    :param user_data: Object exposing ``pose_data_manager``
    :return: Always ``Gst.PadProbeReturn.OK``
    """
    gst_buffer = info.get_buffer()
    # Nothing to inspect on this probe
    if gst_buffer is None:
        return Gst.PadProbeReturn.OK

    # Only the frame dimensions are needed from the caps
    _fmt, frame_width, frame_height = get_caps_from_pad(pad)

    # Collect every detection in this buffer that is labelled as a person
    roi = hailo.get_roi_from_buffer(gst_buffer)
    people = [
        candidate
        for candidate in roi.get_objects_typed(hailo.HAILO_DETECTION)
        if candidate.get_label() == "person"
    ]

    # When several people are present the last one listed is the one shared
    if people:
        user_data.pose_data_manager.update_detection(
            people[-1], frame_width, frame_height
        )

    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # Create PoseDataManager first
    pose_data_manager = PoseDataManager()
    # Create an instance of the user app callback class with pose_data_manager
    user_data = user_app_callback_class(pose_data_manager)
    # Create pose estimator
    pose_estimator = PoseEstimator(pose_data_manager)
    # Start the custom processing thread.
    # It is a daemon because custom_processing_thread loops forever and
    # must not keep the process alive once the pipeline exits.
    processing_thread = threading.Thread(
        target=custom_processing_thread,
        args=(pose_estimator,),
        daemon=True,
    )
    processing_thread.start()

    # Run the GStreamer pipeline
    app = GStreamerPoseEstimationApp(app_callback, user_data)
    # NOTE(review): assumes the app object exposes run(), as in the Hailo example pipelines - confirm
    app.run()


Traceback (most recent call last):
  File "<string>", line 13, in <module>
ModuleNotFoundError: No module named 'hailo_rpi_common'

Hi @darrin273365, welcome to the forums!

Would you be able to tell us a little more about this error you are encountering? Could you share a link to where you found this code and let us know what program you are using to edit it?

This error seems to indicate you haven’t imported the module ‘hailo_rpi_common’ correctly. If you let us know more about your setup we can help you correctly import this and get your code working again!

Hey @darrin273365,

Thanks for that. We have lots of tutorials and code available on our website. Would you be able to let me know specifically where you found this code? A link to the tutorial or product page would be ideal so I can run through the setup steps to figure out what is causing this error.

The code itself is not super useful at the moment but if you could let me know:

  • What parts you are using for this project (product page links would be ideal)
  • Send a link to the tutorial you are following or at least where you have found this code

Hope this helps! :slight_smile:



It's in 2 different folders. Would that confuse the code?

Thanks for getting back to me. The hailo_rpi_common is in 2 folders; maybe the folder I was using had different code. I changed over to the other one and it seems to recognise it now.

Hey @darrin273365,

It’s possible that having the required files split between two folders may be causing some kind of issue here.

I’m still unsure as to what you are trying to achieve with this code so I can’t say for sure what is contributing to this problem. Would you be able to let me know:

  • What parts you are using for this project (product page links would be ideal)
  • Send a link to the tutorial you are following or at least where you have found this code

Hey @darrin273365,

This AI HAT code is used for a very specific set up and can be a little finicky. I think the code may need to be in the same folder and it also needs to be run out of the virtual environment that we set up in the guide. If you are just trying to take the code and run it separately you will run into some issues.

To get the AI hat running you will need to set everything up as outlined in the guide - including how we download this Python pipeline code and how we run it out of the command line (you will run into virtual environment issues running it out of Thonny).

If you run into some more issues with this, give us a little information about which guide you are following (either object detection or pose estimation), and the steps you have taken.

Hope this helps!

Thanks, I managed to get Hailo working. Is there any way of running Hailo in the terminal and getting it to control the stepper motors in Thonny?

Hey Darrin,

In the written guide we have some example code controlling hardware with the detection results. In that demo code, we control a servo but there is plenty of information in that section on how to implement your own custom code. If you have existing code that controls the stepper motor, you should be able to paste it in there.

In this section (the video also demonstrates it well), we open and edit the code in Thonny, then we save it and run it from the command line. Unfortunately, it is very difficult to run it from inside Thonny because of how the Hailo software handles the virtual environment.

Best of luck!



Morning, have you used the global shutter camera CE09593 with the Arducam ethernet kit AC-U6248?
The camera works with the Pi 5, but when I use the Arducam ethernet extension kit it doesn’t.

from picamera2 import Picamera2

from libcamera import controls

import time

# Create Picamera2 object

picam2 = Picamera2()

# Start the camera preview
# NOTE(review): no preview/start call follows this comment in the paste - the statement appears to be missing


# Allow the camera to warm up
# NOTE(review): no wait/sleep call follows this comment in the paste - confirm against the original script



[0:00:41.463302029] [1706] INFO Camera camera_manager.cpp:325 libcamera v0.3.2+99-1230f78d

[0:00:41.472963196] [1711] INFO RPI pisp.cpp:695 libpisp version v1.0.7 28196ed6edcf 29-08-2024 (16:42:16)

[0:00:41.487755603] [1711] INFO RPI pisp.cpp:1154 Registered camera /base/axi/pcie@120000/rp1/i2c@80000/imx296@1a to CFE device /dev/media2 and ISP device /dev/media0 using PiSP variant BCM2712_C0

[0:00:41.490898085] [1706] INFO Camera camera.cpp:1197 configuring streams: (0) 640x480-XBGR8888 (1) 1456x1088-BGGR_PISP_COMP1

[0:00:41.491007418] [1711] INFO RPI pisp.cpp:1450 Sensor: /base/axi/pcie@120000/rp1/i2c@80000/imx296@1a - Selected sensor format: 1456x1088-SBGGR10_1X10 - Selected CFE format: 1456x1088-PC1B

[0:00:47.562364081] [1711] WARN V4L2 v4l2_videodevice.cpp:2127 /dev/video4[20:cap]: Dequeue timer of 1000000.00us has expired!

[0:00:47.562401081] [1711] ERROR RPI pipeline_base.cpp:1366 Camera frontend has timed out!

[0:00:47.562407563] [1711] ERROR RPI pipeline_base.cpp:1367 Please check that your camera sensor connector is attached securely.

[0:00:47.562412914] [1711] ERROR RPI pipeline_base.cpp:1368 Alternatively, try another cable and/or sensor.

Hi @darrin273365

If you haven’t already, I would recommend ensuring the ethernet cable that you’re using isn’t damaged and works as intended. Otherwise, you could look at increasing the dequeue timer to more than 10 seconds.

Hello Dan, do you know how to increase the dequeue timer?

Hey @darrin273365,

Changing the dequeue timer looks like a bit of an involved process and is unlikely to fix this issue given the default time should already be more than enough for a valid connection to work.

It doesn’t look like that cable extension kit you are using has been tested for that specific camera so it could be a compatibility issue. Do you have any way to test the cable kit with a camera that has confirmed compatibility?

Yes, it works with the Camera Module 3 and the HQ camera. The Arducam website states that it is compatible, but I’m starting to think it might not be. It says to do this, and it corrupted the SD card.

For Raspberry Pi Bookworm users running on Pi 5, please execute:

sudo nano /boot/firmware/config.txt 
#Find the line: camera_auto_detect=1, update it to:
#Find the line: [all], add the following item under it:
#Save and reboot.

hey @darrin273365,

Interesting behaviour. I will pull these items out of stock and replicate your project tomorrow to see if I can help you make some progress.

I will update you tomorrow afternoon!

thanks for that

Hey @darrin273365,

Sorry for the delay but I have an update for you.

I have tested your setup and can consistently get the same error when using known good components. This issue is also present when using a Pi Cam 3.

I have reached out to the supplier with this issue and will keep you updated when they get back to me.

Thanks for your patience!

