使用回调组
在多线程执行器中运行节点时,ROS 2 提供回调 组作为控制不同回调执行的工具。 本页旨在指导如何有效使用回调组。 假设读者对 executors 的概念有基本的了解。
回调组基础知识
在多线程执行器中运行节点时,ROS 2 提供两种不同类型的回调组来控制回调的执行:
互斥回调组
可重入回调组
这些回调组以不同的方式限制其回调的执行。 简而言之:
互斥回调组可防止其回调被并行执行 - 本质上使组中的回调好像由 SingleThreadedExecutor 执行。
可重入回调组允许执行器以任何其认为合适的方式安排和执行组的回调,而不受限制。
这意味着,除了不同的回调可以并行运行之外,同一回调的不同实例也可以同时执行。 * 属于不同回调组(任何类型)的回调始终可以彼此并行执行。
还需记住的是,不同的 ROS 2 实体会将其回调组中继到它们生成的所有回调。
例如,如果将回调组分配给操作客户端,则该客户端创建的所有回调都将分配给该回调组。
回调组可以通过 rclcpp 中节点的 create_callback_group
函数创建,也可以通过调用 rclpy 中组的构造函数创建。
然后可以在创建订阅、计时器等时将回调组作为参数/选项传递。
my_callback_group = create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
rclcpp::SubscriptionOptions options;
options.callback_group = my_callback_group;
my_subscription = create_subscription<Int32>("/topic", rclcpp::SensorDataQoS(),
callback, options);
my_callback_group = MutuallyExclusiveCallbackGroup()
my_subscription = self.create_subscription(Int32, "/topic", self.callback, qos_profile=1,
callback_group=my_callback_group)
如果用户在创建订阅、计时器等时未指定任何回调组,则此实体将被分配给节点的默认回调组。 默认回调组是互斥回调组,可以通过 rclcpp 中的“NodeBaseInterface::get_default_callback_group()”和 rclpy 中的“Node.default_callback_group”进行查询。 关于回调 ^^^^^^^^^^^^^^^ 在 ROS 2 和执行器的上下文中,回调是指由执行器处理其调度和执行的函数。 此上下文中的回调示例包括 * 订阅回调(接收和处理来自主题的数据)、 * 计时器回调、 * 服务回调(用于在服务器中执行服务请求)、 * 操作服务器和客户端中的不同回调、 * Futures 的完成回调。 以下是在使用回调组时应牢记的有关回调的几个要点。
ROS 2 中的几乎所有内容都是回调!
执行器运行的每个函数,根据定义,都是回调。 ROS 2 系统中的非回调函数主要位于系统的边缘(用户和传感器输入等)。 * 有时回调是隐藏的,从用户/开发人员 API 来看,它们的存在可能并不明显。 对于任何类型的对服务或操作(在 rclpy 中)的“同步”调用,尤其如此。 例如,对服务的同步调用“Client.call(request)”会添加一个 Future 的 done-callback,该回调需要在函数调用执行期间执行,但此回调对用户不直接可见。
控制执行
为了使用回调组控制执行,可以考虑以下准则。
对于单个回调与自身的交互:
如果它应该与自身并行执行,则将其注册到可重入回调组。
一个示例案例可能是需要能够 并行处理多个操作调用的操作/服务服务器。
如果它**永远**不应该与自身并行执行,则将其注册到互斥回调组。
一个示例案例可能是运行发布控制命令的控制循环的计时器回调。
对于不同回调之间的交互:
如果它**永远**不应该并行执行,则将它们注册到同一个互斥回调组。
一个示例案例可能是回调正在访问共享的关键和非线程安全资源。
如果它们应该并行执行,您有两个选择, 取决于各个回调是否应该能够重叠:
将它们注册到不同的互斥回调组(各个回调不重叠)
将它们注册到可重入回调组(各个回调重叠)
并行运行不同回调的一个示例是具有同步服务客户端和调用此服务的计时器的节点。 请参阅下面的详细示例。
避免死锁
错误地设置节点的回调组可能会导致死锁(或其他不良行为),尤其是当希望使用同步调用服务或操作时。 事实上,甚至 ROS 2 的 API 文档也提到,不应在回调中同步调用操作或服务,因为这可能会导致死锁。 虽然使用异步调用在这方面确实更安全,但同步调用也可以工作。 另一方面,同步调用也有其优点,例如使代码更简单、更容易理解。 因此,本节提供了一些有关如何正确设置节点回调组以避免死锁的指南。 这里要注意的第一件事是,每个节点的默认回调组都是互斥回调组。 如果用户在创建计时器、订阅、客户端等时未指定任何其他回调组,则这些实体当时或之后创建的任何回调都将使用节点的默认回调组。 此外,如果节点中的所有内容都使用相同的互斥回调组,则该节点本质上就像是由单线程执行器处理的一样,即使指定了多线程执行器也是如此!因此,每当决定使用多线程执行器时,都应始终指定一些回调组,以便执行器选择有意义。考虑到上述情况,以下是一些有助于避免死锁的指导原则:* 如果您在任何类型的回调中进行同步调用,则此回调和进行调用的客户端需要属于不同的回调组(任何类型),或可重入回调组。如果由于其他要求(例如线程安全和/或在等待结果时阻止其他回调)而无法实现上述配置(或者如果您想绝对确保永远不会出现死锁),请使用异步调用。第一点失败将始终导致死锁。这种情况的一个例子是在计时器回调中进行同步服务调用(请参阅下一节的示例)。
示例
让我们看一些不同回调组设置的简单示例。 以下演示代码考虑在计时器回调中同步调用服务。
演示代码
我们有两个节点 - 一个提供简单服务:
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_srvs/srv/empty.hpp"
using namespace std::placeholders;
namespace cb_group_demo
{
class ServiceNode : public rclcpp::Node
{
public:
ServiceNode() : Node("service_node")
{
service_ptr_ = this->create_service<std_srvs::srv::Empty>(
"test_service",
std::bind(&ServiceNode::service_callback, this, _1, _2, _3)
);
}
private:
rclcpp::Service<std_srvs::srv::Empty>::SharedPtr service_ptr_;
void service_callback(
const std::shared_ptr<rmw_request_id_t> request_header,
const std::shared_ptr<std_srvs::srv::Empty::Request> request,
const std::shared_ptr<std_srvs::srv::Empty::Response> response)
{
(void)request_header;
(void)request;
(void)response;
RCLCPP_INFO(this->get_logger(), "Received request, responding...");
}
}; // class ServiceNode
} // namespace cb_group_demo
int main(int argc, char* argv[])
{
rclcpp::init(argc, argv);
auto service_node = std::make_shared<cb_group_demo::ServiceNode>();
RCLCPP_INFO(service_node->get_logger(), "Starting server node, shut down with CTRL-C");
rclcpp::spin(service_node);
RCLCPP_INFO(service_node->get_logger(), "Keyboard interrupt, shutting down.\n");
rclcpp::shutdown();
return 0;
}
import rclpy
from rclpy.node import Node
from std_srvs.srv import Empty
class ServiceNode(Node):
def __init__(self):
super().__init__('service_node')
self.srv = self.create_service(Empty, 'test_service', callback=self.service_callback)
def service_callback(self, request, result):
self.get_logger().info('Received request, responding...')
return result
if __name__ == '__main__':
rclpy.init()
node = ServiceNode()
try:
node.get_logger().info("Starting server node, shut down with CTRL-C")
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard interrupt, shutting down.\n')
node.destroy_node()
rclpy.shutdown()
另一个包含服务客户端以及用于进行服务调用的计时器:
Note: The API of service client in rclcpp does not offer a synchronous call method similar to the one in rclpy, so we wait on the future object to simulate the effect of a synchronous call.
#include <chrono>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_srvs/srv/empty.hpp"
using namespace std::chrono_literals;
namespace cb_group_demo
{
class DemoNode : public rclcpp::Node
{
public:
DemoNode() : Node("client_node")
{
client_cb_group_ = nullptr;
timer_cb_group_ = nullptr;
client_ptr_ = this->create_client<std_srvs::srv::Empty>("test_service", rmw_qos_profile_services_default,
client_cb_group_);
timer_ptr_ = this->create_wall_timer(1s, std::bind(&DemoNode::timer_callback, this),
timer_cb_group_);
}
private:
rclcpp::CallbackGroup::SharedPtr client_cb_group_;
rclcpp::CallbackGroup::SharedPtr timer_cb_group_;
rclcpp::Client<std_srvs::srv::Empty>::SharedPtr client_ptr_;
rclcpp::TimerBase::SharedPtr timer_ptr_;
void timer_callback()
{
RCLCPP_INFO(this->get_logger(), "Sending request");
auto request = std::make_shared<std_srvs::srv::Empty::Request>();
auto result_future = client_ptr_->async_send_request(request);
std::future_status status = result_future.wait_for(10s); // timeout to guarantee a graceful finish
if (status == std::future_status::ready) {
RCLCPP_INFO(this->get_logger(), "Received response");
}
}
}; // class DemoNode
} // namespace cb_group_demo
int main(int argc, char* argv[])
{
rclcpp::init(argc, argv);
auto client_node = std::make_shared<cb_group_demo::DemoNode>();
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(client_node);
RCLCPP_INFO(client_node->get_logger(), "Starting client node, shut down with CTRL-C");
executor.spin();
RCLCPP_INFO(client_node->get_logger(), "Keyboard interrupt, shutting down.\n");
rclcpp::shutdown();
return 0;
}
import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup, ReentrantCallbackGroup
from rclpy.node import Node
from std_srvs.srv import Empty
class CallbackGroupDemo(Node):
def __init__(self):
super().__init__('client_node')
client_cb_group = None
timer_cb_group = None
self.client = self.create_client(Empty, 'test_service', callback_group=client_cb_group)
self.call_timer = self.create_timer(1, self._timer_cb, callback_group=timer_cb_group)
def _timer_cb(self):
self.get_logger().info('Sending request')
_ = self.client.call(Empty.Request())
self.get_logger().info('Received response')
if __name__ == '__main__':
rclpy.init()
node = CallbackGroupDemo()
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
node.get_logger().info('Beginning client, shut down with CTRL-C')
executor.spin()
except KeyboardInterrupt:
node.get_logger().info('Keyboard interrupt, shutting down.\n')
node.destroy_node()
rclpy.shutdown()
客户端节点的构造函数包含用于设置服务客户端和计时器的回调组的选项。
使用上面的默认设置(均为 nullptr
/ None
),
计时器和客户端都将使用节点的默认互斥回调组。
问题
由于我们使用 1 秒计时器进行服务调用, 预期结果是服务每秒被调用一次, 客户端始终获得响应并打印“收到响应”。 如果我们尝试在终端中运行服务器和客户端节点, 我们会得到以下输出。
[INFO] [1653034371.758739131] [client_node]: Starting client node, shut down with CTRL-C
[INFO] [1653034372.755865649] [client_node]: Sending request
^C[INFO] [1653034398.161674869] [client_node]: Keyboard interrupt, shutting down.
[INFO] [1653034355.308958238] [service_node]: Starting server node, shut down with CTRL-C
[INFO] [1653034372.758197320] [service_node]: Received request, responding...
^C[INFO] [1653034416.021962246] [service_node]: Keyboard interrupt, shutting down.
因此,结果不是重复调用服务,而是永远收不到第一次调用的响应,此后客户端节点似乎卡住了,无法进行进一步的调用。也就是说,执行因死锁而停止!
原因是计时器回调和客户端使用相同的互斥回调组(节点的默认值)。 当进行服务调用时,客户端将其回调组传递给 Future 对象(隐藏在 Python 版本的调用方法中),该对象的 done-callback 需要执行才能获得服务调用的结果。 但是由于此 done-callback 和计时器回调位于同一个互斥组中,并且计时器回调仍在执行(等待服务调用的结果),因此 done-callback 永远不会执行。 卡住的计时器回调还会阻止其自身的任何其他执行,因此计时器不会第二次触发。
解决方案
我们可以轻松解决这个问题 - 例如 - 通过将计时器和客户端分配给不同的回调组。 因此,让我们将客户端节点构造函数的前两行更改为如下(其他所有内容保持不变):
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
timer_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
client_cb_group = MutuallyExclusiveCallbackGroup()
timer_cb_group = MutuallyExclusiveCallbackGroup()
现在我们得到了预期的结果,即计时器重复触发并且 每次服务调用都会得到应有的结果:
[INFO] [1653067523.431731177] [client_node]: Starting client node, shut down with CTRL-C
[INFO] [1653067524.431912821] [client_node]: Sending request
[INFO] [1653067524.433230445] [client_node]: Received response
[INFO] [1653067525.431869330] [client_node]: Sending request
[INFO] [1653067525.432912803] [client_node]: Received response
[INFO] [1653067526.431844726] [client_node]: Sending request
[INFO] [1653067526.432893954] [client_node]: Received response
[INFO] [1653067527.431828287] [client_node]: Sending request
[INFO] [1653067527.432848369] [client_node]: Received response
^C[INFO] [1653067528.400052749] [client_node]: Keyboard interrupt, shutting down.
[INFO] [1653067522.052866001] [service_node]: Starting server node, shut down with CTRL-C
[INFO] [1653067524.432577720] [service_node]: Received request, responding...
[INFO] [1653067525.432365009] [service_node]: Received request, responding...
[INFO] [1653067526.432300261] [service_node]: Received request, responding...
[INFO] [1653067527.432272441] [service_node]: Received request, responding...
^C[INFO] [1653034416.021962246] [service_node]: KeyboardInterrupt, shutting down.
有人可能会考虑,仅仅避免节点的默认回调组是否就足够了。 事实并非如此:用不同的互斥组替换默认组不会改变任何事情。 因此,以下配置也会导致先前发现的死锁。
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
timer_cb_group_ = client_cb_group_;
client_cb_group = MutuallyExclusiveCallbackGroup()
timer_cb_group = client_cb_group
事实上,在这种情况下,一切正常运行的确切条件是 计时器和客户端不能属于同一个 互斥组。 因此,以下所有配置(以及其他一些配置) 都会产生所需的结果,即计时器重复触发 并且服务调用完成。
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
timer_cb_group_ = client_cb_group_;
or
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
timer_cb_group_ = nullptr;
or
client_cb_group_ = nullptr;
timer_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
or
client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
timer_cb_group_ = nullptr;
client_cb_group = ReentrantCallbackGroup()
timer_cb_group = client_cb_group
or
client_cb_group = MutuallyExclusiveCallbackGroup()
timer_cb_group = None
or
client_cb_group = None
timer_cb_group = MutuallyExclusiveCallbackGroup()
or
client_cb_group = ReentrantCallbackGroup()
timer_cb_group = None