MediaPipe is an open-source data stream processing machine learning application development framework developed by Google. It is a graph-based data processing pipeline used to build data sources in various forms, such as video, audio, sensor data, and any time series data. MediaPipe is cross-platform and can run on embedded platforms (such as Raspberry Pi), mobile devices (iOS and Android), workstations and servers, and supports mobile GPU acceleration. MediaPipe provides cross-platform, customizable ML solutions for real-time and streaming media.
The core framework of MediaPipe is implemented in C++ and provides support for languages such as Java and Objective C. The main concepts of MediaPipe include packets, streams, calculators, graphs, and subgraphs.
Features of MediaPipe:
MediaPipe Pose is an ML solution for high-fidelity body pose tracking, leveraging the BlazePose research to infer 33 3D coordinates and full-body background segmentation masks from RGB video frames, which also powers the ML Kit pose detection API.
The landmark model in MediaPipe Pose predicts the locations of the 33 pose coordinates (see the figure below).

Start the camera
roslaunch ascamera hp60c.launch
Terminal input,
xxxxxxxxxxroslaunch yahboomcar_mediapipe 02_PoseDetector.launch

Source code location: /home/yahboom/ascam_ws/src/yahboomcar_mediapipe/scripts/02_PoseDetector.py
x#!/usr/bin/env python3# encoding: utf-8import timeimport rospyimport cv2 as cvimport numpy as npimport mediapipe as mpfrom geometry_msgs.msg import Pointfrom yahboomcar_msgs.msg import PointArrayfrom sensor_msgs.msg import Imagefrom cv_bridge import CvBridgeclass PoseDetector: def __init__(self, mode=False, smooth=True, detectionCon=0.5, trackCon=0.5): self.mpPose = mp.solutions.pose self.mpDraw = mp.solutions.drawing_utils self.pose = self.mpPose.Pose( static_image_mode=mode, smooth_landmarks=smooth, min_detection_confidence=detectionCon, min_tracking_confidence=trackCon ) self.pub_point = rospy.Publisher('/mediapipe/points', PointArray, queue_size=1000) self.lmDrawSpec = mp.solutions.drawing_utils.DrawingSpec(color=(0, 0, 255), thickness=-1, circle_radius=6) self.drawSpec = mp.solutions.drawing_utils.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2) self.bridge = CvBridge() self.sub_image = rospy.Subscriber('/ascamera_hp60c/rgb0/image', Image, self.image_callback, queue_size=1) self.pTime = 0 def pubPosePoint(self, frame, draw=True): pointArray = PointArray() img = np.zeros(frame.shape, np.uint8) img_RGB = cv.cvtColor(frame, cv.COLOR_BGR2RGB) self.results = self.pose.process(img_RGB) if self.results.pose_landmarks: if draw: self.mpDraw.draw_landmarks(frame, self.results.pose_landmarks, self.mpPose.POSE_CONNECTIONS, self.lmDrawSpec, self.drawSpec) self.mpDraw.draw_landmarks(img, self.results.pose_landmarks, self.mpPose.POSE_CONNECTIONS, self.lmDrawSpec, self.drawSpec) for id, lm in enumerate(self.results.pose_landmarks.landmark): point = Point() point.x, point.y, point.z = lm.x, lm.y, lm.z pointArray.points.append(point) self.pub_point.publish(pointArray) return frame, img def frame_combine(slef,frame, src): if len(frame.shape) == 3: frameH, frameW = frame.shape[:2] srcH, srcW = src.shape[:2] dst = np.zeros((max(frameH, srcH), frameW + srcW, 3), np.uint8) dst[:, :frameW] = frame[:, :] dst[:, frameW:] = src[:, :] else: src = cv.cvtColor(src, cv.COLOR_BGR2GRAY) frameH, frameW = frame.shape[:2] imgH, imgW = src.shape[:2] dst = np.zeros((frameH, frameW + imgW), np.uint8) dst[:, :frameW] = frame[:, :] dst[:, frameW:] = src[:, :] return dst def image_callback(self, msg): try: # Convert ROS Image message to OpenCV image frame = self.bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8") # Process the frame and publish pose points frame, img = self.pubPosePoint(frame, draw=True) # Calculate and display FPS cTime = time.time() fps = 1 / (cTime - self.pTime) self.pTime = cTime text = "FPS : " + str(int(fps)) cv.putText(frame, text, (20, 30), cv.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2) # Combine and display the frames dist = self.frame_combine(frame, img) cv.imshow('dist', dist) # Check for 'q' key press to quit if cv.waitKey(1) & 0xFF == ord('q'): rospy.signal_shutdown("User requested shutdown") except Exception as e: rospy.logerr("Could not process image: {e}")if __name__ == '__main__': rospy.init_node('PoseDetector', anonymous=True) pose_detector = PoseDetector() rospy.spin() cv.destroyAllWindows()