MediaPipe is an open-source data stream processing machine learning application development framework developed by Google. It is a graph-based data processing pipeline used to build applications that process data in various forms, such as video, audio, sensor data, and other time-series data. MediaPipe is cross-platform and can run on embedded platforms (such as the Jetson Nano), mobile devices (iOS and Android), workstations and servers, and supports mobile GPU acceleration. MediaPipe provides cross-platform, customizable ML solutions for real-time and streaming media.
The core framework of MediaPipe is implemented in C++ and provides support for languages such as Java and Objective C. The main concepts of MediaPipe include packets, streams, calculators, graphs, and subgraphs.
Features of MediaPipe:
End-to-end acceleration: built-in fast ML inference and processing can be accelerated even on commodity hardware.
Build once, deploy anywhere: unified solution for Android, iOS, desktop/cloud, web and IoT.
Ready-to-use solution: cutting-edge ML solution that demonstrates the full capabilities of the framework.
Free and open source: framework and solutions under the Apache 2.0 license, fully extensible and customizable.
MediaPipe Pose is an ML solution for high-fidelity body pose tracking, leveraging BlazePose research to infer 33 3D landmarks and a full-body background segmentation mask from RGB video frames; it also powers the ML Kit Pose Detection API.
The landmark model in MediaPipe Pose predicts the location of 33 pose coordinates (see the figure below).

Enter the following command to start the program
ros2 run dofbot_pro_mediapipe 02_PoseDetector

Source code location: ~/dofbot_pro_ws/src/dofbot_pro_mediapipe/dofbot_pro_mediapipe/02_PoseDetector.py
#!/usr/bin/env python3
# encoding: utf-8
import time

import rclpy
from rclpy.node import Node
import cv2 as cv
import numpy as np
import mediapipe as mp
from geometry_msgs.msg import Point
from dofbot_pro_msgs.msg import PointArray
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
class PoseDetector(Node):
    """ROS2 node that runs MediaPipe Pose on a local V4L2 camera stream.

    Publishes the 33 normalized pose landmarks on ``/mediapipe/points``
    (a ``PointArray``) and displays the annotated camera frame next to a
    skeleton-only rendering. Press ``q`` in the display window to exit.
    """

    def __init__(self):
        super().__init__('pose_detector')
        self.publisher_ = self.create_publisher(PointArray, '/mediapipe/points', 10)
        self.bridge = CvBridge()
        self.mpPose = mp.solutions.pose
        self.mpDraw = mp.solutions.drawing_utils
        # Video-stream mode with landmark smoothing across frames.
        self.pose = self.mpPose.Pose(
            static_image_mode=False,
            smooth_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5)
        # Red filled circles for landmarks, green lines for connections (BGR).
        self.lmDrawSpec = mp.solutions.drawing_utils.DrawingSpec(
            color=(0, 0, 255), thickness=-1, circle_radius=6)
        self.drawSpec = mp.solutions.drawing_utils.DrawingSpec(
            color=(0, 255, 0), thickness=2, circle_radius=2)

        self.capture = cv.VideoCapture(0, cv.CAP_V4L2)
        # Request MJPG (was a magic number 6 == CAP_PROP_FOURCC) so the camera
        # can deliver 640x480 at its full frame rate over USB.
        self.capture.set(cv.CAP_PROP_FOURCC, cv.VideoWriter.fourcc('M', 'J', 'P', 'G'))
        self.capture.set(cv.CAP_PROP_FRAME_WIDTH, 640)
        self.capture.set(cv.CAP_PROP_FRAME_HEIGHT, 480)
        if not self.capture.isOpened():
            # NOTE(review): returning here leaves the node without a timer, so
            # main() will spin an idle node until Ctrl-C — confirm intended.
            self.get_logger().error("Failed to open the camera")
            return
        self.get_logger().info(f"Camera FPS: {self.capture.get(cv.CAP_PROP_FPS)}")
        self.pTime = time.time()
        # ~33 Hz processing loop.
        self.timer = self.create_timer(0.03, self.process_frame)

    def process_frame(self):
        """Timer callback: grab a frame, detect the pose, publish and display."""
        ret, frame = self.capture.read()
        if not ret:
            self.get_logger().error("Failed to read frame")
            return
        frame, img = self.pubPosePoint(frame, draw=True)
        cTime = time.time()
        dt = cTime - self.pTime
        fps = 1 / dt if dt > 0 else 0.0  # guard against a zero time delta
        self.pTime = cTime
        text = "FPS : " + str(int(fps))
        cv.putText(frame, text, (20, 30),
                   cv.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 1)
        combined_frame = self.frame_combine(frame, img)
        cv.imshow('PoseDetector', combined_frame)
        if cv.waitKey(1) & 0xFF == ord('q'):
            self.get_logger().info("Exiting program")
            self.capture.release()
            cv.destroyAllWindows()
            self.destroy_node()
            rclpy.shutdown()
            raise SystemExit(0)

    def pubPosePoint(self, frame, draw=True):
        """Run MediaPipe Pose on ``frame`` and publish the landmarks.

        Args:
            frame: BGR image from the camera; annotated in place when ``draw``.
            draw: when True, also draw the skeleton on ``frame`` itself.

        Returns:
            Tuple ``(frame, img)`` — the (possibly annotated) camera frame and
            a black image of the same shape containing only the skeleton.
        """
        pointArray = PointArray()
        img = np.zeros(frame.shape, np.uint8)
        img_RGB = cv.cvtColor(frame, cv.COLOR_BGR2RGB)  # MediaPipe expects RGB
        self.results = self.pose.process(img_RGB)
        if self.results.pose_landmarks:
            if draw:
                self.mpDraw.draw_landmarks(
                    frame, self.results.pose_landmarks,
                    self.mpPose.POSE_CONNECTIONS,
                    self.lmDrawSpec, self.drawSpec)
            self.mpDraw.draw_landmarks(
                img, self.results.pose_landmarks,
                self.mpPose.POSE_CONNECTIONS,
                self.lmDrawSpec, self.drawSpec)
            # Landmark coordinates are normalized to [0, 1]; z is relative depth.
            for lm in self.results.pose_landmarks.landmark:
                point = Point()
                point.x, point.y, point.z = lm.x, lm.y, lm.z
                pointArray.points.append(point)
        # Publishes an empty array when no pose is detected this frame.
        self.publisher_.publish(pointArray)
        return frame, img

    def frame_combine(self, frame, src):
        """Concatenate ``frame`` and ``src`` side by side and return the result.

        Color path when ``frame`` has 3 channels; otherwise both images are
        treated as grayscale (``src`` is converted first).
        """
        if len(frame.shape) == 3:
            frameH, frameW = frame.shape[:2]
            srcH, srcW = src.shape[:2]
            # assumes both images share the same height — TODO confirm; with
            # differing heights the full-height slice assignments would fail.
            dst = np.zeros((max(frameH, srcH), frameW + srcW, 3), np.uint8)
            dst[:, :frameW] = frame[:, :]
            dst[:, frameW:] = src[:, :]
        else:
            src = cv.cvtColor(src, cv.COLOR_BGR2GRAY)
            frameH, frameW = frame.shape[:2]
            imgH, imgW = src.shape[:2]
            dst = np.zeros((frameH, frameW + imgW), np.uint8)
            dst[:, :frameW] = frame[:, :]
            dst[:, frameW:] = src[:, :]
        return dst
def main(args=None):
    """Initialize rclpy, spin the PoseDetector node, and clean up on exit."""
    rclpy.init(args=args)
    node = PoseDetector()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the camera and the window even on abnormal exit.
        node.capture.release()
        cv.destroyAllWindows()
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()