36 Definition of an abstract base class for policy deployment.
37 For now, this policy only supports moveit_servo command interfaces and Image sensors.
40 from abc
import ABC, abstractmethod
42 from rclpy.node
import Node
43 from rclpy.qos
import QoSProfile
45 from control_msgs.msg
import JointJog
46 from geometry_msgs.msg
import PoseStamped, Twist
47 from sensor_msgs.msg
import Image
49 from std_srvs.srv
import SetBool
51 import message_filters
55 """An abstract base class for deploying learnt policies."""
57 def __init__(self, params, node_name="policy_node"):
58 """Initialise the policy."""
60 self.
loggerlogger = self.get_logger()
82 """Returns True if the policy is active."""
87 """Sets the policy to active state via Python API."""
91 """Sets the policy to active state via ROS interface."""
96 """Returns the ROS 2 message type for a given sensor type."""
97 if msg_type ==
"sensor_msgs/Image":
100 raise ValueError(f
"Sensor type {msg_type} not supported.")
103 """Returns the ROS 2 message type for a given command type."""
104 if msg_type ==
"geometry_msgs/PoseStamped":
106 elif msg_type ==
"geometry_msgs/Twist":
108 elif msg_type ==
"control_msgs/JointJog":
111 raise ValueError(f
"Command type {msg_type} not supported.")
114 """Register the topics to listen to for sensor data."""
118 for sensor_idx
in range(self.
paramsparams.num_sensors):
119 sensor_params = self.get_parameters_by_prefix(f
"sensor{sensor_idx + 1}")
121 message_filters.Subscriber(
124 str(sensor_params[
"topic"].value),
125 qos_profile=QoSProfile(depth=sensor_params[
"qos"].value),
130 self.
sensor_syncsensor_sync = message_filters.ApproximateTimeSynchronizer(
132 self.
paramsparams.sensor_queue,
133 self.
paramsparams.sensor_slop,
140 """Register the topic to publish actions to."""
143 self.
paramsparams.command.topic,
144 QoSProfile(depth=self.
paramsparams.command.qos),
149 """Perform a forward pass of the policy."""
def get_command_msg_type(self, msg_type)
def __init__(self, params, node_name="policy_node")
def activate_policy(self, request, response)
def register_sensors(self)
def register_command(self)
def get_sensor_msg_type(self, msg_type)
std::string append(const std::string &left, const std::string &right)