moveit2
The MoveIt Motion Planning Framework for ROS 2.
policy.py
Go to the documentation of this file.
1 # Software License Agreement (BSD License)
2 #
3 # Copyright (c) 2023, Peter David Fagan
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following
14 # disclaimer in the documentation and/or other materials provided
15 # with the distribution.
16 # * Neither the name of Willow Garage, Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived
18 # from this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 # POSSIBILITY OF SUCH DAMAGE.
32 #
33 # Author: Peter David Fagan
34 
35 """
36 Definition of an abstract base class for policy deployment.
37 For now, this policy only supports moveit_servo command interfaces and Image sensors.
38 """
39 
40 from abc import ABC, abstractmethod
41 
42 from rclpy.node import Node
43 from rclpy.qos import QoSProfile
44 
45 from control_msgs.msg import JointJog
46 from geometry_msgs.msg import PoseStamped, Twist
47 from sensor_msgs.msg import Image
48 
49 from std_srvs.srv import SetBool
50 
51 import message_filters
52 
53 
54 class Policy(ABC, Node):
55  """An abstract base class for deploying learnt policies."""
56 
57  def __init__(self, params, node_name="policy_node"):
58  """Initialise the policy."""
59  super().__init__(node_name)
60  self.loggerlogger = self.get_logger()
61 
62  # parse parameters
63  self.param_listenerparam_listener = params.ParamListener(self)
64  self.paramsparams = self.param_listenerparam_listener.get_params()
65 
66  # set policy to inactive by default
67  self._is_active_is_active = False
68  self.activate_policy_serviceactivate_policy_service = self.create_service(
69  SetBool,
70  "activate_policy",
71  self.activate_policyactivate_policy,
72  )
73 
74  # register sensor topics
75  self.register_sensorsregister_sensors()
76 
77  # register servo command topic
78  self.register_commandregister_command()
79 
80  @property
81  def is_active(self):
82  """Returns True if the policy is active."""
83  return self._is_active_is_active
84 
85  @is_active.setter
86  def active(self, value):
87  """Sets the policy to active state via Python API."""
88  self._is_active_is_active = value
89 
90  def activate_policy(self, request, response):
91  """Sets the policy to active state via ROS interface."""
92  self._is_active_is_active = request.data
93  return response
94 
95  def get_sensor_msg_type(self, msg_type):
96  """Returns the ROS 2 message type for a given sensor type."""
97  if msg_type == "sensor_msgs/Image":
98  return Image
99  else:
100  raise ValueError(f"Sensor type {msg_type} not supported.")
101 
102  def get_command_msg_type(self, msg_type):
103  """Returns the ROS 2 message type for a given command type."""
104  if msg_type == "geometry_msgs/PoseStamped":
105  return PoseStamped
106  elif msg_type == "geometry_msgs/Twist":
107  return Twist
108  elif msg_type == "control_msgs/JointJog":
109  return JointJog
110  else:
111  raise ValueError(f"Command type {msg_type} not supported.")
112 
113  def register_sensors(self):
114  """Register the topics to listen to for sensor data."""
115  self.sensor_subssensor_subs = []
116  # TODO: refactor this section
117  # Related Issue: https://github.com/PickNikRobotics/generate_parameter_library/issues/155
118  for sensor_idx in range(self.paramsparams.num_sensors):
119  sensor_params = self.get_parameters_by_prefix(f"sensor{sensor_idx + 1}")
120  self.sensor_subssensor_subs.append(
121  message_filters.Subscriber(
122  self,
123  self.get_sensor_msg_typeget_sensor_msg_type(sensor_params["type"].value),
124  str(sensor_params["topic"].value),
125  qos_profile=QoSProfile(depth=sensor_params["qos"].value),
126  )
127  )
128 
129  # create a synchronizer for the sensor topics
130  self.sensor_syncsensor_sync = message_filters.ApproximateTimeSynchronizer(
131  self.sensor_subssensor_subs,
132  self.paramsparams.sensor_queue,
133  self.paramsparams.sensor_slop,
134  )
135 
136  # register model forward pass as callback
137  self.sensor_syncsensor_sync.registerCallback(self.forwardforward)
138 
139  def register_command(self):
140  """Register the topic to publish actions to."""
141  self.command_pubcommand_pub = self.create_publisher(
142  self.get_command_msg_typeget_command_msg_type(self.paramsparams.command.type),
143  self.paramsparams.command.topic,
144  QoSProfile(depth=self.paramsparams.command.qos),
145  )
146 
147  @abstractmethod
148  def forward():
149  """Perform a forward pass of the policy."""
150  pass
def active(self, value)
Definition: policy.py:86
def get_command_msg_type(self, msg_type)
Definition: policy.py:102
def __init__(self, params, node_name="policy_node")
Definition: policy.py:57
def activate_policy(self, request, response)
Definition: policy.py:90
def get_sensor_msg_type(self, msg_type)
Definition: policy.py:95
std::string append(const std::string &left, const std::string &right)