迁移 Python 包示例
本指南介绍如何将示例 Python 包从 ROS 1 迁移到 ROS 2。
先决条件
您需要一个可运行的 ROS 2 安装,例如 ROS rolling。
ROS 1 代码
您将不会在本指南中使用 catkin,因此您不需要可运行的 ROS 1 安装。 您将改用 ROS 2 的构建工具 Colcon。
本节为您提供 ROS 1 Python 包的代码。
该包名为 talker_py
,它有一个名为 talker_py_node
的节点。
为了便于以后运行 Colcon,这些说明让您在 Colcon 工作区 内创建包,
首先,在 ~/ros2_talker_py
创建一个文件夹作为 Colcon 工作区的根目录。
mkdir -p ~/ros2_talker_py/src
mkdir -p ~/ros2_talker_py/src
md \ros2_talker_py\src
接下来,创建 ROS 1 包的文件。
cd ~/ros2_talker_py
mkdir -p src/talker_py/src/talker_py
mkdir -p src/talker_py/scripts
touch src/talker_py/package.xml
touch src/talker_py/CMakeLists.txt
touch src/talker_py/src/talker_py/__init__.py
touch src/talker_py/scripts/talker_py_node
touch src/talker_py/setup.py
cd ~/ros2_talker_py
mkdir -p src/talker_py/src/talker_py
mkdir -p src/talker_py/scripts
touch src/talker_py/package.xml
touch src/talker_py/CMakeLists.txt
touch src/talker_py/src/talker_py/__init__.py
touch src/talker_py/scripts/talker_py_node
touch src/talker_py/setup.py
cd \ros2_talker_py
md src\talker_py\src\talker_py
md src\talker_py\scripts
type nul > src\talker_py\package.xml
type nul > src\talker_py\CMakeLists.txt
type nul > src\talker_py\src\talker_py\__init__.py
type nul > src\talker_py\scripts/talker_py_node
type nul > src\talker_py\setup.py
将以下内容放入每个文件中。
src/talker_py/package.xml
:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>talker_py</name>
<version>1.0.0</version>
<description>The talker_py package</description>
<maintainer email="gerkey@example.com">Brian Gerkey</maintainer>
<license>BSD</license>
<buildtool_depend>catkin</buildtool_depend>
<depend>rospy</depend>
<depend>std_msgs</depend>
</package>
src/talker_py/CMakeLists.txt
:
cmake_minimum_required(VERSION 3.0.2)
project(talker_py)
find_package(catkin REQUIRED)
catkin_python_setup()
catkin_package()
catkin_install_python(PROGRAMS
scripts/talker_py_node
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)
src/talker/src/talker_py/__init__.py
:
import rospy
from std_msgs.msg import String
def main():
rospy.init_node('talker')
pub = rospy.Publisher('chatter', String, queue_size=10)
rate = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str)
pub.publish(hello_str)
rate.sleep()
src/talker_py/scripts/talker_py_node
:
#!/usr/bin/env python
import talker_py
if __name__ == '__main__':
talker_py.main()
src/talker_py/setup.py
:
from setuptools import setup
from catkin_pkg.python_setup import generate_distutils_setup
setup_args = generate_distutils_setup(
packages=['talker_py'],
package_dir={'': 'src'}
)
setup(**setup_args)
这是完整的 ROS 1 Python 包。
迁移 package.xml
将包迁移到 ROS 2 时,请先迁移构建系统文件,以便您可以随时构建和运行代码来检查您的工作。
始终从迁移您的 package.xml
开始。
首先,ROS 2 不使用 catkin
。
删除其上的 <buildtool_depend>
。
<!-- delete this -->
<buildtool_depend>catkin</buildtool_depend>
接下来,ROS 2 使用“rclpy”而不是“rospy”。 删除对“rospy”的依赖。
<!-- Delete this -->
<depend>rospy</depend>
用对“rclpy”的新依赖项替换它。
<depend>rclpy</depend>
添加一个“<export>”部分来告诉 ROS 2 的构建工具“Colcon <https://colcon.readthedocs.io/>”这是一个“ament_python”包而不是“catkin”包。
<export>
<build_type>ament_python</build_type>
</export>
您的“package.xml”已完全迁移。 它现在应如下所示:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>talker_py</name>
<version>1.0.0</version>
<description>The talker_py package</description>
<maintainer email="gerkey@example.com">Brian Gerkey</maintainer>
<license>BSD</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
删除``CMakeLists.txt``
ROS 2 中的 Python 包不使用 CMake,因此请删除``CMakeLists.txt``。
迁移``setup.py``
setup.py
中``setup()`` 的参数不再能通过``catkin_pkg`` 自动生成。
您必须手动传递这些参数,这意味着您的``package.xml`` 会有一些重复。
首先删除来自``catkin_pkg`` 的导入。
# Delete this
from catkin_pkg.python_setup import generate_distutils_setup
将传递给“generate_distutils_setup()”的所有参数移至对“setup()”的调用,然后添加“install_requires”和“zip_safe”参数。 您对“setup()”的调用应如下所示:
setup(
packages=['talker_py'],
package_dir={'': 'src'},
install_requires=['setuptools'],
zip_safe=True,
)
删除对“generate_distutils_setup()”的调用。
# Delete this
setup_args = generate_distutils_setup(
packages=['talker_py'],
package_dir={'': 'src'}
)
对 setup()
的调用需要从 package.xml
复制一些 附加元数据 <https://docs.python.org/3.11/distutils/setupscript.html#additional-meta-data>
:
通过
name
参数指定包名称通过
version
参数指定包版本通过
maintainer
和maintainer_email
参数指定维护者通过
description
参数指定描述通过
license
参数指定许可证
包名称将被多次使用。
在对 setup()
的调用上方创建一个名为 package_name
的变量。
package_name = 'talker_py'
将所有剩余信息复制到“setup.py”中“setup()”的参数中。 您对“setup()”的调用应如下所示:
setup(
name=package_name,
version='1.0.0',
install_requires=['setuptools'],
zip_safe=True,
packages=['talker_py'],
package_dir={'': 'src'},
maintainer='Brian Gerkey',
maintainer_email='gerkey@example.com',
description='The talker_py package',
license='BSD',
)
ROS 2 软件包必须安装两个数据文件:
package.xml
软件包标记文件
您的软件包已经有一个 package.xml
。
它描述了您的软件包的依赖关系。
软件包标记文件会告诉 ros2 run
等工具在哪里找到您的软件包。
在 package.xml
旁边创建一个名为 resource
的目录。
在 resource
目录中创建一个与软件包同名的空文件。
mkdir resource
touch resource/talker_py
mkdir resource
touch resource/talker_py
md resource
type nul > resource\talker_py
“setup.py” 中的“setup()”调用必须告诉“setuptools”如何安装这些文件。 将以下“data_files”参数添加到对“setup()”的调用中。
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
您的 setup.py
几乎已完成。
迁移 Python 脚本并创建 setup.cfg
ROS 2 Python 包使用 console_scripts
入口点 将 Python 脚本安装为可执行文件。
配置文件 setup.cfg
告诉 setuptools
将这些可执行文件安装在特定于包的目录中,以便 ros2 run
等工具可以找到它们。
在 package.xml
旁边创建一个 setup.cfg
文件。
touch setup.cfg
touch setup.cfg
type nul > touch setup.cfg
将下面的内容放入其中:
[develop]
script_dir=$base/lib/talker_py
[install]
install_scripts=$base/lib/talker_py
您需要使用 console_scripts
入口点来定义要安装的可执行文件。
每个条目的格式为 executable_name = some.module:function
。
第一部分指定要创建的可执行文件的名称。
第二部分指定可执行文件启动时应运行的函数。
此包需要创建一个名为 talker_py_node
的可执行文件,并且可执行文件需要调用 talker_py
模块中的函数 main
。
将以下入口点规范作为另一个参数添加到您的 setup.py
中的 setup()
。
entry_points={
'console_scripts': [
'talker_py_node = talker_py:main',
],
},
talker_py_node
文件不再需要。
删除文件 talker_py_node
并删除 scripts/
目录。
rm scripts/talker_py_node
rmdir scripts
rm scripts/talker_py_node
rmdir scripts
del scripts/talker_py_node
rd scripts
添加“console_scripts”是对您的“setup.py”的最后一次更改。 您的最终“setup.py”应如下所示:
from setuptools import setup
package_name = 'talker_py'
setup(
name=package_name,
version='1.0.0',
packages=['talker_py'],
package_dir={'': 'src'},
install_requires=['setuptools'],
zip_safe=True,
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
maintainer='Brian Gerkey',
maintainer_email='gerkey@example.com',
description='The talker_py package',
license='BSD',
entry_points={
'console_scripts': [
'talker_py_node = talker_py:main',
],
},
)
在 src/talker_py/__init__.py
中迁移 Python 代码
ROS 2 改变了许多 Python 代码的最佳实践。 首先按原样迁移代码。 在代码运行正常后,稍后重构代码会更容易。
使用 rclpy
而不是 rospy
ROS 2 软件包使用 rclpy <https://index.ros.org/p/rclpy>`__ 而不是 ``rospy
。
要使用 rclpy
,您必须做两件事:
Import
rclpy
Initialize
rclpy
删除导入“rospy”的语句。
# Remove this
import rospy
用导入“rclpy”的语句替换它。
import rclpy
在“main()”函数中添加对“rclpy.init()”的调用作为第一个语句。
def main():
# Add this line
rclpy.init()
在后台执行回调
ROS 1 和 ROS 2 都使用 callbacks。
在 ROS 1 中,回调始终在后台线程中执行,用户可以自由地使用 rate.sleep()
之类的调用来阻止主线程。
在 ROS 2 中,rclpy
使用 Executors 让用户更好地控制回调的调用位置。
在移植使用 rate.sleep()
之类的阻止调用的代码时,必须确保这些调用不会干扰执行器。
一种方法是为执行器创建专用线程。
首先,添加这两个导入语句。
import threading
from rclpy.executors import ExternalShutdownException
接下来,添加名为“spin_in_background()”的顶级函数。 此函数要求默认执行器执行回调,直到某些东西将其关闭。
def spin_in_background():
executor = rclpy.get_global_executor()
try:
executor.spin()
except ExternalShutdownException:
pass
在“main()”函数中,在调用“rclpy.init()”之后添加以下代码,以启动一个调用“spin_in_background()”的线程。
# In rospy callbacks are always called in background threads.
# Spin the executor in another thread for similar behavior in ROS 2.
t = threading.Thread(target=spin_in_background)
t.start()
最后,通过将此语句放在“main()”函数的底部,在程序结束时加入线程。
t.join()
创建节点
在 ROS 1 中,Python 脚本每个进程只能创建一个节点,API init_node()
创建它。
在 ROS 2 中,单个 Python 脚本可以创建多个节点,创建节点的 API 名为 create_node
。
删除对 rospy.init_node()
的调用:
rospy.init_node('talker')
添加对“rclpy.create_node()”的新调用,并将结果存储在名为“node”的变量中:
node = rclpy.create_node('talker')
我们必须告诉执行器有关这个节点的信息。 在节点创建的正下方添加以下行:
rclpy.get_global_executor().add_node(node)
创建发布者
在 ROS 1 中,用户通过实例化 Publisher
类来创建发布者。
在 ROS 2 中,用户通过节点的 create_publisher()
API 创建发布者。
create_publisher()
API 与 ROS 1 有一个不幸的区别:主题名称和主题类型参数被交换了。
删除 rospy.Publisher
实例的创建。
pub = rospy.Publisher('chatter', String, queue_size=10)
将其替换为对“node.create_publisher()”的调用。
pub = node.create_publisher(String, 'chatter', 10)
创建费率
在 ROS 1 中,用户直接创建“Rate”实例,而在 ROS 2 中,用户通过节点的“create_rate()”API 创建它们。
删除“rospy.Rate”实例的创建。 .. code-block:
rate = rospy.Rate(10) # 10hz
用对“node.create_rate()”的调用替换它。
rate = node.create_rate(10) # 10hz
在 rclpy.ok()
上循环
在 ROS 1 中,rospy.is_shutdown()
API 指示是否已要求关闭进程。
在 ROS 2 中,rclpy.ok()
API 执行此操作。
删除语句 not rospy.is_shutdown()
while not rospy.is_shutdown():
用对“rclpy.ok()”的调用替换它。
while rclpy.ok():
创建一个包含当前时间的“字符串”消息
您必须对此行进行一些更改
hello_str = "hello world %s" % rospy.get_time()
在 ROS 2 中,您需要:
必须从
Clock
实例获取时间应使用
f-strings <https://docs.python.org/3/reference/lexical_analysis.html#f-strings>`__ 格式化 ``str
数据,因为在活跃的 Python 版本中不鼓励使用 ``% <https://docs.python.org/3/library/stdtypes.html#printf-style-string-formatting>`__必须实例化
std_msgs.msg.String
实例
从获取时间开始。
ROS 2 节点有一个 Clock
实例。
将对 rospy.get_time()
的调用替换为 node.get_clock().now()
,以从节点的时钟获取当前时间。
接下来,将 %
的使用替换为 f-string:f'hello world {node.get_clock().now()}'
。
最后,实例化一个 std_msgs.msg.String()
实例并将上述内容分配给该实例的 data
属性。
您的最终代码应如下所示:
hello_str = String()
hello_str.data = f'hello world {node.get_clock().now()}'
记录信息性消息
在 ROS 2 中,您必须通过“Logger”实例发送日志消息,并且节点有一个。
删除对“rospy.loginfo()”的调用。
rospy.loginfo(hello_str)
将其替换为节点的“Logger”实例上的“info()”调用。
node.get_logger().info(hello_str.data)
这是对“src/talker_py/__init__.py”的最后一次更改。 您的文件应如下所示:
import threading
import rclpy
from rclpy.executors import ExternalShutdownException
from std_msgs.msg import String
def spin_in_background():
executor = rclpy.get_global_executor()
try:
executor.spin()
except ExternalShutdownException:
pass
def main():
rclpy.init()
# In rospy callbacks are always called in background threads.
# Spin the executor in another thread for similar behavior in ROS 2.
t = threading.Thread(target=spin_in_background)
t.start()
node = rclpy.create_node('talker')
rclpy.get_global_executor().add_node(node)
pub = node.create_publisher(String, 'chatter', 10)
rate = node.create_rate(10) # 10hz
while rclpy.ok():
hello_str = String()
hello_str.data = f'hello world {node.get_clock().now()}'
node.get_logger().info(hello_str.data)
pub.publish(hello_str)
rate.sleep()
t.join()
构建并运行“talker_py_node”
创建三个终端:
一个用于构建“talker_py”
一个用于运行“talker_py_node”
一个用于回显“talker_py_node”发布的消息
在第一个终端中构建工作区。
cd ~/ros2_talker_py
. /opt/ros/rolling/setup.bash
colcon build
cd ~/ros2_talker_py
. /opt/ros/rolling/setup.bash
colcon build
cd \ros2_talker_py
call C:\dev\ros2\local_setup.bat
colcon build
在第二个终端中获取您的工作区,并运行“talker_py_node”。
cd ~/ros2_talker_py
. install/setup.bash
ros2 run talker_py talker_py_node
cd ~/ros2_talker_py
. install/setup.bash
ros2 run talker_py talker_py_node
cd \ros2_talker_py
call install\setup.bat
ros2 run talker_py talker_py_node
在第三个终端中回显节点发布的消息:
. /opt/ros/rolling/setup.bash
ros2 topic echo /chatter
. /opt/ros/rolling/setup.bash
ros2 topic echo /chatter
call C:\dev\ros2\local_setup.bat
ros2 topic echo /chatter
您应该看到在第二个终端中发布的带有当前时间的消息,并在第三个终端中收到相同的消息。
重构代码以使用 ROS 2 约定
您已成功将 ROS 1 Python 包迁移到 ROS 2! 现在您已经可以正常工作了,请考虑重构它以更好地与 ROS 2 的 Python API 保持一致。 遵循以下两个原则。
创建一个从“Node”继承的类。
在回调中完成所有工作,并且永远不要阻止这些回调。
例如,创建一个从“Node”继承的“Talker”类。
至于在回调中执行工作,请使用带有回调的“Timer”,而不是“rate.sleep()”。
让计时器回调发布消息并返回。
让 main()
创建 Talker
实例,而不是使用 rclpy.create_node()
,并让执行器在主线程中执行。
重构后的代码可能如下所示:
import rclpy
from rclpy.node import Node
from rclpy.executors import ExternalShutdownException
from std_msgs.msg import String
class Talker(Node):
def __init__(self, **kwargs):
super().__init__('talker', **kwargs)
self._pub = self.create_publisher(String, 'chatter', 10)
self._timer = self.create_timer(1 / 10, self.do_publish)
def do_publish(self):
hello_str = String()
hello_str.data = f'hello world {self.get_clock().now()}'
self.get_logger().info(hello_str.data)
self._pub.publish(hello_str)
def main():
rclpy.init()
try:
rclpy.spin(Talker())
except (ExternalShutdownException, KeyboardInterrupt):
pass
finally:
rclpy.try_shutdown()
结论
您已经了解了如何将示例 Python ROS 1 包迁移到 ROS 2。 从现在开始,在迁移您自己的 Python 包时,请参考 迁移 Python 包参考页。