建立高效的进程内通信
背景
ROS 应用程序通常由执行特定任务并与系统其他部分分离的单个“节点”组成。 这有利于故障隔离、更快的开发、模块化和代码重用,但通常以牺牲性能为代价。 在 ROS 1 最初开发之后,对节点高效组合的需求变得显而易见,并开发了 Nodelet。 在 ROS 2 中,我们旨在通过解决一些需要重组节点的基本问题来改进 Nodelet 的设计。
在此演示中,我们将重点介绍如何手动组合节点,方法是单独定义节点,但将它们组合在不同的过程布局中,而无需更改节点的代码或限制其功能。
安装演示
有关安装 ROS 2 的详细信息,请参阅 安装说明。
如果您已从软件包安装了 ROS 2,请确保已安装 ros-rolling-intra-process-demo
。
如果您下载了存档或从源代码构建了 ROS 2,它将已成为安装的一部分。
运行和理解演示
有几个不同的演示:一些是玩具问题,旨在突出进程内通信功能的特性,一些是使用 OpenCV 的端到端示例,并展示了将节点重新组合成不同配置的能力。
双节点管道演示
此演示旨在展示进程内发布/订阅连接在使用 std::unique_ptr
s 发布和订阅时可实现消息的零拷贝传输。
首先让我们看一下源代码:
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"
using namespace std::chrono_literals;
// Node that produces messages.
struct Producer : public rclcpp::Node
{
Producer(const std::string & name, const std::string & output)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
{
// Create a publisher on the output topic.
pub_ = this->create_publisher<std_msgs::msg::Int32>(output, 10);
std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_;
// Create a timer which publishes on the output topic at ~1Hz.
auto callback = [captured_pub]() -> void {
auto pub_ptr = captured_pub.lock();
if (!pub_ptr) {
return;
}
static int32_t count = 0;
std_msgs::msg::Int32::UniquePtr msg(new std_msgs::msg::Int32());
msg->data = count++;
printf(
"Published message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
pub_ptr->publish(std::move(msg));
};
timer_ = this->create_wall_timer(1s, callback);
}
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
rclcpp::TimerBase::SharedPtr timer_;
};
// Node that consumes messages.
struct Consumer : public rclcpp::Node
{
Consumer(const std::string & name, const std::string & input)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
{
// Create a subscription on the input topic which prints on receipt of new messages.
sub_ = this->create_subscription<std_msgs::msg::Int32>(
input,
10,
[](std_msgs::msg::Int32::UniquePtr msg) {
printf(
" Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
});
}
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
};
int main(int argc, char * argv[])
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
rclcpp::init(argc, argv);
rclcpp::executors::SingleThreadedExecutor executor;
auto producer = std::make_shared<Producer>("producer", "number");
auto consumer = std::make_shared<Consumer>("consumer", "number");
executor.add_node(producer);
executor.add_node(consumer);
executor.spin();
rclcpp::shutdown();
return 0;
}
通过查看 main
函数,您可以看到,我们有一个生产者节点和一个消费者节点,我们将它们添加到单线程执行器中,然后调用 spin。
如果您查看 Producer
结构中“生产者”节点的实现,您会发现我们创建了一个在“number”主题上发布的发布者,以及一个定期创建新消息、打印出其内存中的地址及其内容的值然后发布它的计时器。
“消费者”节点稍微简单一些,您可以在 Consumer
结构中看到它的实现,因为它只订阅“number”主题并打印它收到的消息的地址和值。
预期是生产者将打印出地址和值,消费者将打印出匹配的地址和值。
这表明进程内通信确实有效,并且避免了不必要的复制,至少对于简单的图表而言。
让我们通过执行“ros2 run intra_process_demo two_node_pipeline”可执行文件来运行演示(不要忘记先获取安装文件):
$ ros2 run intra_process_demo two_node_pipeline
Published message with value: 0, and address: 0x7fb02303faf0
Published message with value: 1, and address: 0x7fb020cf0520
Received message with value: 1, and address: 0x7fb020cf0520
Published message with value: 2, and address: 0x7fb020e12900
Received message with value: 2, and address: 0x7fb020e12900
Published message with value: 3, and address: 0x7fb020cf0520
Received message with value: 3, and address: 0x7fb020cf0520
Published message with value: 4, and address: 0x7fb020e12900
Received message with value: 4, and address: 0x7fb020e12900
Published message with value: 5, and address: 0x7fb02303cea0
Received message with value: 5, and address: 0x7fb02303cea0
[...]
您会注意到,消息以每秒一条的速度发送。 这是因为我们告诉计时器以每秒一次的速度触发。
您可能还注意到,第一条消息(值为“0”)没有相应的“已接收消息…”行。 这是因为发布/订阅是“尽力而为”的,并且我们没有启用任何“锁定”之类的行为。 这意味着,如果发布者在建立订阅之前发布消息,则订阅将不会收到该消息。 这种竞争条件可能导致前几条消息丢失。 在这种情况下,由于它们每秒只发送一次,因此通常只有第一条消息丢失。
最后,您可以看到具有相同值的“已发布消息…”和“已接收消息…”行也具有相同的地址。
这表明接收的消息的地址与发布的地址相同,并且它不是副本。
这是因为我们使用 std::unique_ptr
s 发布和订阅,这允许在系统中安全地移动消息的所有权。
您也可以使用 const &
和 std::shared_ptr
发布和订阅,但在这种情况下不会发生零拷贝。
循环管道演示
此演示与上一个演示类似,但生产者不是为每次迭代创建新消息,而是仅使用一个消息实例。 这是通过在图中创建一个循环并通过在旋转执行器之前从外部使其中一个节点发布来“启动”通信来实现的:
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"
using namespace std::chrono_literals;
// This node receives an Int32, waits 1 second, then increments and sends it.
struct IncrementerPipe : public rclcpp::Node
{
IncrementerPipe(const std::string & name, const std::string & in, const std::string & out)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
{
// Create a publisher on the output topic.
pub = this->create_publisher<std_msgs::msg::Int32>(out, 10);
std::weak_ptr<std::remove_pointer<decltype(pub.get())>::type> captured_pub = pub;
// Create a subscription on the input topic.
sub = this->create_subscription<std_msgs::msg::Int32>(
in,
10,
[captured_pub](std_msgs::msg::Int32::UniquePtr msg) {
auto pub_ptr = captured_pub.lock();
if (!pub_ptr) {
return;
}
printf(
"Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
printf(" sleeping for 1 second...\n");
if (!rclcpp::sleep_for(1s)) {
return; // Return if the sleep failed (e.g. on :kbd:`ctrl-c`).
}
printf(" done.\n");
msg->data++; // Increment the message's data.
printf(
"Incrementing and sending with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
pub_ptr->publish(std::move(msg)); // Send the message along to the output topic.
});
}
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub;
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub;
};
int main(int argc, char * argv[])
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
rclcpp::init(argc, argv);
rclcpp::executors::SingleThreadedExecutor executor;
// Create a simple loop by connecting the in and out topics of two IncrementerPipe's.
// The expectation is that the address of the message being passed between them never changes.
auto pipe1 = std::make_shared<IncrementerPipe>("pipe1", "topic1", "topic2");
auto pipe2 = std::make_shared<IncrementerPipe>("pipe2", "topic2", "topic1");
rclcpp::sleep_for(1s); // Wait for subscriptions to be established to avoid race conditions.
// Publish the first message (kicking off the cycle).
std::unique_ptr<std_msgs::msg::Int32> msg(new std_msgs::msg::Int32());
msg->data = 42;
printf(
"Published first message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
pipe1->pub->publish(std::move(msg));
executor.add_node(pipe1);
executor.add_node(pipe2);
executor.spin();
rclcpp::shutdown();
return 0;
}
与上一个演示不同,此演示仅使用一个节点,使用不同的名称和配置实例化两次。
图最终为“pipe1”->“pipe2”->“pipe1”……循环。
“pipe1->pub->publish(msg);”行启动该过程,但从那时起,消息在节点之间来回传递,每个节点在其自己的订阅回调中调用发布。
这里的预期是节点每秒来回传递一次消息,每次增加消息的值。
因为消息是作为“unique_ptr”发布和订阅的,所以一开始创建的相同消息被持续使用。
为了测试这些预期,让我们运行它:
$ ros2 run intra_process_demo cyclic_pipeline
Published first message with value: 42, and address: 0x7fd2ce0a2bc0
Received message with value: 42, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 43, and address: 0x7fd2ce0a2bc0
Received message with value: 43, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 44, and address: 0x7fd2ce0a2bc0
Received message with value: 44, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 45, and address: 0x7fd2ce0a2bc0
Received message with value: 45, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 46, and address: 0x7fd2ce0a2bc0
Received message with value: 46, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 47, and address: 0x7fd2ce0a2bc0
Received message with value: 47, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
[...]
Y您应该看到每次迭代的数字都在不断增加,从 42 开始…因为 42,并且整个过程中它都会重复使用相同的消息,正如指针地址不变所示,这避免了不必要的复制。
图像管道演示
在此演示中,我们将使用 OpenCV 来捕获、注释然后查看图像。
Note
如果您使用的是 macOS,并且这些示例不起作用,或者您收到类似“ddsi_conn_write failed -1”的错误,那么您需要增加系统范围的 UDP 数据包大小:
$ sudo sysctl -w net.inet.udp.recvspace=209715
$ sudo sysctl -w net.inet.udp.maxdgram=65500
These changes will not persist after a reboot.
简单的管道
首先,我们将有一个由三个节点组成的管道,排列如下:“camera_node” -> “watermark_node” -> “image_view_node”
“camera_node”从计算机上的相机设备“0”读取数据,在图像上写入一些信息并发布。 “watermark_node”订阅“camera_node”的输出,并在发布之前添加更多文本。 最后,“image_view_node”订阅“watermark_node”的输出,向图像写入更多文本,然后使用“cv::imshow”将其可视化。
在每个节点中,正在发送的消息或已接收的消息的地址(或两者)都写入图像。 水印和图像视图节点旨在修改图像而不复制它,因此只要节点处于同一个进程中并且图形仍然按照上面概述的管道组织,那么印在图像上的地址应该都是相同的。
Note
在某些系统上(我们在 Linux 上看到过这种情况),打印到屏幕上的地址可能不会改变。
这是因为相同的唯一指针被重复使用。在这种情况下,管道仍在运行。
让我们通过执行以下可执行文件来运行该演示:
ros2 run intra_process_demo image_pipeline_all_in_one
你应该看到类似这样的内容:
您可以通过按空格键暂停图像渲染,然后再次按空格键恢复。 您也可以按“q”或“ESC”退出。
如果暂停图像查看器,您应该能够比较图像上写的地址并发现它们是相同的。
带有两个图像查看器的管道
现在让我们看一个与上面类似的示例,只是它有两个图像视图节点。 所有节点仍在同一个进程中,但现在应该会显示两个图像视图窗口。(macOS 用户请注意:您的图像视图窗口可能彼此重叠)。 让我们使用以下命令运行它:
ros2 run intra_process_demo image_pipeline_with_two_image_view
就像上一个示例一样,您可以使用空格键暂停渲染,然后再次按空格键继续。您可以停止更新以检查写入屏幕的指针。
如您在上面的示例图中看到的,我们有一张图像,其中所有指针都相同,然后另一张图像的前两个条目的指针与第一张图像相同,但第二张图像上的最后一个指针不同。要了解为什么会发生这种情况,请考虑图的拓扑结构:
camera_node -> watermark_node -> image_view_node
-> image_view_node2
“camera_node” 和 “watermark_node” 之间的链接可以使用相同的指针而无需复制,因为只有一个进程内订阅应该向其传递消息。但是对于 “watermark_node” 和两个图像视图节点之间的链接,关系是一对多的,因此如果图像视图节点使用 “unique_ptr” 回调,那么就不可能将相同指针的所有权传递给两者。但是,它可以传递给其中一个。哪一个会获得原始指针尚未定义,而只是最后一个被传递。
请注意,图像视图节点未使用 “unique_ptr” 回调订阅。相反,它们使用 “const shared_ptr”s 订阅。这意味着系统向两个回调传递相同的 “shared_ptr”。当处理第一个进程内订阅时,内部存储的 “unique_ptr” 被提升为 “shared_ptr”。每个回调都将获得同一条消息的共享所有权。
带有进程间查看器的管道
另一个需要正确处理的重要事项是避免在进行进程间订阅时中断进程内零拷贝行为。为了测试这一点,我们可以运行第一个图像管道演示“image_pipeline_all_in_one”,然后运行独立“image_view_node”的实例(不要忘记在终端中为其添加前缀“ros2 run intra_process_demo”)。这将看起来像这样:
很难同时暂停两个图像,因此图像可能不会对齐,但需要注意的重要一点是“image_pipeline_all_in_one”图像视图显示每个步骤的相同地址。这意味着即使订阅了外部视图,进程内零拷贝也会保留。您还可以看到,进程间图像视图的前两行文本具有不同的进程 ID,第三行文本具有独立图像查看器的进程 ID。