设置机器人模拟(高级)
目标:使用避障节点扩展机器人模拟。
教程级别:高级
时间:20 分钟
背景
在本教程中,您将扩展在教程第一部分中创建的包:设置机器人模拟(基础)。
目的是实现一个使用机器人距离传感器避开障碍物的 ROS 2 节点。
本教程重点介绍如何使用带有 webots_ros2_driver
接口的机器人设备。
先决条件
这是本教程第一部分的延续:设置机器人模拟(基础)。 必须从第一部分开始设置自定义包和必要的文件。
本教程与 webots_ros2
和 Webots R2023b 的 2023.1.0 版本以及即将推出的版本兼容。
任务
1 更新 my_robot.urdf
如 设置机器人模拟(基础) 中所述,webots_ros2_driver
包含插件,可将大多数 Webots 设备直接与 ROS 2 连接。
可以使用机器人 URDF 文件中的 <device>
标签加载这些插件。
reference
属性应与 Webots 设备 name
参数匹配。
可以在``设备参考页面 <https://github.com/cyberbotics/webots_ros2/wiki/References-Devices>`_ 上找到所有现有接口和相应参数的列表。 对于 URDF 文件中未配置的可用设备,将自动创建接口,并使用 ROS 参数的默认值(例如“更新率”、“主题名称”和“框架名称”)。
在“my_robot.urdf”中将所有内容替换为:
<?xml version="1.0" ?>
<robot name="My robot">
<webots>
<device reference="ds0" type="DistanceSensor">
<ros>
<topicName>/left_sensor</topicName>
<alwaysOn>true</alwaysOn>
</ros>
</device>
<device reference="ds1" type="DistanceSensor">
<ros>
<topicName>/right_sensor</topicName>
<alwaysOn>true</alwaysOn>
</ros>
</device>
<plugin type="my_package.my_robot_driver.MyRobotDriver" />
</webots>
</robot>
<?xml version="1.0" ?>
<robot name="My robot">
<webots>
<device reference="ds0" type="DistanceSensor">
<ros>
<topicName>/left_sensor</topicName>
<alwaysOn>true</alwaysOn>
</ros>
</device>
<device reference="ds1" type="DistanceSensor">
<ros>
<topicName>/right_sensor</topicName>
<alwaysOn>true</alwaysOn>
</ros>
</device>
<plugin type="my_robot_driver::MyRobotDriver" />
</webots>
</robot>
除了您的自定义插件之外,“webots_ros2_driver”还将解析引用**DistanceSensor**节点的“<device>”标签,并使用“<ros>”标签中的标准参数来启用传感器并命名其主题。
2 创建 ROS 节点以避开障碍物
The robot will use a standard ROS node to detect the wall and send motor commands to avoid it.
In the my_package/my_package/
folder, create a file named obstacle_avoider.py
with this code:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Range
from geometry_msgs.msg import Twist
MAX_RANGE = 0.15
class ObstacleAvoider(Node):
def __init__(self):
super().__init__('obstacle_avoider')
self.__publisher = self.create_publisher(Twist, 'cmd_vel', 1)
self.create_subscription(Range, 'left_sensor', self.__left_sensor_callback, 1)
self.create_subscription(Range, 'right_sensor', self.__right_sensor_callback, 1)
def __left_sensor_callback(self, message):
self.__left_sensor_value = message.range
def __right_sensor_callback(self, message):
self.__right_sensor_value = message.range
command_message = Twist()
command_message.linear.x = 0.1
if self.__left_sensor_value < 0.9 * MAX_RANGE or self.__right_sensor_value < 0.9 * MAX_RANGE:
command_message.angular.z = -2.0
self.__publisher.publish(command_message)
def main(args=None):
rclpy.init(args=args)
avoider = ObstacleAvoider()
rclpy.spin(avoider)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
avoider.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
This node will create a publisher for the command and subscribe to the sensors topics here:
self.__publisher = self.create_publisher(Twist, 'cmd_vel', 1)
self.create_subscription(Range, 'left_sensor', self.__left_sensor_callback, 1)
self.create_subscription(Range, 'right_sensor', self.__right_sensor_callback, 1)
When a measurement is received from the left sensor it will be copied to a member field:
def __left_sensor_callback(self, message):
self.__left_sensor_value = message.range
Finally, a message will be sent to the /cmd_vel
topic when a measurement from the right sensor is received.
The command_message
will register at least a forward speed in linear.x
in order to make the robot move when no obstacle is detected.
If any of the two sensors detect an obstacle, command_message
will also register a rotational speed in angular.z
in order to make the robot turn right.
def __right_sensor_callback(self, message):
self.__right_sensor_value = message.range
command_message = Twist()
command_message.linear.x = 0.1
if self.__left_sensor_value < 0.9 * MAX_RANGE or self.__right_sensor_value < 0.9 * MAX_RANGE:
command_message.angular.z = -2.0
self.__publisher.publish(command_message)
The robot will use a standard ROS node to detect the wall and send motor commands to avoid it.
In the my_package/include/my_package
folder, create a header file named ObstacleAvoider.hpp
with this code:
#include <memory>
#include "geometry_msgs/msg/twist.hpp"
#include "rclcpp/rclcpp.hpp"
#include "sensor_msgs/msg/range.hpp"
class ObstacleAvoider : public rclcpp::Node {
public:
explicit ObstacleAvoider();
private:
void leftSensorCallback(const sensor_msgs::msg::Range::SharedPtr msg);
void rightSensorCallback(const sensor_msgs::msg::Range::SharedPtr msg);
rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr publisher_;
rclcpp::Subscription<sensor_msgs::msg::Range>::SharedPtr left_sensor_sub_;
rclcpp::Subscription<sensor_msgs::msg::Range>::SharedPtr right_sensor_sub_;
double left_sensor_value{0.0};
double right_sensor_value{0.0};
};
In the my_package/src
folder, create a source file named ObstacleAvoider.cpp
with this code:
#include "my_package/ObstacleAvoider.hpp"
#define MAX_RANGE 0.15
ObstacleAvoider::ObstacleAvoider() : Node("obstacle_avoider") {
publisher_ = create_publisher<geometry_msgs::msg::Twist>("/cmd_vel", 1);
left_sensor_sub_ = create_subscription<sensor_msgs::msg::Range>(
"/left_sensor", 1,
std::bind(&ObstacleAvoider::leftSensorCallback, this,
std::placeholders::_1));
right_sensor_sub_ = create_subscription<sensor_msgs::msg::Range>(
"/right_sensor", 1,
std::bind(&ObstacleAvoider::rightSensorCallback, this,
std::placeholders::_1));
}
void ObstacleAvoider::leftSensorCallback(
const sensor_msgs::msg::Range::SharedPtr msg) {
left_sensor_value = msg->range;
}
void ObstacleAvoider::rightSensorCallback(
const sensor_msgs::msg::Range::SharedPtr msg) {
right_sensor_value = msg->range;
auto command_message = std::make_unique<geometry_msgs::msg::Twist>();
command_message->linear.x = 0.1;
if (left_sensor_value < 0.9 * MAX_RANGE ||
right_sensor_value < 0.9 * MAX_RANGE) {
command_message->angular.z = -2.0;
}
publisher_->publish(std::move(command_message));
}
int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
auto avoider = std::make_shared<ObstacleAvoider>();
rclcpp::spin(avoider);
rclcpp::shutdown();
return 0;
}
This node will create a publisher for the command and subscribe to the sensors topics here:
publisher_ = create_publisher<geometry_msgs::msg::Twist>("/cmd_vel", 1);
left_sensor_sub_ = create_subscription<sensor_msgs::msg::Range>(
"/left_sensor", 1,
std::bind(&ObstacleAvoider::leftSensorCallback, this,
std::placeholders::_1));
right_sensor_sub_ = create_subscription<sensor_msgs::msg::Range>(
"/right_sensor", 1,
std::bind(&ObstacleAvoider::rightSensorCallback, this,
std::placeholders::_1));
When a measurement is received from the left sensor it will be copied to a member field:
void ObstacleAvoider::leftSensorCallback(
const sensor_msgs::msg::Range::SharedPtr msg) {
left_sensor_value = msg->range;
}
Finally, a message will be sent to the /cmd_vel
topic when a measurement from the right sensor is received.
The command_message
will register at least a forward speed in linear.x
in order to make the robot move when no obstacle is detected.
If any of the two sensors detect an obstacle, command_message
will also register a rotational speed in angular.z
in order to make the robot turn right.
void ObstacleAvoider::rightSensorCallback(
const sensor_msgs::msg::Range::SharedPtr msg) {
right_sensor_value = msg->range;
auto command_message = std::make_unique<geometry_msgs::msg::Twist>();
command_message->linear.x = 0.1;
if (left_sensor_value < 0.9 * MAX_RANGE ||
right_sensor_value < 0.9 * MAX_RANGE) {
command_message->angular.z = -2.0;
}
publisher_->publish(std::move(command_message));
}
3 更新其他文件
您必须修改这两个文件才能启动新节点。
Edit setup.py
and replace 'console_scripts'
with:
'console_scripts': [
'my_robot_driver = my_package.my_robot_driver:main',
'obstacle_avoider = my_package.obstacle_avoider:main'
],
This will add an entry point for the obstacle_avoider
node.
Edit CMakeLists.txt
and add the compilation and installation of the obstacle_avoider
:
cmake_minimum_required(VERSION 3.5)
project(my_package)
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()
# Besides the package specific dependencies we also need the `pluginlib` and `webots_ros2_driver`
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(pluginlib REQUIRED)
find_package(webots_ros2_driver REQUIRED)
# Export the plugin configuration file
pluginlib_export_plugin_description_file(webots_ros2_driver my_robot_driver.xml)
# Obstacle avoider
include_directories(
include
)
add_executable(obstacle_avoider
src/ObstacleAvoider.cpp
)
ament_target_dependencies(obstacle_avoider
rclcpp
geometry_msgs
sensor_msgs
)
install(TARGETS
obstacle_avoider
DESTINATION lib/${PROJECT_NAME}
)
install(
DIRECTORY include/
DESTINATION include
)
# MyRobotDriver library
add_library(
${PROJECT_NAME}
SHARED
src/MyRobotDriver.cpp
)
target_include_directories(
${PROJECT_NAME}
PRIVATE
include
)
ament_target_dependencies(
${PROJECT_NAME}
pluginlib
rclcpp
webots_ros2_driver
)
install(TARGETS
${PROJECT_NAME}
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
RUNTIME DESTINATION bin
)
# Install additional directories.
install(DIRECTORY
launch
resource
worlds
DESTINATION share/${PROJECT_NAME}/
)
ament_export_include_directories(
include
)
ament_export_libraries(
${PROJECT_NAME}
)
ament_package()
转到文件 robot_launch.py
并将其替换为:
import os
import launch
from launch_ros.actions import Node
from launch import LaunchDescription
from ament_index_python.packages import get_package_share_directory
from webots_ros2_driver.webots_launcher import WebotsLauncher
from webots_ros2_driver.webots_controller import WebotsController
def generate_launch_description():
package_dir = get_package_share_directory('my_package')
robot_description_path = os.path.join(package_dir, 'resource', 'my_robot.urdf')
webots = WebotsLauncher(
world=os.path.join(package_dir, 'worlds', 'my_world.wbt')
)
my_robot_driver = WebotsController(
robot_name='my_robot',
parameters=[
{'robot_description': robot_description_path},
]
)
obstacle_avoider = Node(
package='my_package',
executable='obstacle_avoider',
)
return LaunchDescription([
webots,
my_robot_driver,
obstacle_avoider,
launch.actions.RegisterEventHandler(
event_handler=launch.event_handlers.OnProcessExit(
target_action=webots,
on_exit=[launch.actions.EmitEvent(event=launch.events.Shutdown())],
)
)
])
这将创建一个“obstacle_avoider”节点,该节点将包含在“LaunchDescription”中。
4 测试避障代码
从 ROS 2 工作区中的终端启动模拟:
From a terminal in your ROS 2 workspace run:
colcon build
source install/local_setup.bash
ros2 launch my_package robot_launch.py
From a terminal in your WSL ROS 2 workspace run:
colcon build
export WEBOTS_HOME=/mnt/c/Program\ Files/Webots
source install/local_setup.bash
ros2 launch my_package robot_launch.py
Be sure to use the /mnt
prefix in front of your path to the Webots installation folder to access the Windows file system from WSL.
In a terminal of the host machine (not in the VM), if not done already, specify the Webots installation folder (e.g. /Applications/Webots.app
) and start the server using the following commands:
export WEBOTS_HOME=/Applications/Webots.app
python3 local_simulation_server.py
Note that the server keeps running once the ROS 2 nodes are ended. You don’t need to restart it every time you want to launch a new simulation. From a terminal in the Linux VM in your ROS 2 workspace, build and launch your custom package with:
cd ~/ros2_ws
colcon build
source install/local_setup.bash
ros2 launch my_package robot_launch.py
您的机器人应该向前移动,并且在撞到墙壁之前顺时针旋转。 您可以在 Webots 中按“Ctrl+F10”或转到“查看”菜单、“可选渲染”和“显示距离传感器射线”以显示机器人距离传感器的范围。
摘要
在本教程中,您使用避障器 ROS 2 节点扩展了基本模拟,该节点根据机器人的距离传感器值发布速度命令。
后续步骤
您可能想要改进插件或创建新节点来更改机器人的行为。
您还可以实现重置处理程序,以便在从 Webots 界面重置模拟时自动重新启动 ROS 节点: