MediaPipe is an open-source data stream processing machine learning application development framework developed by Google. It is a graph-based data processing pipeline used to build data sources in various forms, such as video, audio, sensor data, and any time series data. MediaPipe is cross-platform and can run on embedded platforms (such as Raspberry Pi), mobile devices (iOS and Android), workstations and servers, and supports mobile GPU acceleration. MediaPipe provides cross-platform, customizable ML solutions for real-time and streaming media. The core framework of MediaPipe is implemented in C++ and provides support for languages such as Java and Objective C. The main concepts of MediaPipe include packets, streams, calculators, graphs, and subgraphs.
Features of MediaPipe:
DLIB is a modern C++ toolkit that contains machine learning algorithms and tools for creating complex software in C++ to solve real-world problems. It is widely used by industry and academia in fields such as robotics, embedded devices, mobile phones and large high-performance computing environments. The dlib library uses 68 points to mark important parts of the face, such as 18-22 points mark the right eyebrow, 51-68 marks the mouth. Use the get_frontal_face_detector module of the dlib library to detect faces, and use the shape_predictor_68_face_landmarks.dat feature data to predict facial feature values.

Start the camera
ros2 launch ascamera hp60c.launch.py
Open a new terminal and enter,
xxxxxxxxxxros2 run yahboomcar_mediapipe 04_FaceMesh

Source code location: ~/ascam_ros2_ws/src/yahboomcar_mediapipe/yahboomcar_mediapipe/04_FaceMesh.py
x#!/usr/bin/env python3# encoding: utf-8import rclpyfrom rclpy.node import Nodefrom geometry_msgs.msg import Pointfrom sensor_msgs.msg import Imageimport mediapipe as mpfrom yahboomcar_msgs.msg import PointArrayimport cv2 as cvimport numpy as npimport timefrom cv_bridge import CvBridgeprint("import done")class FaceMesh(Node): def __init__(self, name, staticMode=False, maxFaces=2, minDetectionCon=0.5, minTrackingCon=0.5): super().__init__(name) self.mpDraw = mp.solutions.drawing_utils self.mpFaceMesh = mp.solutions.face_mesh self.faceMesh = self.mpFaceMesh.FaceMesh( static_image_mode=staticMode, max_num_faces=maxFaces, min_detection_confidence=minDetectionCon, min_tracking_confidence=minTrackingCon ) # Publisher for detected points self.pub_point = self.create_publisher(PointArray, '/mediapipe/points', 1000) # Initialize CvBridge self.bridge = CvBridge() # Create a subscriber to the image topic self.create_subscription(Image, '/ascamera_hp60c/camera_publisher/rgb0/image', self.image_callback, 10) self.lmDrawSpec = mp.solutions.drawing_utils.DrawingSpec(color=(0, 0, 255), thickness=-1, circle_radius=3) self.drawSpec = self.mpDraw.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1) self.pTime = 0 self.exit_flag = False # Flag for exit condition def image_callback(self, msg): # Convert the ROS Image message to OpenCV format frame = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') # Process the frame for face mesh detection frame, img = self.pubFaceMeshPoint(frame, draw=False) # Calculate FPS cTime = time.time() fps = 1 / (cTime - self.pTime) self.pTime = cTime text = "FPS : " + str(int(fps)) # Display FPS on frame cv.putText(frame, text, (20, 30), cv.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 1) # Combine the original frame and the face mesh result dst = self.frame_combine(frame, img) # Show the combined image cv.imshow('FaceMesh Detection', dst) # Exit the program if 'q' is pressed if cv.waitKey(1) & 0xFF == ord('q'): self.exit_flag = True def pubFaceMeshPoint(self, frame, draw=True): pointArray = PointArray() img = np.zeros(frame.shape, np.uint8) imgRGB = cv.cvtColor(frame, cv.COLOR_BGR2RGB) self.results = self.faceMesh.process(imgRGB) if self.results.multi_face_landmarks: for i in range(len(self.results.multi_face_landmarks)): if draw: self.mpDraw.draw_landmarks(frame, self.results.multi_face_landmarks[i], self.mpFaceMesh.FACEMESH_CONTOURS, self.lmDrawSpec, self.drawSpec) self.mpDraw.draw_landmarks(img, self.results.multi_face_landmarks[i], self.mpFaceMesh.FACEMESH_CONTOURS, self.lmDrawSpec, self.drawSpec) for id, lm in enumerate(self.results.multi_face_landmarks[i].landmark): point = Point() point.x, point.y, point.z = lm.x, lm.y, lm.z pointArray.points.append(point) # Publish the detected points self.pub_point.publish(pointArray) return frame, img def frame_combine(self, frame, src): if len(frame.shape) == 3: frameH, frameW = frame.shape[:2] srcH, srcW = src.shape[:2] dst = np.zeros((max(frameH, srcH), frameW + srcW, 3), np.uint8) dst[:, :frameW] = frame[:, :] dst[:, frameW:] = src[:, :] else: src = cv.cvtColor(src, cv.COLOR_BGR2GRAY) frameH, frameW = frame.shape[:2] imgH, imgW = src.shape[:2] dst = np.zeros((frameH, frameW + imgW), np.uint8) dst[:, :frameW] = frame[:, :] dst[:, frameW:] = src[:, :] return dst def run(self): # Custom loop for handling ROS 2 callback and OpenCV events while rclpy.ok() and not self.exit_flag: rclpy.spin_once(self) # Process one callback if self.exit_flag: break cv.destroyAllWindows()def main(): print("start it") rclpy.init() face_mesh = FaceMesh('face_mesh') try: face_mesh.run() except KeyboardInterrupt: pass finally: face_mesh.destroy_node() rclpy.shutdown()if __name__ == '__main__': main()