编写动作服务器和客户端 (Python)

目标:使用 Python 实现动作服务器和客户端。

教程级别:中级

时间:15 分钟

背景

动作是 ROS 2 中的一种异步通信形式。 *动作客户端*向*动作服务器*发送目标请求。 *动作服务器*向*动作客户端*发送目标反馈和结果。

先决条件

您将需要 custom_action_interfaces 包和 Fibonacci.action 接口,它们在上一个教程 创建操作 中定义。

任务

1 编写动作服务器

让我们专注于编写一个动作服务器,使用我们在 创建操作 教程中创建的动作来计算斐波那契数列。

到目前为止,您已经创建了包并使用 ros2 run 来运行您的节点。 但是,为了在本教程中保持简单,我们将操作服务器的范围限定为单个文件。 如果您想查看操作教程的完整包是什么样子,请查看 action_tutorials

在您的主目录中打开一个新文件,我们将其命名为“fibonacci_action_server.py”, 并添加以下代码:

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from custom_action_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

第 8 行定义了一个类“FibonacciActionServer”,它是“Node”的子类。 该类通过调用“Node”构造函数进行初始化,将我们的节点命名为“fibonacci_action_server”:

        super().__init__('fibonacci_action_server')

在构造函数中我们还实例化了一个新的动作服务器:

        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

动作服务器需要四个参数:

  1. 一个 ROS 2 节点,用于添加动作客户端:“self”。

  2. 动作类型:“Fibonacci”(第 5 行导入)。

  3. 动作名称:“’fibonacci’”。

4. 用于执行已接受目标的回调函数:“self.execute_callback”。 此回调**必须**返回动作类型的结果消息。

我们还在我们的类中定义了一个“execute_callback”方法:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result

这是在目标被接受后将调用来执行目标的方法。

让我们尝试运行我们的操作服务器:

python3 fibonacci_action_server.py

在另一个终端中,我们可以使用命令行界面发送目标:

ros2 action send_goal fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}"

在运行操作服务器的终端中,您应该会看到一条记录的消息“正在执行目标…”,随后是一条警告,提示目标状态未设置。 默认情况下,如果在执行回调中未设置目标句柄状态,则它假定为*中止*状态。 我们可以在目标句柄上调用“succeed()”来指示目标已成功:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        goal_handle.succeed()
        result = Fibonacci.Result()
        return result

现在,如果您重新启动操作服务器并发送另一个目标,您应该会看到目标已完成,状态为“成功”。

现在让我们让我们的目标执行实际计算并返回请求的斐波那契数列:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            sequence.append(sequence[i] + sequence[i-1])

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence
        return result

计算序列后,我们将其分配给结果消息字段,然后再返回。

再次重新启动操作服务器并发送另一个目标。 您应该看到目标以正确的结果序列结束。

1.2 发布反馈

动作的优点之一是能够在目标执行期间向动作客户端提供反馈。

我们可以通过调用目标句柄的 publish_feedback() 方法让动作服务器为动作客户端发布反馈。

我们将替换 sequence 变量,并使用反馈消息来存储序列。

在 for 循环中每次更新反馈消息后,我们都会发布反馈消息并休眠以达到戏剧效果:

import time

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from custom_action_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

重新启动动作服务器后,我们可以使用带有“–feedback”选项的命令行工具来确认反馈现已发布:

ros2 action send_goal --feedback fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}"

2 编写操作客户端

我们还将操作客户端的范围限定为单个文件。 打开一个新文件,我们将其命名为“fibonacci_action_client.py”,并添加以下样板代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from custom_action_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    future = action_client.send_goal(10)

    rclpy.spin_until_future_complete(action_client, future)


if __name__ == '__main__':
    main()

我们定义了一个类“FibonacciActionClient”,它是“Node”的子类。 该类通过调用“Node”构造函数进行初始化,将我们的节点命名为“fibonacci_action_client”:

        super().__init__('fibonacci_action_client')

此外,在类构造函数中,我们使用上一个教程:doc:创建操作 中的自定义动作定义创建一个动作客户端:

        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

我们通过向其传递三个参数来创建一个“ActionClient”:

  1. 一个要添加动作客户端的 ROS 2 节点:“self”

  2. 动作类型:“Fibonacci”

  3. 动作名称:“’fibonacci’”

我们的动作客户端将能够与具有相同动作名称和类型的动作服务器进行通信。

我们还在“FibonacciActionClient”类中定义了一个方法“send_goal”:

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)

此方法等待动作服务器可用,然后向服务器发送目标。 它返回一个我们稍后可以等待的未来。

在类定义之后,我们定义一个函数“main()”,它初始化 ROS 2 并创建我们的“FibonacciActionClient”节点的实例。 然后它发送一个目标并等待该目标完成。

最后,我们在 Python 程序的入口点调用“main()”。

让我们通过首先运行之前构建的动作服务器来测试我们的动作客户端:

python3 fibonacci_action_server.py

在另一个终端中,运行动作客户端:

python3 fibonacci_action_client.py

当动作服务器成功执行目标时,您应该会看到它打印的消息:

[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.

操作客户端应该启动,然后快速完成。 此时,我们有一个正常运行的操作客户端,但我们看不到任何结果或得到任何反馈。

2.1 获取结果

因此,我们可以发送一个目标,但我们如何知道它何时完成? 我们可以通过几个步骤获取结果信息。 首先,我们需要获取我们发送的目标的目标句柄。 然后,我们可以使用目标句柄来请求结果。

这是此示例的完整代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from custom_action_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

ActionClient.send_goal_async() 方法将未来返回给目标句柄。 首先,我们注册一个回调函数,用于未来完成时回调:

        self._send_goal_future.add_done_callback(self.goal_response_callback)

请注意,当动作服务器接受或拒绝目标请求时,未来就完成了。 让我们更详细地看一下“goal_response_callback”。 我们可以检查目标是否被拒绝并提前返回,因为我们知道不会有结果:

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

现在我们已经有了目标句柄,我们可以使用它通过方法“get_result_async()”来请求结果。 与发送目标类似,我们将获得一个在结果准备就绪时完成的未来。 让我们注册一个回调,就像我们对目标响应所做的那样:

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

在回调中,我们记录结果序列并关闭 ROS 2 以干净退出:

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

在单独的终端中运行动作服务器,继续尝试运行我们的斐波那契动作客户端!

python3 fibonacci_action_client.py

您应该会看到目标被接受的记录消息和最终结果。

2.2 获取反馈

我们的操作客户端可以发送目标。 很好! 但是如果我们可以从操作服务器获得有关我们发送的目标的一些反馈,那就太好了。

以下是此示例的完整代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from custom_action_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

反馈消息的回调函数如下:

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

在回调中,我们获取消息的反馈部分,并将“partial_sequence”字段打印到屏幕上。

我们需要向操作客户端注册回调。 这是通过在发送目标时将回调额外传递给操作客户端来实现的:

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

一切就绪。如果我们运行操作客户端,您应该会看到反馈被打印到屏幕上。

摘要

在本教程中,您逐行组合了一个 Python 操作服务器和操作客户端,并将它们配置为交换目标、反馈和结果。

相关内容

  • 有几种方法可以用 Python 编写操作服务器和客户端;请查看 ros2/examples 存储库中的 minimal_action_serverminimal_action_client 包。

  • 有关 ROS 操作的更多详细信息,请参阅 设计文章