moveit2
The MoveIt Motion Planning Framework for ROS 2.
test_trajectory_cache.py
Go to the documentation of this file.
1 # Copyright 2024 Intrinsic Innovation LLC.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 # Author: methylDragon
16 
17 from ament_index_python.packages import get_package_share_directory
18 from launch import LaunchDescription
19 from launch.actions import ExecuteProcess
20 from launch_pytest.tools import process as process_tools
21 from launch_ros.actions import Node
22 
23 from moveit_configs_utils import MoveItConfigsBuilder
24 
25 import launch_pytest
26 
27 import pytest
28 import os
29 
30 # This would have been a gtest, but we need a move_group instance, which
31 # requires some parameters loaded and a separate node started.
32 
33 
34 @pytest.fixture
36  return (
37  MoveItConfigsBuilder("moveit_resources_panda")
38  .robot_description(file_path="config/panda.urdf.xacro")
39  .planning_pipelines("ompl", ["ompl"])
40  .trajectory_execution(file_path="config/gripper_moveit_controllers.yaml")
41  .to_moveit_configs()
42  )
43 
44 
45 # We need this so the move_group has something to interact with
46 @pytest.fixture
47 def robot_fixture(moveit_config):
48  run_move_group_node = Node(
49  package="moveit_ros_move_group",
50  executable="move_group",
51  output="log",
52  parameters=[moveit_config.to_dict()],
53  )
54 
55  static_tf = Node(
56  package="tf2_ros",
57  executable="static_transform_publisher",
58  output="log",
59  arguments=["0.0", "0.0", "0.0", "0.0", "0.0", "0.0", "world", "panda_link0"],
60  )
61 
62  robot_state_publisher = Node(
63  package="robot_state_publisher",
64  executable="robot_state_publisher",
65  name="robot_state_publisher",
66  output="log",
67  parameters=[moveit_config.robot_description],
68  )
69 
70  # ros2_control using FakeSystem as hardware
71  ros2_controllers_path = os.path.join(
72  get_package_share_directory("moveit_resources_panda_moveit_config"),
73  "config",
74  "ros2_controllers.yaml",
75  )
76  ros2_control_node = Node(
77  package="controller_manager",
78  executable="ros2_control_node",
79  parameters=[moveit_config.robot_description, ros2_controllers_path],
80  output="log",
81  )
82 
83  load_controllers = []
84  for controller in [
85  "panda_arm_controller",
86  "panda_hand_controller",
87  "joint_state_broadcaster",
88  ]:
89  load_controllers += [
90  ExecuteProcess(
91  cmd=["ros2 run controller_manager spawner {}".format(controller)],
92  shell=True,
93  output="log",
94  )
95  ]
96 
97  return [
98  run_move_group_node,
99  static_tf,
100  robot_state_publisher,
101  ros2_control_node,
102  *load_controllers,
103  ]
104 
105 
106 @pytest.fixture
108  return Node(
109  package="moveit_ros_trajectory_cache",
110  executable="test_trajectory_cache",
111  name="test_trajectory_cache_node",
112  output="screen",
113  cached_output=True,
114  parameters=[moveit_config.to_dict()],
115  )
116 
117 
118 @launch_pytest.fixture
119 def launch_description(trajectory_cache_test_runner_node, robot_fixture):
120  return LaunchDescription(
121  robot_fixture
122  + [trajectory_cache_test_runner_node, launch_pytest.actions.ReadyToTest()]
123  )
124 
125 
126 def validate_stream(expected):
127  def wrapped(output):
128  assert (
129  expected in output.splitlines()
130  ), f"Did not get expected: {expected} in output:\n\n{output}"
131 
132  return wrapped
133 
134 
135 @pytest.mark.launch(fixture=launch_description)
136 def test_all_tests_pass(trajectory_cache_test_runner_node, launch_context):
137  # Check for occurrences of [PASS] in output
138  assert process_tools.wait_for_output_sync(
139  launch_context,
140  trajectory_cache_test_runner_node,
141  lambda x: x.count("[PASS]") == 169, # All test cases passed.
142  timeout=30,
143  )
144 
145  # Check no occurrences of [FAIL] in output
146  assert not process_tools.wait_for_output_sync(
147  launch_context,
148  trajectory_cache_test_runner_node,
149  lambda x: "[FAIL]" in x,
150  timeout=10,
151  )
152 
153  yield
def trajectory_cache_test_runner_node(moveit_config)
def test_all_tests_pass(trajectory_cache_test_runner_node, launch_context)
def launch_description(trajectory_cache_test_runner_node, robot_fixture)
def robot_fixture(moveit_config)