实现自定义内存分配器
目标:本教程将展示如何在编写 ROS 2 C++ 代码时使用自定义内存分配器。
教程级别:高级
时间:20 分钟
本教程将教您如何为发布者和订阅者集成自定义分配器,以便在 ROS 节点执行时永远不会调用默认堆分配器。 本教程的代码可在此处获取 <https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/topics/allocator_tutorial.cpp>`__。
背景
假设您想编写实时安全代码,并且您已经听说过在实时关键部分调用“new”的诸多危险,因为大多数平台上的默认堆分配器是不确定的。
默认情况下,许多 C++ 标准库结构会在增长时隐式分配内存,例如“std::vector”。但是,这些数据结构也接受“Allocator”模板参数。如果您为这些数据结构之一指定自定义分配器,它将使用该分配器而不是系统分配器来增加或缩小数据结构。您的自定义分配器可以在堆栈上预先分配一个内存池,这可能更适合实时应用程序。
在 ROS 2 C++ 客户端库 (rclcpp) 中,我们遵循与 C++ 标准库类似的理念。发布者、订阅者和执行者接受一个 Allocator 模板参数,该参数控制该实体在执行期间所做的分配。
编写分配器
要编写与 ROS 2 的分配器接口兼容的分配器,您的分配器必须与 C++ 标准库分配器接口兼容。
C++11 库提供了称为“allocator_traits”的东西。C++11 标准规定自定义分配器只需满足一组最低要求即可用于以标准方式分配和释放内存。 allocator_traits
是一个通用结构,它基于以最低要求编写的分配器填充分配器的其他特性。
例如,以下自定义分配器的声明将满足 ``allocator_traits``(当然,您仍然需要实现此结构中声明的函数):
template <class T>
struct custom_allocator {
using value_type = T;
custom_allocator() noexcept;
template <class U> custom_allocator (const custom_allocator<U>&) noexcept;
T* allocate (std::size_t n);
void deallocate (T* p, std::size_t n);
};
template <class T, class U>
constexpr bool operator== (const custom_allocator<T>&, const custom_allocator<U>&) noexcept;
template <class T, class U>
constexpr bool operator!= (const custom_allocator<T>&, const custom_allocator<U>&) noexcept;
然后,您可以访问由“allocator_traits”填充的分配器的其他函数和成员,如下所示:“std::allocator_traits<custom_allocator<T>>::construct(…)”
要了解“allocator_traits”的全部功能,请参阅 https://en.cppreference.com/w/cpp/memory/allocator_traits 。
但是,一些仅具有部分 C++11 支持的编译器(例如 GCC 4.8)仍然需要分配器实现大量样板代码才能与标准库结构(例如向量和字符串)一起使用,因为这些结构在内部不使用“allocator_traits”。因此,如果您使用的是具有部分 C++11 支持的编译器,您的分配器将需要看起来更像这样:
template<typename T>
struct pointer_traits {
using reference = T &;
using const_reference = const T &;
};
// Avoid declaring a reference to void with an empty specialization
template<>
struct pointer_traits<void> {
};
template<typename T = void>
struct MyAllocator : public pointer_traits<T> {
public:
using value_type = T;
using size_type = std::size_t;
using pointer = T *;
using const_pointer = const T *;
using difference_type = typename std::pointer_traits<pointer>::difference_type;
MyAllocator() noexcept;
~MyAllocator() noexcept;
template<typename U>
MyAllocator(const MyAllocator<U> &) noexcept;
T * allocate(size_t size, const void * = 0);
void deallocate(T * ptr, size_t size);
template<typename U>
struct rebind {
typedef MyAllocator<U> other;
};
};
template<typename T, typename U>
constexpr bool operator==(const MyAllocator<T> &,
const MyAllocator<U> &) noexcept;
template<typename T, typename U>
constexpr bool operator!=(const MyAllocator<T> &,
const MyAllocator<U> &) noexcept;
编写示例主程序
编写有效的 C++ 分配器后,必须将其作为共享指针传递给发布者、订阅者和执行者。
auto alloc = std::make_shared<MyAllocator<void>>();
rclcpp::PublisherOptionsWithAllocator<MyAllocator<void>> publisher_options;
publisher_options.allocator = alloc;
auto publisher = node->create_publisher<std_msgs::msg::UInt32>(
"allocator_tutorial", 10, publisher_options);
rclcpp::SubscriptionOptionsWithAllocator<MyAllocator<void>> subscription_options;
subscription_options.allocator = alloc;
auto msg_mem_strat = std::make_shared<
rclcpp::message_memory_strategy::MessageMemoryStrategy<
std_msgs::msg::UInt32, MyAllocator<void>>>(alloc);
auto subscriber = node->create_subscription<std_msgs::msg::UInt32>(
"allocator_tutorial", 10, callback, subscription_options, msg_mem_strat);
std::shared_ptr<rclcpp::memory_strategy::MemoryStrategy> memory_strategy =
std::make_shared<AllocatorMemoryStrategy<MyAllocator<void>>>(alloc);
rclcpp::ExecutorOptions options;
options.memory_strategy = memory_strategy;
rclcpp::executors::SingleThreadedExecutor executor(options);
您还需要使用分配器来分配沿执行代码路径传递的所有消息。
auto alloc = std::make_shared<MyAllocator<void>>();
一旦实例化了节点并将执行器添加到节点,就该旋转了:
uint32_t i = 0;
while (rclcpp::ok()) {
msg->data = i;
i++;
publisher->publish(msg);
rclcpp::sleep_for(std::chrono::milliseconds(1));
executor.spin_some();
}
将分配器传递给进程内管道
尽管我们在同一进程中实例化了发布者和订阅者,但我们尚未使用进程内管道。
IntraProcessManager 是一个通常对用户隐藏的类,但为了将自定义分配器传递给它,我们需要通过从 rclcpp 上下文中获取它来公开它。 IntraProcessManager 使用了几个标准库结构,因此如果没有自定义分配器,它将调用默认的 new。
auto context = rclcpp::contexts::get_global_default_context();
auto options = rclcpp::NodeOptions()
.context(context)
.use_intra_process_comms(true);
auto node = rclcpp::Node::make_shared("allocator_example", options);
确保在以这种方式构造节点之后实例化发布者和订阅者。
测试和验证代码
您如何知道您的自定义分配器实际上被调用了?
显而易见的做法是计算对自定义分配器的“allocate”和“deallocate”函数的调用次数,并将其与对“new”和“delete”的调用进行比较。
向自定义分配器添加计数很容易:
T * allocate(size_t size, const void * = 0) {
// ...
num_allocs++;
// ...
}
void deallocate(T * ptr, size_t size) {
// ...
num_deallocs++;
// ...
}
您还可以覆盖全局的 new 和 delete 操作符:
void operator delete(void * ptr) noexcept {
if (ptr != nullptr) {
if (is_running) {
global_runtime_deallocs++;
}
std::free(ptr);
ptr = nullptr;
}
}
void operator delete(void * ptr, size_t) noexcept {
if (ptr != nullptr) {
if (is_running) {
global_runtime_deallocs++;
}
std::free(ptr);
ptr = nullptr;
}
}
其中我们正在增加的变量只是全局静态整数,而“is_running”是一个全局静态布尔值,在调用“spin”之前切换。
示例可执行文件 打印变量的值。要运行示例可执行文件,请使用:
ros2 run demo_nodes_cpp allocator_tutorial
或者,使用进程内管道运行示例:
ros2 run demo_nodes_cpp allocator_tutorial intra
您应该得到如下数字:
Global new was called 15590 times during spin
Global delete was called 15590 times during spin
Allocator new was called 27284 times during spin
Allocator delete was called 27281 times during spin
我们捕获了执行路径上发生的约 2/3 的分配/释放,但剩下的 1/3 来自哪里?
事实上,这些分配/释放源自本示例中使用的底层 DDS 实现。
证明这一点超出了本教程的范围,但您可以查看作为 ROS 2 持续集成测试的一部分运行的分配路径测试,该测试回溯代码并确定某些函数调用是源自 rmw 实现还是 DDS 实现:
https://github.com/ros2/realtime_support/blob/rolling/tlsf_cpp/test/test_tlsf.cpp#L41
请注意,此测试未使用我们刚刚创建的自定义分配器,而是使用 TLSF 分配器(见下文)。
TLSF 分配器
ROS 2 提供对 TLSF(两级分离拟合)分配器的支持,该分配器旨在满足实时要求:
https://github.com/ros2/realtime_support/tree/rolling/tlsf_cpp
有关 TLSF 的更多信息,请参阅 http://www.gii.upv.es/tlsf/
请注意,TLSF 分配器是根据双重 GPL/LGPL 许可证授权的。
使用 TLSF 分配器的完整工作示例在此处: https://github.com/ros2/realtime_support/blob/rolling/tlsf_cpp/example/allocator_example.cpp