DDS stands for Data Distribution Service. Published and maintained by the Object Management Group (OMG) in 2004, it is a data distribution/subscription standard designed specifically for real-time systems. It was first adopted by the US Navy to address compatibility issues with large-scale software upgrades within the complex network environments of ships. It has now become a mandatory standard.
DDS emphasizes data-centricity and provides a rich set of quality-of-service policies to ensure real-time, efficient, and flexible data distribution, meeting the needs of various distributed real-time communication applications.
References
The topics, services, and actions we learned about in the previous course all rely on DDS for their underlying communication implementation. DDS is equivalent to the neural network in the ROS robot system.
The core of DDS is communication. Numerous models and software frameworks exist to implement this communication. Here we list four commonly used models.

Of these communication models, DDS has a clear advantage.
DDS plays a crucial role in the ROS2 system. All upper-layer infrastructure is built on DDS. In this ROS2 architecture diagram, the blue and red components represent DDS.

Among the four major components of ROS, the addition of DDS greatly enhances the comprehensive capabilities of the distributed communication system. This allows us to avoid worrying about communication issues during robot development and focus more time on other application development components.
The basic structure in DDS is the domain. A domain binds applications together for communication. Recall that when we configured communication between the Raspberry Pi and the computer, the domain ID we configured earlier defines the grouping of the global data space. Only nodes within the same domain group can communicate with each other. This prevents useless data from occupying resources.
Another important feature of DDS is the quality of service (QoS).
QoS is a network transmission policy. Applications specify the desired network transmission quality behavior, and the QoS service implements this behavior to meet the client's communication quality requirements as much as possible. It can be thought of as a contract between data providers and receivers.
The policies are as follows:
xxxxxxxxxxros2 topic pub /chatter std_msgs/msg/Int32 "data: 66" --qos-reliability best_effort
xxxxxxxxxxros2 topic echo /chatter --qos-reliability reliable
xxxxxxxxxxros2 topic echo /chatter --qos-reliability best_effort
xxxxxxxxxxros2 pkg create learning_dds --build-type ament_python --dependencies rclpy std_msgsxxxxxxxxxximport rclpyfrom rclpy.node import Nodefrom std_msgs.msg import String# Import QoS related classesfrom rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicyclass ControllerPublisher(Node): def __init__(self, name): super().__init__(name) #1. Configure QoS policy: reliable transmission, retain the last historical data self.qos_profile = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, # Reliable transmission (retransmission of lost data) history=QoSHistoryPolicy.KEEP_LAST, # Keep the last N pieces of data depth=1 # Keep 1 historical data ) # 2. Create a publisher: topic name /robot_cmd, message type String, QoS policy self.publisher = self.create_publisher( String, "/robot_cmd", self.qos_profile ) # 3. Create a timer: send a command once per second self.timer = self.create_timer(1.0, self.timer_callback) self.cmd_list = ["forward", "backward", "stop"] # Instruction List self.cmd_index = 0 #Instruction index, loop switching def timer_callback(self): # Cycle switching instructions (forward → backward → stop → forward...) current_cmd = self.cmd_list[self.cmd_index % 3] # Create a message and fill it with data msg = String() msg.data = current_cmd # Publish a message self.publisher.publish(msg) # Print log (shows issued commands) self.get_logger().info(f"发布控制指令:{msg.data}") # Update command index self.cmd_index += 1def main(args=None): # Initializing ROS2 rclpy.init(args=args) # Create a publisher node node = ControllerPublisher("robot_controller_pub") # Looping nodes rclpy.spin(node) # Destroy the node and shut down ROS2 node.destroy_node() rclpy.shutdown()if __name__ == "__main__": main()
xxxxxxxxxximport rclpyfrom rclpy.node import Nodefrom std_msgs.msg import Stringfrom rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicyclass RobotSubscriber(Node): def __init__(self, name): super().__init__(name) # 1. Configure a QoS policy compatible with the publisher self.qos_profile = QoSProfile( reliability=QoSReliabilityPolicy.BEST_EFFORT, history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) # 2. Create a subscriber: topic name/robot_cmd, callback function, QoS policy self.subscription = self.create_subscription( String, "/robot_cmd", self.cmd_callback, # Callback function executed after receiving data self.qos_profile ) def cmd_callback(self, msg): # Callback function: Processes received instructions self.get_logger().info(f"接收控制指令:{msg.data} → 执行对应动作")def main(args=None): rclpy.init(args=args) node = RobotSubscriber("robot_subscriber") rclpy.spin(node) node.destroy_node() rclpy.shutdown()if __name__ == "__main__": main()xxxxxxxxxxentry_points={ 'console_scripts': [ # Publisher node: command name = package name. file name: main function 'dds_controller_pub = learning_dds.dds_controller_pub:main', # Subscriber Node 'dds_robot_sub = learning_dds.dds_robot_sub:main', ],},
xxxxxxxxxxcolcon build --packages-select learning_dds
xxxxxxxxxxsource install/setup.bashxxxxxxxxxxros2 run learning_dds dds_controller_pubxxxxxxxxxxros2 run learning_dds dds_robot_sub
Advanced Robot Development:
