编写一个简单的发布者和订阅者 (Python)

目标: 使用 Python 创建并运行发布者和订阅者节点。

教程级别: 初学者

时间: 20 分钟

背景

在本教程中,您将创建 节点,这些节点通过 主题 以字符串消息的形式相互传递信息。

此处使用的示例是一个简单的“谈话者”和“侦听者”系统; 一个节点发布数据,另一个节点订阅该主题,以便可以接收该数据。

这些示例中使用的代码可以在 此处 找到。

先决条件

在之前的教程中,您学习了如何:doc:创建工作区创建包

建议对 Python 有基本的了解,但并非完全必要。

任务

1 创建一个包

打开一个新终端并:doc:source 您的 ROS 2 安装,以便 ros2 命令可以正常工作。

导航到在 上一个教程 中创建的 ros2_ws 目录。

回想一下,应该在“src”目录中创建包,而不是在工作区的根目录中创建包。 因此,导航到“ros2_ws/src”,然后运行包创建命令:

ros2 pkg create --build-type ament_python --license Apache-2.0 py_pubsub

您的终端将返回一条消息,验证您的包“py_pubsub”及其所有必要的文件和文件夹的创建。

2 编写发布者节点

导航到“ros2_ws/src/py_pubsub/py_pubsub”。 回想一下,这个目录是一个“Python 包 <https://docs.python.org/3/tutorial/modules.html#packages>” ,其名称与它嵌套的 ROS 2 包相同。

通过输入以下命令下载示例 talker 代码:

wget https://raw.githubusercontent.com/ros2/examples/rolling/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py

现在将在“__init__.py”旁边出现一个名为“publisher_member_function.py”的新文件。

使用您喜欢的文本编辑器打开该文件。

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1


def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_publisher.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

2.1 检查代码

注释后的第一行代码导入了“rclpy”,因此可以使用其“Node”类。

import rclpy
from rclpy.node import Node

下一个语句导入节点用来构造其在主题上传递的数据的内置字符串消息类型。

from std_msgs.msg import String

这些行代表节点的依赖项。 回想一下,必须将依赖项添加到“package.xml”中,您将在下一节中执行此操作。

接下来,创建“MinimalPublisher”类,它继承自“Node”(或为“Node”的子类)。

class MinimalPublisher(Node):

以下是该类构造函数的定义。 super().__init__ 调用 Node 类的构造函数并为其指定节点名称,在本例中为 minimal_publisher

create_publisher 声明该节点通过名为 topic 的主题发布 String 类型的消息(从 std_msgs.msg 模块导入),并且“队列大小”为 10。

队列大小是必需的 QoS(服务质量)设置,如果订阅者接收消息的速度不够快,它会限制排队消息的数量。

接下来,创建一个计时器,并每 0.5 秒执行一次回调。 self.i 是回调中使用的计数器。

def __init__(self):
    super().__init__('minimal_publisher')
    self.publisher_ = self.create_publisher(String, 'topic', 10)
    timer_period = 0.5  # seconds
    self.timer = self.create_timer(timer_period, self.timer_callback)
    self.i = 0

“timer_callback” 创建一条附加了计数器值的消息,并使用“get_logger().info”将其发布到控制台。

def timer_callback(self):
    msg = String()
    msg.data = 'Hello World: %d' % self.i
    self.publisher_.publish(msg)
    self.get_logger().info('Publishing: "%s"' % msg.data)
    self.i += 1

最后,定义主要函数。

def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_publisher.destroy_node()
    rclpy.shutdown()

首先初始化 rclpy 库,然后创建节点,然后“旋转”节点,以便调用其回调。

2.2 添加依赖项

导航回上一级到 ros2_ws/src/py_pubsub 目录,其中已为您创建了 setup.pysetup.cfgpackage.xml 文件。

使用文本编辑器打开 package.xml

上一个教程 中所述,请确保填写 <description><maintainer><license> 标签:

<description>Examples of minimal publisher/subscriber using rclpy</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache-2.0</license>

在上述行之后,添加与节点的导入语句相对应的以下依赖项:

<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>

这声明了包在执行其代码时需要“rclpy”和“std_msgs”。

确保保存文件。

2.3 添加入口点

打开“setup.py”文件。

再次将“maintainer”、“maintainer_email”、“description”和“license”字段与您的“package.xml”匹配:

maintainer='YourName',
maintainer_email='you@email.com',
description='Examples of minimal publisher/subscriber using rclpy',
license='Apache-2.0',

在“entry_points”字段的“console_scripts”括号内添加以下行:

entry_points={
        'console_scripts': [
                'talker = py_pubsub.publisher_member_function:main',
        ],
},

别忘了保存。

2.4 检查 setup.cfg

“setup.cfg”文件的内容应该会自动正确填充,如下所示:

[develop]
script_dir=$base/lib/py_pubsub
[install]
install_scripts=$base/lib/py_pubsub

这只是告诉 setuptools 将可执行文件放入“lib”中,因为“ros2 run”会在那里查找它们。

您现在可以构建您的包,获取本地安装文件并运行它,但让我们先创建订阅者节点,以便您可以看到整个系统在工作。

3 编写订阅者节点

返回“ros2_ws/src/py_pubsub/py_pubsub”以创建下一个节点。 在您的终端中输入以下代码:

wget https://raw.githubusercontent.com/ros2/examples/rolling/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py

现在目录应该包含以下文件:

__init__.py  publisher_member_function.py  subscriber_member_function.py

3.1 检查代码

使用文本编辑器打开“subscriber_member_function.py”。

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

订阅者节点的代码与发布者的代码几乎相同。 构造函数使用与发布者相同的参数创建订阅者。 回想一下:doc:topics tutorial,发布者和订阅者使用的主题名称和消息类型必须匹配才能进行通信。

self.subscription = self.create_subscription(
    String,
    'topic',
    self.listener_callback,
    10)

订阅者的构造函数和回调不包含任何计时器定义,因为它不需要。 一旦收到消息,它的回调就会被调用。

回调定义只是将信息消息连同它收到的数据一起打印到控制台。 回想一下,发布者定义了“msg.data = ‘Hello World: %d’ % self.i”

def listener_callback(self, msg):
    self.get_logger().info('I heard: "%s"' % msg.data)

“主要”定义几乎完全相同,用订阅者取代发布者的创建和旋转。

minimal_subscriber = MinimalSubscriber()

rclpy.spin(minimal_subscriber)

由于此节点具有与发布者相同的依赖关系,因此无需向 package.xml 添加任何新内容。 setup.cfg 文件也可以保持不变。

3.2 添加入口点

重新打开 setup.py 并在发布者的入口点下方添加订阅者节点的入口点。 entry_points 字段现在应如下所示:

entry_points={
        'console_scripts': [
                'talker = py_pubsub.publisher_member_function:main',
                'listener = py_pubsub.subscriber_member_function:main',
        ],
},

确保保存文件,然后您的发布/订阅系统就准备好了。

4 构建并运行

您可能已经安装了“rclpy”和“std_msgs”包作为 ROS 2 系统的一部分。 在构建之前,最好在工作区(“ros2_ws”)的根目录中运行“rosdep”来检查缺少的依赖项:

rosdep install -i --from-path src --rosdistro rolling -y

仍然在工作区的根目录“ros2_ws”中构建新包:

colcon build --packages-select py_pubsub

打开一个新终端,导航到“ros2_ws”,并获取安装文件:

source install/setup.bash

现在运行 talker 节点:

ros2 run py_pubsub talker

终端应该每 0.5 秒开始发布一次信息消息,如下所示:

[INFO] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [minimal_publisher]: Publishing: "Hello World: 4"
...

打开另一个终端,再次从“ros2_ws”内部获取安装文件,然后启动监听器节点:

ros2 run py_pubsub listener

监听器将开始将消息打印到控制台,从发布者当时的消息计数开始,如下所示:

[INFO] [minimal_subscriber]: I heard: "Hello World: 10"
[INFO] [minimal_subscriber]: I heard: "Hello World: 11"
[INFO] [minimal_subscriber]: I heard: "Hello World: 12"
[INFO] [minimal_subscriber]: I heard: "Hello World: 13"
[INFO] [minimal_subscriber]: I heard: "Hello World: 14"

在每个终端中输入“Ctrl+C”以停止节点旋转。

摘要

您创建了两个节点来发布和订阅主题上的数据。 在运行它们之前,您已将它们的依赖项和入口点添加到包配置文件中。

后续步骤

接下来,您将使用服务/客户端模型创建另一个简单的 ROS 2 包。 同样,您可以选择使用 C++Python 编写它。

相关内容

有几种方法可以用 Python 编写发布者和订阅者;查看 ros2/examples repo 中的 minimal_publisherminimal_subscriber 包。