实现自定义内存分配器

目标:本教程将展示如何在编写 ROS 2 C++ 代码时使用自定义内存分配器。

教程级别:高级

时间:20 分钟

本教程将教您如何为发布者和订阅者集成自定义分配器,以便在 ROS 节点执行时永远不会调用默认堆分配器。 本教程的代码可在此处获取 here

背景

假设您想编写实时安全代码,并且您已经听说过在实时关键部分调用“new”的诸多危险,因为大多数平台上的默认堆分配器是不确定的。

默认情况下,许多 C++ 标准库结构会在增长时隐式分配内存,例如“std::vector”。 但是,这些数据结构也接受“Allocator”模板参数。 如果您为这些数据结构之一指定自定义分配器,它将使用该分配器而不是系统分配器来增大或缩小数据结构。 您的自定义分配器可以在堆栈上预先分配一个内存池,这可能更适合实时应用程序。

在 ROS 2 C++ 客户端库 (rclcpp) 中,我们遵循与 C++ 标准库类似的理念。 发布者、订阅者和执行者接受分配器模板参数,该参数控制该实体在执行期间所做的分配。

编写分配器

要编写与 ROS 2 的分配器接口兼容的分配器,您的分配器必须与 C++ 标准库分配器接口兼容。

自 C++17 以来,标准库提供了称为“std::pmr::memory_resource”的东西。 这是一个可以从中派生的类,用于创建满足最低要求的自定义分配器。

例如,以下自定义内存资源声明满足要求(当然,您仍然需要在此类中实现声明的函数):

class CustomMemoryResource : public std::pmr::memory_resource
{
private:
  void * do_allocate(std::size_t bytes, std::size_t alignment) override;

  void do_deallocate(
    void * p, std::size_t bytes,
    std::size_t alignment) override;

  bool do_is_equal(
    const std::pmr::memory_resource & other) const noexcept override;
};

要了解“std::pmr::memory_resource”的全部功能,请参阅 https://en.cppreference.com/w/cpp/memory/memory_resource。

本教程的自定义分配器的完整实现位于 https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/topics/allocator_tutorial_pmr.cpp

编写示例主程序

编写有效的 C++ 分配器后,必须将其作为共享指针传递给发布者、订阅者和执行者。 但首先,我们将声明一些别名来缩短名称。

using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy;
using Alloc = std::pmr::polymorphic_allocator<void>;
using MessageAllocTraits =
  rclcpp::allocator::AllocRebind<std_msgs::msg::UInt32, Alloc>;
using MessageAlloc = MessageAllocTraits::allocator_type;
using MessageDeleter = rclcpp::allocator::Deleter<MessageAlloc, std_msgs::msg::UInt32>;
using MessageUniquePtr = std::unique_ptr<std_msgs::msg::UInt32, MessageDeleter>;

现在我们可以使用自定义分配器创建资源:

CustomMemoryResource mem_resource{};
auto alloc = std::make_shared<Alloc>(&mem_resource);
rclcpp::PublisherOptionsWithAllocator<Alloc> publisher_options;
publisher_options.allocator = alloc;
auto publisher = node->create_publisher<std_msgs::msg::UInt32>(
  "allocator_tutorial", 10, publisher_options);

rclcpp::SubscriptionOptionsWithAllocator<Alloc> subscription_options;
subscription_options.allocator = alloc;
auto msg_mem_strat = std::make_shared<
  rclcpp::message_memory_strategy::MessageMemoryStrategy<
    std_msgs::msg::UInt32, Alloc>>(alloc);
auto subscriber = node->create_subscription<std_msgs::msg::UInt32>(
  "allocator_tutorial", 10, callback, subscription_options, msg_mem_strat);

std::shared_ptr<rclcpp::memory_strategy::MemoryStrategy> memory_strategy =
  std::make_shared<AllocatorMemoryStrategy<Alloc>>(alloc);

rclcpp::ExecutorOptions options;
options.memory_strategy = memory_strategy;
rclcpp::executors::SingleThreadedExecutor executor(options);

您还必须实例化自定义删除器和分配器,以便在分配消息时使用:

MessageDeleter message_deleter;
MessageAlloc message_alloc = *alloc;
rclcpp::allocator::set_allocator_for_deleter(&message_deleter, &message_alloc);

将节点添加到执行器后,就可以开始旋转了。 我们将使用自定义分配器来分配每条消息:

uint32_t i = 0;
while (rclcpp::ok()) {
  auto ptr = MessageAllocTraits::allocate(message_alloc, 1);
  MessageAllocTraits::construct(message_alloc, ptr);
  MessageUniquePtr msg(ptr, message_deleter);
  msg->data = i;
  ++i;
  publisher->publish(std::move(msg));
  rclcpp::sleep_for(10ms);
  executor.spin_some();
}

将分配器传递给进程内管道

尽管我们在同一进程中实例化了发布者和订阅者,但我们尚未使用进程内管道。

IntraProcessManager 是一个通常对用户隐藏的类,但为了将自定义分配器传递给它,我们需要通过从 rclcpp 上下文中获取它来公开它。 IntraProcessManager 使用了几个标准库结构,因此如果没有自定义分配器,它将调用默认的“new”。

auto context = rclcpp::contexts::get_global_default_context();
auto options = rclcpp::NodeOptions()
  .context(context)
  .use_intra_process_comms(true);
auto node = rclcpp::Node::make_shared("allocator_example", options);

确保在以这种方式构造节点之后实例化发布者和订阅者。

测试和验证代码

您如何知道您的自定义分配器实际上被调用了?

显而易见的做法是计算对自定义分配器的“allocate”和“deallocate”函数的调用次数,并将其与对“new”和“delete”的调用进行比较。

向自定义分配器添加计数很容易:

void * do_allocate(std::size_t size, std::size_t alignment) override
{
  // ...
  num_allocs++;
  // ...
}

void do_deallocate(
  void * p, std::size_t bytes,
  std::size_t alignment) override
{
  // ...
  num_deallocs++;
  // ...
}

您还可以覆盖全局 newdelete 操作符:

void * operator new(std::size_t size)
{
  if (is_running) {
    global_runtime_allocs++;
  }
  return std::malloc(size);
}

void operator delete(void * ptr, size_t) noexcept
{
  if (ptr != nullptr) {
    if (is_running) {
      global_runtime_deallocs++;
    }
    std::free(ptr);
  }
}

void operator delete(void * ptr) noexcept
{
  if (ptr != nullptr) {
    if (is_running) {
      global_runtime_deallocs++;
    }
    std::free(ptr);
  }
}

其中,我们要增加的变量只是全局静态整数,而“is_running”是一个全局静态布尔值,在调用“spin”之前会切换。

“示例可执行文件<https://github.com/ros2/demos/blob/rolling/demo_nodes_cpp/src/topics/allocator_tutorial_pmr.cpp>”打印变量的值。 要运行示例可执行文件,请使用:

ros2 run demo_nodes_cpp allocator_tutorial

或者,使用进程内管道运行示例:

ros2 run demo_nodes_cpp allocator_tutorial intra

您应该得到如下数字:

Global new was called 15590 times during spin
Global delete was called 15590 times during spin
Allocator new was called 27284 times during spin
Allocator delete was called 27281 times during spin

我们捕获了执行路径上发生的约 2/3 的分配/释放,但剩下的 1/3 来自哪里?

事实上,这些分配/释放源自本示例中使用的底层 DDS​​ 实现。

证明这一点超出了本教程的范围,但您可以查看作为 ROS 2 持续集成测试的一部分运行的分配路径测试,该测试回溯代码并确定某些函数调用是源自 rmw 实现还是 DDS 实现:

https://github.com/ros2/realtime_support/blob/rolling/tlsf_cpp/test/test_tlsf.cpp#L41

请注意,此测试未使用我们刚刚创建的自定义分配器,而是使用 TLSF 分配器(见下文)。

TLSF 分配器

ROS 2 提供对 TLSF(两级分离拟合)分配器的支持,该分配器旨在满足实时要求:

https://github.com/ros2/realtime_support/tree/rolling/tlsf_cpp

有关 TLSF 的更多信息,请参阅 http://www.gii.upv.es/tlsf/

请注意,TLSF 分配器是根据双重 GPL/LGPL 许可证授权的。

使用 TLSF 分配器的完整工作示例在此处: https://github.com/ros2/realtime_support/blob/rolling/tlsf_cpp/example/allocator_example.cpp