从节点记录包 (Python)

目标:将数据从您自己的 Python 节点记录到包中。

教程级别:高级

时间:20 分钟

背景

rosbag2 不仅提供 ros2 bag 命令行工具。 它还提供了一个 Python API,用于从您自己的源代码读取和写入包。 这允许您订阅主题并将接收到的数据保存到包中,同时对该数据执行您选择的任何其他处理。 例如,您可以这样做来保存来自主题的数据和处理该数据的结果,而无需将处理后的数据发送到主题上以进行记录。 因为任何数据都可以记录在包中,所以也可以保存由主题以外的其他来源生成的数据,例如用于训练集的合成数据。 这很有用,例如,可以快速生成包含大量样本的包,这些样本分布在较长的播放时间内。

先决条件

您应该在常规 ROS 2 设置中安装 rosbag2 包。

如果您已从 Linux 上的 deb 包安装,则默认情况下可能会安装它。 如果没有,您可以使用此命令进行安装。

sudo apt install ros-rolling-rosbag2

本教程讨论了如何使用 ROS 2 包,包括从终端使用。

您应该已经完成​​了 基本 ROS 2 包教程

任务

1 创建一个包

打开一个新终端并 source 您的 ROS 2 安装,以便 ros2 命令可以正常工作。

按照 这些说明 创建一个名为 ros2_ws 的新工作区。

导航到 ros2_ws/src 目录并创建一个新包:

ros2 pkg create --build-type ament_python --license Apache-2.0 bag_recorder_nodes_py --dependencies rclpy rosbag2_py example_interfaces std_msgs

您的终端将返回一条消息,验证您的包“bag_recorder_nodes_py”及其所有必要的文件和文件夹的创建。 “–dependencies”参数将自动将必要的依赖项行添加到“package.xml”中。 在这种情况下,包将使用“rosbag2_py”包以及“rclpy”包。 消息定义还需要对“example_interfaces”包的依赖。

1.1 更新“package.xml”和“setup.py”

由于您在包创建期间使用了“–dependencies”选项,因此您不必手动将依赖项添加到“package.xml”中。 不过,与往常一样,请确保将描述、维护者电子邮件和姓名以及许可证信息添加到“package.xml”中。

<description>Python bag writing tutorial</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache License 2.0</license>

还要确保将此信息添加到“setup.py”文件中。

maintainer='Your Name',
maintainer_email='you@email.com',
description='Python bag writing tutorial',
license='Apache License 2.0',

2 编写 Python 节点

ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py 目录中,创建一个名为 simple_bag_recorder.py 的新文件,并将以下代码粘贴到其中。

import rclpy
from rclpy.node import Node
from rclpy.serialization import serialize_message
from std_msgs.msg import String

import rosbag2_py

class SimpleBagRecorder(Node):
    def __init__(self):
        super().__init__('simple_bag_recorder')
        self.writer = rosbag2_py.SequentialWriter()

        storage_options = rosbag2_py._storage.StorageOptions(
            uri='my_bag',
            storage_id='sqlite3')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        topic_info = rosbag2_py._storage.TopicMetadata(
            name='chatter',
            type='std_msgs/msg/String',
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.topic_callback,
            10)
        self.subscription

    def topic_callback(self, msg):
        self.writer.write(
            'chatter',
            serialize_message(msg),
            self.get_clock().now().nanoseconds)


def main(args=None):
    rclpy.init(args=args)
    sbr = SimpleBagRecorder()
    rclpy.spin(sbr)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

2.1 检查代码

顶部的 import 语句是包依赖项。

请注意,导入 rosbag2_py 包以获取处理包文件所需的函数和结构。

在类构造函数中,我们首先创建用于写入包的 writer 对象。 我们正在创建一个 SequentialWriter,它按照收到消息的顺序将消息写入包中。 rosbag2 中可能提供具有不同行为的其他 writer。

self.writer = rosbag2_py.SequentialWriter()

现在我们有了一个编写器对象,我们可以使用它来打开包。 我们指定要创建的包的 URI 和格式(“sqlite3”),其他选项保留默认设置。 使用默认转换选项,它将不执行任何转换并以接收消息的序列化格式存储消息。

storage_options = rosbag2_py._storage.StorageOptions(
    uri='my_bag',
    storage_id='sqlite3')
converter_options = rosbag2_py._storage.ConverterOptions('', '')
self.writer.open(storage_options, converter_options)

接下来,我们需要告诉写入器我们希望存储的主题。 这是通过创建一个“TopicMetadata”对象并将其注册到写入器来完成的。 此对象指定主题名称、主题数据类型和使用的序列化格式。

topic_info = rosbag2_py._storage.TopicMetadata(
    name='chatter',
    type='std_msgs/msg/String',
    serialization_format='cdr')
self.writer.create_topic(topic_info)

现在,编写器已设置为记录我们传递给它的数据,我们创建一个订阅并为其指定一个回调。 我们将在回调中将数据写入包中。

self.subscription = self.create_subscription(
    String,
    'chatter',
    self.topic_callback,
    10)
self.subscription

回调以非序列化形式接收消息(这是 rclpy API 的标准),并将消息传递给编写器,指定数据所属的主题以及与消息一起记录的时间戳。 但是,编写器需要序列化消息来存储在包中。 这意味着我们需要在将数据传递给编写器之前对其进行序列化。 因此,我们调用 serialize_message() 并将结果传递给编写器,而不是直接传递消息。

def topic_callback(self, msg):
    self.writer.write(
        'chatter',
        serialize_message(msg),
        self.get_clock().now().nanoseconds)

该文件以“main”函数结束,该函数用于创建节点实例并启动 ROS 对其进行处理。

def main(args=None):
    rclpy.init(args=args)
    sbr = SimpleBagRecorder()
    rclpy.spin(sbr)
    rclpy.shutdown()

2.2 添加入口点

打开“bag_recorder_nodes_py”包中的“setup.py”文件并为您的节点添加一个入口点。

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
    ],
},

3 构建并运行

导航回工作区的根目录“ros2_ws”,并构建新包。

colcon build --packages-select bag_recorder_nodes_py

打开一个新终端,导航到“ros2_ws”,并获取安装文件。

source install/setup.bash

现在运行节点:

ros2 run bag_recorder_nodes_py simple_bag_recorder

打开第二个终端并运行“talker”示例节点。

ros2 run demo_nodes_cpp talker

这将开始在“chatter”主题上发布数据。 当写入包的节点接收到此数据时,它会将其写入“my_bag”包。 如果“my_bag”目录已经存在,则必须先将其删除,然后再运行“simple_bag_recorder”节点。 这是因为“rosbag2”默认不会覆盖现有包,因此目标目录不能存在。

终止两个节点。 然后,在一个终端中启动“listener”示例节点。

ros2 run demo_nodes_cpp listener

在另一个终端中,使用“ros2 bag”播放你的节点记录的包。

ros2 bag play my_bag

您将看到“listener”节点接收到来自包的消息。

如果您希望再次运行包写入节点,则首先需要删除“my_bag”目录。

4 从节点记录合成数据

任何数据都可以记录到包中,而不仅仅是通过主题接收的数据。 从您自己的节点写入包的一个常见用例是生成和存储合成数据。 在本节中,您将学习如何编写一个生成一些数据并将其存储在包中的节点。 我们将演示两种方法。 第一种方法是使用带计时器的节点;如果您的数据生成在节点外部,例如直接从硬件(例如相机)读取数据,则可以使用这种方法。 第二种方法不使用节点;当您不需要使用 ROS 基础架构中的任何功能时,可以使用此方法。

4.1 编写 Python 节点

ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py 目录中,创建一个名为 data_generator_node.py 的新文件,并将以下代码粘贴到其中。

import rclpy
from rclpy.node import Node
from rclpy.serialization import serialize_message
from example_interfaces.msg import Int32

import rosbag2_py

class DataGeneratorNode(Node):
    def __init__(self):
        super().__init__('data_generator_node')
        self.data = Int32()
        self.data.data = 0
        self.writer = rosbag2_py.SequentialWriter()

        storage_options = rosbag2_py._storage.StorageOptions(
            uri='timed_synthetic_bag',
            storage_id='sqlite3')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        topic_info = rosbag2_py._storage.TopicMetadata(
            name='synthetic',
            type='example_interfaces/msg/Int32',
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        self.timer = self.create_timer(1, self.timer_callback)

    def timer_callback(self):
        self.writer.write(
            'synthetic',
            serialize_message(self.data),
            self.get_clock().now().nanoseconds)
        self.data.data += 1


def main(args=None):
    rclpy.init(args=args)
    dgn = DataGeneratorNode()
    rclpy.spin(dgn)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

4.2 检查代码

此代码大部分与第一个例子相同。 这里描述了重要的区别。

首先,包的名称发生了变化。

storage_options = rosbag2_py._storage.StorageOptions(
    uri='timed_synthetic_bag',
    storage_id='sqlite3')

主题的名称也会改变,存储的数据类型也会改变。

topic_info = rosbag2_py._storage.TopicMetadata(
    name='synthetic',
    type='example_interfaces/msg/Int32',
    serialization_format='cdr')
self.writer.create_topic(topic_info)

此节点没有订阅主题,而是有一个计时器。 计时器以一秒为周期触发,并在触发时调用给定的成员函数。

self.timer = self.create_timer(1, self.timer_callback)

在计时器回调中,我们生成(或以其他方式获取,例如从连接到某些硬件的串行端口读取)我们希望存储在包中的数据。 与前面的示例一样,数据尚未序列化,因此我们必须在将其传递给写入器之前对其进行序列化。

self.writer.write(
    'synthetic',
    serialize_message(self.data),
    self.get_clock().now().nanoseconds)

4.3 添加可执行文件

打开“bag_recorder_nodes_py”包中的“setup.py”文件并为您的节点添加一个入口点。

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main',
    ],
},

4.4 构建并运行

导航回工作区的根目录“ros2_ws”,然后构建包。

colcon build --packages-select bag_recorder_nodes_py

打开一个新终端,导航到“ros2_ws”,并获取安装文件。

source install/setup.bash

如果“timed_synthetic_bag”目录已经存在,则必须先将其删除,然后再运行节点。

现在运行节点:

ros2 run bag_recorder_nodes_py data_generator_node

等待 30 秒左右,然后使用 ctrl-c 终止节点。 接下来,播放创建的包。

ros2 bag play timed_synthetic_bag

打开第二个终端并回显“/synthetic”主题。

ros2 topic echo /synthetic

您将看到生成并存储在包中的数据以每秒一条消息的速度打印到控制台。

5 从可执行文件记录合成数据

现在您可以创建一个存储来自主题以外来源的数据的包,您将学习如何从非节点可执行文件生成和记录合成数据。 这种方法的优点是代码更简单,可以快速创建大量数据。

5.1 编写 Python 可执行文件

在“ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py”目录中,创建一个名为“data_generator_executable.py”的新文件,并将以下代码粘贴到其中。

from rclpy.clock import Clock
from rclpy.duration import Duration
from rclpy.serialization import serialize_message
from example_interfaces.msg import Int32

import rosbag2_py


def main(args=None):
    writer = rosbag2_py.SequentialWriter()

    storage_options = rosbag2_py._storage.StorageOptions(
        uri='big_synthetic_bag',
        storage_id='sqlite3')
    converter_options = rosbag2_py._storage.ConverterOptions('', '')
    writer.open(storage_options, converter_options)

    topic_info = rosbag2_py._storage.TopicMetadata(
        name='synthetic',
        type='example_interfaces/msg/Int32',
        serialization_format='cdr')
    writer.create_topic(topic_info)

    time_stamp = Clock().now()
    for ii in range(0, 100):
        data = Int32()
        data.data = ii
        writer.write(
            'synthetic',
            serialize_message(data),
            time_stamp.nanoseconds)
        time_stamp += Duration(seconds=1)

if __name__ == '__main__':
    main()

5.2 检查代码

比较此示例和上一个示例,您会发现它们并没有什么不同。 唯一显著的区别是使用 for 循环而不是计时器来驱动数据生成。

请注意,我们现在还为数据生成时间戳,而不是依赖于每个样本的当前系统时间。 时间戳可以是您需要的任何值。 数据将以这些时间戳给出的速率播放,因此这是一种控制样本默认播放速度的有用方法。 还请注意,虽然每个样本之间的间隔是整整一秒,但此可执行文件不需要在每个样本之间等待一秒。 这使我们能够在比播放所需的时间短得多的时间内生成大量涵盖广泛时间跨度的数据。

time_stamp = Clock().now()
for ii in range(0, 100):
    data = Int32()
    data.data = ii
    writer.write(
        'synthetic',
        serialize_message(data),
        time_stamp.nanoseconds)
    time_stamp += Duration(seconds=1)

5.3 添加可执行文件

打开“bag_recorder_nodes_py”包中的“setup.py”文件并为您的节点添加一个入口点。

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main',
        'data_generator_executable = bag_recorder_nodes_py.data_generator_executable:main',
    ],
},

5.4 构建并运行

导航回工作区的根目录“ros2_ws”,然后构建包。

colcon build --packages-select bag_recorder_nodes_py

打开终端,导航到“ros2_ws”,并获取安装文件。

source install/setup.bash

如果“big_synthetic_bag”目录已经存在,则必须先将其删除,然后再运行可执行文件。

现在运行可执行文件:

ros2 run bag_recorder_nodes_py data_generator_executable

请注意,可执行文件运行并完成得非常快。

现在播放创建的包。

ros2 bag play big_synthetic_bag

打开第二个终端并回显“/synthetic”主题。

ros2 topic echo /synthetic

您将看到生成并存储在包中的数据以每秒一条消息的速率打印到控制台。 即使包生成得很快,它仍然以时间戳指示的速率播放。

摘要

您创建了一个节点,它将它在某个主题上收到的数据记录到包中。 您测试了使用该节点记录包,并通过播放包来验证数据是否已记录。 此方法可用于记录包中比它在某个主题上收到的数据更多的数据,例如,记录从处理接收到的数据中获得的结果。 然后,您继续创建一个节点和一个可执行文件来生成合成数据并将其存储在包中。 后一种方法特别适用于生成可用作训练集等的合成数据。