迁移 Python 包示例

本指南介绍如何将示例 Python 包从 ROS 1 迁移到 ROS 2。

先决条件

您需要一个可运行的 ROS 2 安装,例如 ROS rolling

ROS 1 代码

您将不会在本指南中使用 catkin,因此您不需要可运行的 ROS 1 安装。 您将改用 ROS 2 的构建工具 Colcon

本节为您提供 ROS 1 Python 包的代码。 该包名为 talker_py,它有一个名为 talker_py_node 的节点。 为了便于以后运行 Colcon,这些说明让您在 Colcon 工作区 内创建包,

首先,在 ~/ros2_talker_py 创建一个文件夹作为 Colcon 工作区的根目录。

mkdir -p ~/ros2_talker_py/src

接下来,创建 ROS 1 包的文件。

cd ~/ros2_talker_py
mkdir -p src/talker_py/src/talker_py
mkdir -p src/talker_py/scripts
touch src/talker_py/package.xml
touch src/talker_py/CMakeLists.txt
touch src/talker_py/src/talker_py/__init__.py
touch src/talker_py/scripts/talker_py_node
touch src/talker_py/setup.py

将以下内容放入每个文件中。

src/talker_py/package.xml:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
    <name>talker_py</name>
    <version>1.0.0</version>
    <description>The talker_py package</description>
    <maintainer email="gerkey@example.com">Brian Gerkey</maintainer>
    <license>BSD</license>

    <buildtool_depend>catkin</buildtool_depend>

    <depend>rospy</depend>
    <depend>std_msgs</depend>
</package>

src/talker_py/CMakeLists.txt:

cmake_minimum_required(VERSION 3.0.2)
project(talker_py)

find_package(catkin REQUIRED)

catkin_python_setup()

catkin_package()

catkin_install_python(PROGRAMS
    scripts/talker_py_node
    DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)

src/talker/src/talker_py/__init__.py:

import rospy
from std_msgs.msg import String

def main():
    rospy.init_node('talker')
    pub = rospy.Publisher('chatter', String, queue_size=10)
    rate = rospy.Rate(10)  # 10hz
    while not rospy.is_shutdown():
        hello_str = "hello world %s" % rospy.get_time()
        rospy.loginfo(hello_str)
        pub.publish(hello_str)
        rate.sleep()

src/talker_py/scripts/talker_py_node:

#!/usr/bin/env python

import talker_py

if __name__ == '__main__':
    talker_py.main()

src/talker_py/setup.py:

from setuptools import setup
from catkin_pkg.python_setup import generate_distutils_setup

setup_args = generate_distutils_setup(
    packages=['talker_py'],
    package_dir={'': 'src'}
)

setup(**setup_args)

这是完整的 ROS 1 Python 包。

迁移 package.xml

将包迁移到 ROS 2 时,请先迁移构建系统文件,以便您可以随时构建和运行代码来检查您的工作。 始终从迁移您的 package.xml 开始。

首先,ROS 2 不使用 catkin。 删除其上的 <buildtool_depend>

<!-- delete this -->
<buildtool_depend>catkin</buildtool_depend>

接下来,ROS 2 使用“rclpy”而不是“rospy”。 删除对“rospy”的依赖。

<!-- Delete this -->
<depend>rospy</depend>

用对“rclpy”的新依赖项替换它。

<depend>rclpy</depend>

添加一个“<export>”部分来告诉 ROS 2 的构建工具“Colcon <https://colcon.readthedocs.io/>”这是一个“ament_python”包而不是“catkin”包。

<export>
  <build_type>ament_python</build_type>
</export>

您的“package.xml”已完全迁移。 它现在应如下所示:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
    <name>talker_py</name>
    <version>1.0.0</version>
    <description>The talker_py package</description>
    <maintainer email="gerkey@example.com">Brian Gerkey</maintainer>
    <license>BSD</license>

    <depend>rclpy</depend>
    <depend>std_msgs</depend>

    <export>
        <build_type>ament_python</build_type>
    </export>
</package>

删除``CMakeLists.txt``

ROS 2 中的 Python 包不使用 CMake,因此请删除``CMakeLists.txt``。

迁移``setup.py``

setup.py 中``setup()`` 的参数不再能通过``catkin_pkg`` 自动生成。

您必须手动传递这些参数,这意味着您的``package.xml`` 会有一些重复。

首先删除来自``catkin_pkg`` 的导入。

# Delete this
from catkin_pkg.python_setup import generate_distutils_setup

将传递给“generate_distutils_setup()”的所有参数移至对“setup()”的调用,然后添加“install_requires”和“zip_safe”参数。 您对“setup()”的调用应如下所示:

setup(
    packages=['talker_py'],
    package_dir={'': 'src'},
    install_requires=['setuptools'],
    zip_safe=True,
)

删除对“generate_distutils_setup()”的调用。

# Delete this
setup_args = generate_distutils_setup(
    packages=['talker_py'],
    package_dir={'': 'src'}
)

setup() 的调用需要从 package.xml 复制一些 附加元数据 <https://docs.python.org/3.11/distutils/setupscript.html#additional-meta-data>

  • 通过 name 参数指定包名称

  • 通过 version 参数指定包版本

  • 通过 maintainermaintainer_email 参数指定维护者

  • 通过 description 参数指定描述

  • 通过 license 参数指定许可证

包名称将被多次使用。 在对 setup() 的调用上方创建一个名为 package_name 的变量。

package_name = 'talker_py'

将所有剩余信息复制到“setup.py”中“setup()”的参数中。 您对“setup()”的调用应如下所示:

setup(
    name=package_name,
    version='1.0.0',
    install_requires=['setuptools'],
    zip_safe=True,
    packages=['talker_py'],
    package_dir={'': 'src'},
    maintainer='Brian Gerkey',
    maintainer_email='gerkey@example.com',
    description='The talker_py package',
    license='BSD',
)

ROS 2 软件包必须安装两个数据文件:

  • package.xml

  • 软件包标记文件

您的软件包已经有一个 package.xml。 它描述了您的软件包的依赖关系。 软件包标记文件会告诉 ros2 run 等工具在哪里找到您的软件包。

package.xml 旁边创建一个名为 resource 的目录。 在 resource 目录中创建一个与软件包同名的空文件。

mkdir resource
touch resource/talker_py

“setup.py” 中的“setup()”调用必须告诉“setuptools”如何安装这些文件。 将以下“data_files”参数添加到对“setup()”的调用中。

data_files=[
    ('share/ament_index/resource_index/packages',
        ['resource/' + package_name]),
    ('share/' + package_name, ['package.xml']),
],

您的 setup.py 几乎已完成。

迁移 Python 脚本并创建 setup.cfg

ROS 2 Python 包使用 console_scripts 入口点 将 Python 脚本安装为可执行文件。 配置文件 setup.cfg 告诉 setuptools 将这些可执行文件安装在特定于包的目录中,以便 ros2 run 等工具可以找到它们。 在 package.xml 旁边创建一个 setup.cfg 文件。

touch setup.cfg

将下面的内容放入其中:

[develop]
script_dir=$base/lib/talker_py
[install]
install_scripts=$base/lib/talker_py

您需要使用 console_scripts 入口点来定义要安装的可执行文件。 每个条目的格式为 executable_name = some.module:function。 第一部分指定要创建的可执行文件的名称。 第二部分指定可执行文件启动时应运行的函数。 此包需要创建一个名为 talker_py_node 的可执行文件,并且可执行文件需要调用 talker_py 模块中的函数 main。 将以下入口点规范作为另一个参数添加到您的 setup.py 中的 setup()

entry_points={
    'console_scripts': [
        'talker_py_node = talker_py:main',
    ],
},

talker_py_node 文件不再需要。 删除文件 talker_py_node 并删除 scripts/ 目录。

rm scripts/talker_py_node
rmdir scripts

添加“console_scripts”是对您的“setup.py”的最后一次更改。 您的最终“setup.py”应如下所示:

from setuptools import setup

package_name = 'talker_py'

setup(
    name=package_name,
    version='1.0.0',
    packages=['talker_py'],
    package_dir={'': 'src'},
    install_requires=['setuptools'],
    zip_safe=True,
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    maintainer='Brian Gerkey',
    maintainer_email='gerkey@example.com',
    description='The talker_py package',
    license='BSD',
    entry_points={
        'console_scripts': [
            'talker_py_node = talker_py:main',
        ],
    },
)

src/talker_py/__init__.py 中迁移 Python 代码

ROS 2 改变了许多 Python 代码的最佳实践。 首先按原样迁移代码。 在代码运行正常后,稍后重构代码会更容易。

使用 rclpy 而不是 rospy

ROS 2 软件包使用 rclpy <https://index.ros.org/p/rclpy>`__ 而不是 ``rospy。 要使用 rclpy,您必须做两件事:

  1. Import rclpy

  2. Initialize rclpy

删除导入“rospy”的语句。

# Remove this
import rospy

用导入“rclpy”的语句替换它。

import rclpy

在“main()”函数中添加对“rclpy.init()”的调用作为第一个语句。

def main():
    # Add this line
    rclpy.init()

在后台执行回调

ROS 1 和 ROS 2 都使用 callbacks。 在 ROS 1 中,回调始终在后台线程中执行,用户可以自由地使用 rate.sleep() 之类的调用来阻止主线程。 在 ROS 2 中,rclpy 使用 Executors 让用户更好地控制回调的调用位置。 在移植使用 rate.sleep() 之类的阻止调用的代码时,必须确保这些调用不会干扰执行器。 一种方法是为执行器创建专用线程。

首先,添加这两个导入语句。

import threading

from rclpy.executors import ExternalShutdownException

接下来,添加名为“spin_in_background()”的顶级函数。 此函数要求默认执行器执行回调,直到某些东西将其关闭。

def spin_in_background():
    executor = rclpy.get_global_executor()
    try:
        executor.spin()
    except ExternalShutdownException:
        pass

在“main()”函数中,在调用“rclpy.init()”之后添加以下代码,以启动一个调用“spin_in_background()”的线程。

# In rospy callbacks are always called in background threads.
# Spin the executor in another thread for similar behavior in ROS 2.
t = threading.Thread(target=spin_in_background)
t.start()

最后,通过将此语句放在“main()”函数的底部,在程序结束时加入线程。

t.join()

创建节点

在 ROS 1 中,Python 脚本每个进程只能创建一个节点,API init_node() 创建它。

在 ROS 2 中,单个 Python 脚本可以创建多个节点,创建节点的 API 名为 create_node

删除对 rospy.init_node() 的调用:

rospy.init_node('talker')

添加对“rclpy.create_node()”的新调用,并将结果存储在名为“node”的变量中:

node = rclpy.create_node('talker')

我们必须告诉执行器有关这个节点的信息。 在节点创建的正下方添加以下行:

rclpy.get_global_executor().add_node(node)

创建发布者

在 ROS 1 中,用户通过实例化 Publisher 类来创建发布者。 在 ROS 2 中,用户通过节点的 create_publisher() API 创建发布者。 create_publisher() API 与 ROS 1 有一个不幸的区别:主题名称和主题类型参数被交换了。

删除 rospy.Publisher 实例的创建。

pub = rospy.Publisher('chatter', String, queue_size=10)

将其替换为对“node.create_publisher()”的调用。

pub = node.create_publisher(String, 'chatter', 10)

创建费率

在 ROS 1 中,用户直接创建“Rate”实例,而在 ROS 2 中,用户通过节点的“create_rate()”API 创建它们。

删除“rospy.Rate”实例的创建。 .. code-block:

rate = rospy.Rate(10)  # 10hz

用对“node.create_rate()”的调用替换它。

rate = node.create_rate(10)  # 10hz

rclpy.ok() 上循环

在 ROS 1 中,rospy.is_shutdown() API 指示是否已要求关闭进程。

在 ROS 2 中,rclpy.ok() API 执行此操作。

删除语句 not rospy.is_shutdown()

while not rospy.is_shutdown():

用对“rclpy.ok()”的调用替换它。

while rclpy.ok():

创建一个包含当前时间的“字符串”消息

您必须对此行进行一些更改

hello_str = "hello world %s" % rospy.get_time()

在 ROS 2 中,您需要:

从获取时间开始。

ROS 2 节点有一个 Clock 实例。

将对 rospy.get_time() 的调用替换为 node.get_clock().now(),以从节点的时钟获取当前时间。

接下来,将 % 的使用替换为 f-string:f'hello world {node.get_clock().now()}'

最后,实例化一个 std_msgs.msg.String() 实例并将上述内容分配给该实例的 data 属性。 您的最终代码应如下所示:

hello_str = String()
hello_str.data = f'hello world {node.get_clock().now()}'

记录信息性消息

在 ROS 2 中,您必须通过“Logger”实例发送日志消息,并且节点有一个。

删除对“rospy.loginfo()”的调用。

rospy.loginfo(hello_str)

将其替换为节点的“Logger”实例上的“info()”调用。

node.get_logger().info(hello_str.data)

这是对“src/talker_py/__init__.py”的最后一次更改。 您的文件应如下所示:

import threading

import rclpy
from rclpy.executors import ExternalShutdownException
from std_msgs.msg import String


def spin_in_background():
    executor = rclpy.get_global_executor()
    try:
        executor.spin()
    except ExternalShutdownException:
        pass


def main():
    rclpy.init()
    # In rospy callbacks are always called in background threads.
    # Spin the executor in another thread for similar behavior in ROS 2.
    t = threading.Thread(target=spin_in_background)
    t.start()

    node = rclpy.create_node('talker')
    rclpy.get_global_executor().add_node(node)
    pub = node.create_publisher(String, 'chatter', 10)
    rate = node.create_rate(10)  # 10hz

    while rclpy.ok():
        hello_str = String()
        hello_str.data = f'hello world {node.get_clock().now()}'
        node.get_logger().info(hello_str.data)
        pub.publish(hello_str)
        rate.sleep()

    t.join()

构建并运行“talker_py_node”

创建三个终端:

  1. 一个用于构建“talker_py”

  2. 一个用于运行“talker_py_node”

  3. 一个用于回显“talker_py_node”发布的消息

在第一个终端中构建工作区。

cd ~/ros2_talker_py
. /opt/ros/rolling/setup.bash
colcon build

在第二个终端中获取您的工作区,并运行“talker_py_node”。

cd ~/ros2_talker_py
. install/setup.bash
ros2 run talker_py talker_py_node

在第三个终端中回显节点发布的消息:

. /opt/ros/rolling/setup.bash
ros2 topic echo /chatter

您应该看到在第二个终端中发布的带有当前时间的消息,并在第三个终端中收到相同的消息。

重构代码以使用 ROS 2 约定

您已成功将 ROS 1 Python 包迁移到 ROS 2! 现在您已经可以正常工作了,请考虑重构它以更好地与 ROS 2 的 Python API 保持一致。 遵循以下两个原则。

  • 创建一个从“Node”继承的类。

  • 在回调中完成所有工作,并且永远不要阻止这些回调。

例如,创建一个从“Node”继承的“Talker”类。 至于在回调中执行工作,请使用带有回调的“Timer”,而不是“rate.sleep()”。 让计时器回调发布消息并返回。 让 main() 创建 Talker 实例,而不是使用 rclpy.create_node(),并让执行器在主线程中执行。

重构后的代码可能如下所示:

import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_msgs.msg import String


class Talker(Node):

    def __init__(self, **kwargs):
        super().__init__('talker', **kwargs)

        self._pub = self.create_publisher(String, 'chatter', 10)
        self._timer = self.create_timer(1 / 10, self.do_publish)

    def do_publish(self):
        hello_str = String()
        hello_str.data = f'hello world {self.get_clock().now()}'
        self.get_logger().info(hello_str.data)
        self._pub.publish(hello_str)


def main():
    rclpy.init()
    try:
        rclpy.spin(Talker())
    except (ExternalShutdownException, KeyboardInterrupt):
        pass
    finally:
        rclpy.try_shutdown()

结论

您已经了解了如何将示例 Python ROS 1 包迁移到 ROS 2。 从现在开始,在迁移您自己的 Python 包时,请参考 迁移 Python 包参考页