ROS1从入门到精通 12:导航与路径规划(让机器人自主导航)

【ROS1从入门到精通】第12篇:导航与路径规划(让机器人自主导航)

🎯 本文目标:深入学习ROS导航栈(Navigation Stack),掌握move_base框架、全局路径规划、局部路径规划、代价地图、恢复行为等核心概念,能够配置和调试完整的自主导航系统,实现机器人的智能移动。

📑 目录

  1. ROS导航栈概述
  2. move_base框架
  3. 代价地图(Costmap)
  4. 全局路径规划
  5. 局部路径规划
  6. 恢复行为
  7. 导航配置与调优
  8. 导航目标发送
  9. 多机器人导航
  10. 实战案例:完整导航系统
  11. 总结与最佳实践

1. ROS导航栈概述

1.1 什么是ROS导航栈?

ROS导航栈是一个2D导航框架,提供了从当前位置到目标位置的路径规划和执行能力。它包括:

  • 定位系统:确定机器人在地图中的位置
  • 感知系统:处理传感器数据,检测障碍物
  • 规划系统:生成安全可行的路径
  • 控制系统:执行路径,控制机器人运动

ROS Navigation StackCostmap 2DSensor DataMap ServerLocalizationOdometrymove_baseGlobal PlannerLocal PlannerRecovery BehaviorsGlobal Pathcmd_velRobot Base

1.2 导航栈组件

#!/usr/bin/env python# -*- coding: utf-8 -*-""" ROS导航栈组件概览 """classNavigationStack:"""导航栈组件管理"""def__init__(self): self.components ={'move_base':{'description':'导航主框架,协调所有组件','package':'move_base','node':'move_base_node','topics':{'input':['/map','/scan','/odom','/tf'],'output':['/cmd_vel','/move_base/path']}},'map_server':{'description':'提供静态地图','package':'map_server','node':'map_server','topics':{'output':['/map','/map_metadata']}},'amcl':{'description':'自适应蒙特卡罗定位','package':'amcl','node':'amcl','topics':{'input':['/scan','/map'],'output':['/amcl_pose','/particlecloud']}},'global_planner':{'description':'全局路径规划','types':['navfn','global_planner','carrot_planner'],'interface':'nav_core::BaseGlobalPlanner'},'local_planner':{'description':'局部路径规划','types':['dwa_local_planner','teb_local_planner','eband_local_planner'],'interface':'nav_core::BaseLocalPlanner'},'costmap':{'description':'代价地图','types':['static_layer','obstacle_layer','inflation_layer'],'package':'costmap_2d'}}defget_component_info(self, component_name):"""获取组件信息"""return self.components.get(component_name,{})deflist_components(self):"""列出所有组件"""for name, info in self.components.items():print(f"\n{name}:")print(f" Description: {info.get('description','')}")if'package'in info:print(f" Package: {info['package']}")if'types'in info:print(f" Types: {', '.join(info['types'])}")

1.3 导航栈安装

# 安装导航栈sudoapt-getinstall ros-noetic-navigation # 安装额外的规划器sudoapt-getinstall ros-noetic-teb-local-planner sudoapt-getinstall ros-noetic-dwa-local-planner sudoapt-getinstall ros-noetic-global-planner # 安装可视化工具sudoapt-getinstall ros-noetic-rviz-plugin-tutorials 

2. move_base框架

2.1 move_base架构

#!/usr/bin/env python# -*- coding: utf-8 -*-""" move_base节点接口 """import rospy import actionlib from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal, MoveBaseFeedback from geometry_msgs.msg import PoseStamped, Twist from nav_msgs.msg import Path from actionlib_msgs.msg import GoalStatus classMoveBaseInterface:"""move_base接口封装"""def__init__(self): rospy.init_node('move_base_interface')# Action客户端 self.move_base_client = actionlib.SimpleActionClient('move_base', MoveBaseAction)# 等待move_base启动 rospy.loginfo("Waiting for move_base action server...") self.move_base_client.wait_for_server() rospy.loginfo("Connected to move_base server")# 发布者 self.goal_pub = rospy.Publisher('/move_base_simple/goal', PoseStamped, queue_size=1)# 订阅者 self.path_sub = rospy.Subscriber('/move_base/NavfnROS/plan', Path, self.path_callback) self.cmd_vel_sub = rospy.Subscriber('/cmd_vel', Twist, self.cmd_vel_callback) self.feedback_sub = rospy.Subscriber('/move_base/feedback', MoveBaseFeedback, self.feedback_callback)# 状态 self.current_path =None self.current_cmd_vel =None self.current_feedback =Nonedefsend_goal(self, x, y, theta):"""发送导航目标""" goal = MoveBaseGoal()# 设置目标位置 goal.target_pose.header.frame_id ="map" goal.target_pose.header.stamp = rospy.Time.now() goal.target_pose.pose.position.x = x goal.target_pose.pose.position.y = y goal.target_pose.pose.position.z =0.0# 设置目标姿态(四元数)import tf.transformations as tf_trans q = tf_trans.quaternion_from_euler(0,0, theta) goal.target_pose.pose.orientation.x = q[0] goal.target_pose.pose.orientation.y = q[1] goal.target_pose.pose.orientation.z = q[2] goal.target_pose.pose.orientation.w = q[3]# 发送目标 rospy.loginfo(f"Sending goal: ({x:.2f}, {y:.2f}, {theta:.2f})") self.move_base_client.send_goal( goal, done_cb=self.done_callback, active_cb=self.active_callback, feedback_cb=self.feedback_callback_action )return goal defcancel_goal(self):"""取消当前目标""" self.move_base_client.cancel_goal() rospy.loginfo("Goal cancelled")defwait_for_result(self, timeout=None):"""等待导航结果"""if timeout: result = self.move_base_client.wait_for_result( rospy.Duration(timeout))else: result = self.move_base_client.wait_for_result()return result defget_state(self):"""获取当前状态""" state = self.move_base_client.get_state() state_names ={ GoalStatus.PENDING:"PENDING", GoalStatus.ACTIVE:"ACTIVE", GoalStatus.PREEMPTED:"PREEMPTED", GoalStatus.SUCCEEDED:"SUCCEEDED", GoalStatus.ABORTED:"ABORTED", GoalStatus.REJECTED:"REJECTED", GoalStatus.PREEMPTING:"PREEMPTING", GoalStatus.RECALLING:"RECALLING", GoalStatus.RECALLED:"RECALLED", GoalStatus.LOST:"LOST"}return state_names.get(state,"UNKNOWN")defpath_callback(self, msg):"""路径回调""" self.current_path = msg rospy.loginfo(f"Received path with {len(msg.poses)} poses")defcmd_vel_callback(self, msg):"""速度命令回调""" self.current_cmd_vel = msg deffeedback_callback(self, msg):"""反馈回调""" self.current_feedback = msg deffeedback_callback_action(self, feedback):"""Action反馈回调""" current_pose = feedback.base_position.pose rospy.loginfo_throttle(1,f"Current position: ({current_pose.position.x:.2f}, "f"{current_pose.position.y:.2f})")defactive_callback(self):"""目标激活回调""" rospy.loginfo("Goal is now active")defdone_callback(self, state, result):"""完成回调""" state_name = self.get_state() rospy.loginfo(f"Goal finished with state: {state_name}")if state == GoalStatus.SUCCEEDED: rospy.loginfo("Goal reached successfully!")elif state == GoalStatus.ABORTED: rospy.logwarn("Goal was aborted")elif state == GoalStatus.PREEMPTED: rospy.logwarn("Goal was preempted")

2.2 move_base配置

# move_base_params.yamlshutdown_costmaps:falsecontroller_frequency:5.0controller_patience:3.0planner_frequency:1.0planner_patience:5.0oscillation_timeout:10.0oscillation_distance:0.2base_global_planner:"navfn/NavfnROS"base_local_planner:"dwa_local_planner/DWAPlannerROS"recovery_behavior_enabled:trueclearing_rotation_allowed:truerecovery_behaviors:-name:'conservative_reset'type:'clear_costmap_recovery/ClearCostmapRecovery'-name:'rotate_recovery'type:'rotate_recovery/RotateRecovery'-name:'aggressive_reset'type:'clear_costmap_recovery/ClearCostmapRecovery'conservative_reset:reset_distance:3.0aggressive_reset:reset_distance:0.0

3. 代价地图(Costmap)

3.1 代价地图层

#!/usr/bin/env python# -*- coding: utf-8 -*-""" 代价地图配置和管理 """import numpy as np import rospy from nav_msgs.msg import OccupancyGrid from geometry_msgs.msg import Point from sensor_msgs.msg import LaserScan, PointCloud2 classCostmapManager:"""代价地图管理器"""# 代价值定义 COST_UNKNOWN =255# 未知区域 COST_LETHAL =254# 致命障碍物 COST_INSCRIBED =253# 内切圆障碍 COST_FREE =0# 自由空间def__init__(self): self.layers ={'static_layer':{'enabled':True,'type':'costmap_2d::StaticLayer','parameters':{'map_topic':'/map','subscribe_to_updates':True,'track_unknown_space':True,'use_maximum':False,'lethal_cost_threshold':100,'unknown_cost_value':255}},'obstacle_layer':{'enabled':True,'type':'costmap_2d::ObstacleLayer','parameters':{'observation_sources':'laser_scan_sensor point_cloud_sensor','laser_scan_sensor':{'sensor_frame':'laser_link','data_type':'LaserScan','topic':'/scan','marking':True,'clearing':True,'min_obstacle_height':0.0,'max_obstacle_height':2.0,'obstacle_range':2.5,'raytrace_range':3.0},'point_cloud_sensor':{'sensor_frame':'camera_link','data_type':'PointCloud2','topic':'/points','marking':True,'clearing':True,'min_obstacle_height':0.1,'max_obstacle_height':2.0,'obstacle_range':2.5,'raytrace_range':3.0}}},'inflation_layer':{'enabled':True,'type':'costmap_2d::InflationLayer','parameters':{'inflation_radius':0.55,'cost_scaling_factor':10.0}},'social_layer':{'enabled':False,'type':'social_navigation_layers::ProxemicLayer','parameters':{'people_topic':'/people','proxemic_radius':1.2,'cost_scaling_factor':5.0}}}defgenerate_costmap_config(self, namespace='global_costmap'):"""生成代价地图配置""" config ={f'{namespace}':{'global_frame':'map'if'global'in namespace else'odom','robot_base_frame':'base_link','update_frequency':5.0if'global'in namespace else10.0,'publish_frequency':2.0if'global'in namespace else5.0,'static_map':Trueif'global'in namespace elseFalse,'rolling_window':Falseif'global'in namespace elseTrue,'width':10.0if'local'in namespace else100.0,'height':10.0if'local'in namespace else100.0,'resolution':0.05,'origin_x':0.0,'origin_y':0.0,'plugins':[]}}# 添加启用的层for layer_name, layer_info in self.layers.items():if layer_info['enabled']: plugin ={'name': layer_name,'type': layer_info['type']} plugin.update(layer_info['parameters']) config[namespace]['plugins'].append(plugin)return config defcalculate_inflation_cost(self, distance, inflation_radius, cost_scaling_factor):"""计算膨胀代价"""if distance <=0:return self.COST_LETHAL elif distance <= inflation_radius:# 指数衰减函数 factor = np.exp(-1.0* cost_scaling_factor *(distance - inflation_radius)) cost =int(self.COST_INSCRIBED * factor)returnmax(cost, self.COST_FREE)else:return self.COST_FREE defupdate_costmap_params(self, namespace, params):"""动态更新代价地图参数"""import dynamic_reconfigure.client client = dynamic_reconfigure.client.Client(f"/{namespace}", timeout=5)try: client.update_configuration(params) rospy.loginfo(f"Updated {namespace} parameters")except Exception as e: rospy.logerr(f"Failed to update parameters: {e}")defvisualize_costmap(self, costmap_msg):"""可视化代价地图"""import matplotlib.pyplot as plt import matplotlib.colors as colors # 转换为numpy数组 width = costmap_msg.info.width height = costmap_msg.info.height data = np.array(costmap_msg.data).reshape((height, width))# 创建自定义颜色映射 cmap = colors.ListedColormap(['white',# FREE (0)'yellow',# LOW COST'orange',# MEDIUM COST'red',# HIGH COST'black',# LETHAL (254)'gray'# UNKNOWN (255)])# 设置边界 bounds =[0,50,100,200,254,255,256] norm = colors.BoundaryNorm(bounds, cmap.N)# 绘制 plt.figure(figsize=(10,10)) plt.imshow(data, cmap=cmap, norm=norm, origin='lower') plt.colorbar(label='Cost Value') plt.title('Costmap Visualization') plt.xlabel('X (cells)') plt.ylabel('Y (cells)') plt.show()

3.2 代价地图配置文件

# costmap_common_params.yamlfootprint:[[-0.25,-0.15],[-0.25,0.15],[0.25,0.15],[0.25,-0.15]]footprint_padding:0.01robot_base_frame: base_link update_frequency:5.0publish_frequency:2.0transform_tolerance:0.5resolution:0.05obstacle_range:2.5raytrace_range:3.0# 障碍物层参数obstacle_layer:enabled:trueobstacle_range:2.5raytrace_range:3.0inflation_radius:0.2track_unknown_space:truecombination_method:1observation_sources: laser_scan_sensor laser_scan_sensor:sensor_frame: laser_link data_type: LaserScan topic: /scan marking:trueclearing:truemin_obstacle_height:0.0max_obstacle_height:2.0# 膨胀层参数inflation_layer:enabled:trueinflation_radius:0.55cost_scaling_factor:10.0# 静态层参数static_layer:enabled:truemap_topic: /map 

4. 全局路径规划

4.1 全局规划器实现

#!/usr/bin/env python# -*- coding: utf-8 -*-""" 全局路径规划器 """import rospy import numpy as np from nav_msgs.msg import Path, OccupancyGrid from geometry_msgs.msg import PoseStamped import heapq classGlobalPlanner:"""全局路径规划器基类"""def__init__(self): self.costmap =None self.resolution =0.05 self.origin_x =0 self.origin_y =0 self.width =0 self.height =0defset_costmap(self, costmap_msg):"""设置代价地图""" self.costmap = np.array(costmap_msg.data).reshape((costmap_msg.info.height, costmap_msg.info.width)) self.resolution = costmap_msg.info.resolution self.origin_x = costmap_msg.info.origin.position.x self.origin_y = costmap_msg.info.origin.position.y self.width = costmap_msg.info.width self.height = costmap_msg.info.height defworld_to_map(self, wx, wy):"""世界坐标转地图坐标""" mx =int((wx - self.origin_x)/ self.resolution) my =int((wy - self.origin_y)/ self.resolution)return mx, my defmap_to_world(self, mx, my):"""地图坐标转世界坐标""" wx = self.origin_x +(mx +0.5)* self.resolution wy = self.origin_y +(my +0.5)* self.resolution return wx, wy defplan(self, start, goal):"""规划路径(需要子类实现)"""raise NotImplementedError classAStarPlanner(GlobalPlanner):"""A*路径规划器"""def__init__(self):super().__init__() self.cost_threshold =252# 致命障碍物阈值defheuristic(self, a, b):"""启发式函数(欧几里得距离)"""return np.sqrt((a[0]- b[0])**2+(a[1]- b[1])**2)defget_neighbors(self, node):"""获取邻居节点""" neighbors =[]for dx, dy in[(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(-1,1),(1,-1),(1,1)]: x, y = node[0]+ dx, node[1]+ dy # 检查边界if0<= x < self.width and0<= y < self.height:# 检查是否可通行if self.costmap[y, x]< self.cost_threshold:# 对角移动的代价更高if dx !=0and dy !=0: cost = np.sqrt(2)* self.resolution else: cost = self.resolution # 加上地图代价 cost += self.costmap[y, x]/255.0 neighbors.append(((x, y), cost))return neighbors defplan(self, start, goal):"""A*路径规划"""if self.costmap isNone: rospy.logwarn("No costmap available")returnNone# 转换为地图坐标 start_map = self.world_to_map(start[0], start[1]) goal_map = self.world_to_map(goal[0], goal[1])# 检查起点和终点是否有效ifnot(0<= start_map[0]< self.width and0<= start_map[1]< self.height): rospy.logwarn("Start position out of bounds")returnNoneifnot(0<= goal_map[0]< self.width and0<= goal_map[1]< self.height): rospy.logwarn("Goal position out of bounds")returnNoneif self.costmap[start_map[1], start_map[0]]>= self.cost_threshold: rospy.logwarn("Start position is in obstacle")returnNoneif self.costmap[goal_map[1], goal_map[0]]>= self.cost_threshold: rospy.logwarn("Goal position is in obstacle")returnNone# A*算法 open_set =[(0, start_map)] came_from ={} g_score ={start_map:0} f_score ={start_map: self.heuristic(start_map, goal_map)}while open_set: current = heapq.heappop(open_set)[1]# 到达目标if current == goal_map:# 重建路径 path =[]while current in came_from: wx, wy = self.map_to_world(current[0], current[1]) path.append((wx, wy)) current = came_from[current] wx, wy = self.map_to_world(start_map[0], start_map[1]) path.append((wx, wy)) path.reverse()return path # 扩展邻居for neighbor, move_cost in self.get_neighbors(current): tentative_g_score = g_score[current]+ move_cost if neighbor notin g_score or tentative_g_score < g_score[neighbor]: came_from[neighbor]= current g_score[neighbor]= tentative_g_score f_score[neighbor]= tentative_g_score + self.heuristic(neighbor, goal_map) heapq.heappush(open_set,(f_score[neighbor], neighbor)) rospy.logwarn("No path found")returnNoneclassDijkstraPlanner(GlobalPlanner):"""Dijkstra路径规划器"""def__init__(self):super().__init__() self.cost_threshold =252defplan(self, start, goal):"""Dijkstra路径规划"""if self.costmap isNone:returnNone# 转换为地图坐标 start_map = self.world_to_map(start[0], start[1]) goal_map = self.world_to_map(goal[0], goal[1])# 初始化 distances = np.full((self.height, self.width), np.inf) distances[start_map[1], start_map[0]]=0 visited = np.zeros((self.height, self.width), dtype=bool) parent ={}# 优先队列 pq =[(0, start_map)]while pq: current_dist, current = heapq.heappop(pq)if visited[current[1], current[0]]:continue visited[current[1], current[0]]=True# 到达目标if current == goal_map:# 重建路径 path =[]while current in parent: wx, wy = self.map_to_world(current[0], current[1]) path.append((wx, wy)) current = parent[current] wx, wy = self.map_to_world(start_map[0], start_map[1]) path.append((wx, wy)) path.reverse()return path # 检查邻居for dx, dy in[(-1,0),(1,0),(0,-1),(0,1)]: nx, ny = current[0]+ dx, current[1]+ dy if(0<= nx < self.width and0<= ny < self.height andnot visited[ny, nx]and self.costmap[ny, nx]< self.cost_threshold): new_dist = current_dist +1+ self.costmap[ny, nx]/255.0if new_dist < distances[ny, nx]: distances[ny, nx]= new_dist parent[(nx, ny)]= current heapq.heappush(pq,(new_dist,(nx, ny)))returnNone

4.2 全局规划器配置

# global_planner_params.yaml# NavfnROS参数NavfnROS:visualize_potential:falseallow_unknown:trueplanner_window_x:0.0planner_window_y:0.0default_tolerance:0.0use_dijkstra:trueuse_quadratic:trueuse_grid_path:falseold_navfn_behavior:false# GlobalPlanner参数GlobalPlanner:old_navfn_behavior:falseuse_quadratic:trueuse_dijkstra:trueuse_grid_path:falseallow_unknown:trueplanner_window_x:0.0planner_window_y:0.0default_tolerance:0.0publish_scale:100planner_costmap_publish_frequency:0.0lethal_cost:253neutral_cost:50cost_factor:3.0publish_potential:trueorientation_mode:0orientation_window_size:1

5. 局部路径规划

5.1 DWA局部规划器

#!/usr/bin/env python# -*- coding: utf-8 -*-""" DWA(动态窗口法)局部规划器 """import rospy import numpy as np from geometry_msgs.msg import Twist, PoseStamped from nav_msgs.msg import Odometry, Path from sensor_msgs.msg import LaserScan classDWAPlanner:"""DWA局部路径规划器"""def__init__(self):# 机器人参数 self.max_vel_x =0.5# 最大线速度 m/s self.min_vel_x =-0.1# 最小线速度 m/s self.max_vel_theta =1.0# 最大角速度 rad/s self.min_vel_theta =-1.0# 最小角速度 rad/s self.acc_lim_x =0.5# 线加速度限制 m/s^2 self.acc_lim_theta =1.0# 角加速度限制 rad/s^2# DWA参数 self.dt =0.1# 时间步长 self.predict_time =3.0# 预测时间 self.velocity_samples_x =20# 线速度采样数 self.velocity_samples_theta =40# 角速度采样数# 评价函数权重 self.path_distance_bias =32.0# 路径距离权重 self.goal_distance_bias =24.0# 目标距离权重 self.occdist_scale =0.01# 障碍物距离权重# 当前状态 self.current_pose =None self.current_velocity =[0,0]# [v, w] self.laser_scan =None self.global_path =Nonedefdynamic_window(self, current_vel):"""计算动态窗口"""# 速度限制窗口 vs =[self.min_vel_x, self.max_vel_x, self.min_vel_theta, self.max_vel_theta]# 加速度限制窗口 vd =[current_vel[0]- self.acc_lim_x * self.dt, current_vel[0]+ self.acc_lim_x * self.dt, current_vel[1]- self.acc_lim_theta * self.dt, current_vel[1]+ self.acc_lim_theta * self.dt]# 动态窗口 = 速度限制 ∩ 加速度限制 dw =[max(vs[0], vd[0]),min(vs[1], vd[1]),max(vs[2], vd[2]),min(vs[3], vd[3])]return dw defpredict_trajectory(self, init_pose, velocity, predict_time):"""预测轨迹""" trajectory =[] pose = init_pose.copy() time_steps =int(predict_time / self.dt)for _ inrange(time_steps):# 更新位置 pose[0]+= velocity[0]* np.cos(pose[2])* self.dt pose[1]+= velocity[0]* np.sin(pose[2])* self.dt pose[2]+= velocity[1]* self.dt trajectory.append(pose.copy())return np.array(trajectory)defcalc_obstacle_cost(self, trajectory, obstacles):"""计算障碍物代价"""if obstacles isNoneorlen(obstacles)==0:return0 min_dist =float('inf')for pose in trajectory:for obs in obstacles: dist = np.sqrt((pose[0]- obs[0])**2+(pose[1]- obs[1])**2)if dist < min_dist: min_dist = dist # 碰撞检测if min_dist <0.2:# 机器人半径returnfloat('inf')return1.0/ min_dist defcalc_goal_cost(self, trajectory, goal):"""计算目标代价""" final_pose = trajectory[-1] goal_dist = np.sqrt((final_pose[0]- goal[0])**2+(final_pose[1]- goal[1])**2)return goal_dist defcalc_path_cost(self, trajectory, path):"""计算路径跟踪代价"""if path isNoneorlen(path)==0:return0 min_dist =float('inf')for pose in trajectory:for path_point in path: dist = np.sqrt((pose[0]- path_point[0])**2+(pose[1]- path_point[1])**2)if dist < min_dist: min_dist = dist return min_dist defevaluate_trajectory(self, trajectory, goal, obstacles, path):"""评价轨迹"""# 计算各项代价 goal_cost = self.calc_goal_cost(trajectory, goal) obs_cost = self.calc_obstacle_cost(trajectory, obstacles) path_cost = self.calc_path_cost(trajectory, path)# 如果会碰撞,返回无穷大代价if obs_cost ==float('inf'):returnfloat('inf')# 总代价 total_cost =(self.goal_distance_bias * goal_cost + self.occdist_scale * obs_cost + self.path_distance_bias * path_cost)return total_cost defsearch_best_trajectory(self, current_pose, current_vel, goal, obstacles, path):"""搜索最优轨迹"""# 计算动态窗口 dw = self.dynamic_window(current_vel) best_trajectory =None min_cost =float('inf') best_velocity =[0,0]# 速度采样for vx in np.linspace(dw[0], dw[1], self.velocity_samples_x):for vtheta in np.linspace(dw[2], dw[3], self.velocity_samples_theta):# 预测轨迹 velocity =[vx, vtheta] trajectory = self.predict_trajectory( current_pose, velocity, self.predict_time)# 评价轨迹 cost = self.evaluate_trajectory( trajectory, goal, obstacles, path)# 更新最优if cost < min_cost: min_cost = cost best_trajectory = trajectory best_velocity = velocity return best_velocity, best_trajectory defcontrol(self, goal):"""DWA控制"""if self.current_pose isNone:return Twist()# 从激光数据提取障碍物 obstacles = self.extract_obstacles()# 搜索最优速度 best_velocity, best_trajectory = self.search_best_trajectory( self.current_pose, self.current_velocity, goal, obstacles, self.global_path )# 生成速度命令 cmd_vel = Twist() cmd_vel.linear.x = best_velocity[0] cmd_vel.angular.z = best_velocity[1]# 更新当前速度 self.current_velocity = best_velocity return cmd_vel defextract_obstacles(self):"""从激光数据提取障碍物"""if self.laser_scan isNone:return[] obstacles =[]for i, range_val inenumerate(self.laser_scan.ranges):if range_val < self.laser_scan.range_max: angle = self.laser_scan.angle_min + i * self.laser_scan.angle_increment # 转换到机器人坐标系 obs_x = range_val * np.cos(angle) obs_y = range_val * np.sin(angle)# 转换到世界坐标系 world_x = self.current_pose[0]+ \ obs_x * np.cos(self.current_pose[2])- \ obs_y * np.sin(self.current_pose[2]) world_y = self.current_pose[1]+ \ obs_x * np.sin(self.current_pose[2])+ \ obs_y * np.cos(self.current_pose[2]) obstacles.append([world_x, world_y])return obstacles 

5.2 局部规划器配置

# dwa_local_planner_params.yamlDWAPlannerROS:# 机器人配置参数max_vel_x:0.5min_vel_x:0.0max_vel_y:0.0min_vel_y:0.0max_vel_trans:0.5min_vel_trans:0.1max_vel_theta:1.0min_vel_theta:0.2acc_lim_x:2.5acc_lim_y:0.0acc_lim_theta:3.2acc_lim_trans:2.5# 目标容差xy_goal_tolerance:0.2yaw_goal_tolerance:0.1latch_xy_goal_tolerance:false# 前向仿真参数sim_time:1.7sim_granularity:0.025vx_samples:20vy_samples:0vtheta_samples:40# 轨迹评分参数path_distance_bias:32.0goal_distance_bias:24.0occdist_scale:0.01forward_point_distance:0.325stop_time_buffer:0.2scaling_speed:0.25max_scaling_factor:0.2# 振荡预防oscillation_reset_dist:0.05# 调试publish_traj_pc:truepublish_cost_grid_pc:true

6. 恢复行为

6.1 恢复行为实现

#!/usr/bin/env python# -*- coding: utf-8 -*-""" 导航恢复行为 """import rospy from geometry_msgs.msg import Twist from std_srvs.srv import Empty import tf2_ros import math classRecoveryBehaviors:"""恢复行为管理"""def__init__(self): rospy.init_node('recovery_behaviors')# 速度发布 self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)# 清除代价地图服务 self.clear_costmap_srv = rospy.ServiceProxy('/move_base/clear_costmaps', Empty)# TF self.tf_buffer = tf2_ros.Buffer() self.tf_listener = tf2_ros.TransformListener(self.tf_buffer)# 恢复行为列表 self.behaviors ={'rotate_recovery': self.rotate_recovery,'clear_costmap': self.clear_costmap_recovery,'backup_recovery': self.backup_recovery,'aggressive_reset': self.aggressive_reset }defrotate_recovery(self, angle=2*math.pi, speed=0.5):"""旋转恢复""" rospy.loginfo("Executing rotate recovery...")# 计算旋转时间 duration =abs(angle / speed)# 发送旋转命令 cmd = Twist() cmd.angular.z = speed if angle >0else-speed start_time = rospy.Time.now() rate = rospy.Rate(10)while(rospy.Time.now()- start_time).to_sec()< duration: self.cmd_vel_pub.publish(cmd) rate.sleep()# 停止 self.cmd_vel_pub.publish(Twist()) rospy.loginfo("Rotate recovery completed")returnTruedefclear_costmap_recovery(self):"""清除代价地图恢复""" rospy.loginfo("Executing clear costmap recovery...")try: self.clear_costmap_srv() rospy.loginfo("Costmaps cleared successfully")returnTrueexcept rospy.ServiceException as e: rospy.logerr(f"Failed to clear costmaps: {e}")returnFalsedefbackup_recovery(self, distance=0.3, speed=0.1):"""后退恢复""" rospy.loginfo("Executing backup recovery...")# 计算后退时间 duration =abs(distance / speed)# 发送后退命令 cmd = Twist() cmd.linear.x =-abs(speed) start_time = rospy.Time.now() rate = rospy.Rate(10)while(rospy.Time.now()- start_time).to_sec()< duration: self.cmd_vel_pub.publish(cmd) rate.sleep()# 停止 self.cmd_vel_pub.publish(Twist()) rospy.loginfo("Backup recovery completed")returnTruedefaggressive_reset(self):"""激进重置""" rospy.loginfo("Executing aggressive reset...")# 1. 停止机器人 self.cmd_vel_pub.publish(Twist()) rospy.sleep(0.5)# 2. 清除代价地图 self.clear_costmap_recovery()# 3. 小幅度旋转 self.rotate_recovery(angle=0.5, speed=0.3)# 4. 再次清除代价地图 self.clear_costmap_recovery() rospy.loginfo("Aggressive reset completed")returnTruedefexecute_behavior_sequence(self, sequence):"""执行恢复行为序列""" rospy.loginfo(f"Executing recovery sequence: {sequence}")for behavior_name in sequence:if behavior_name in self.behaviors: success = self.behaviors[behavior_name]()ifnot success: rospy.logwarn(f"Recovery behavior '{behavior_name}' failed")returnFalse# 行为间延迟 rospy.sleep(0.5)else: rospy.logwarn(f"Unknown recovery behavior: {behavior_name}")returnTruedefmonitor_and_recover(self):"""监控并执行恢复"""# 标准恢复序列 recovery_sequences =[['clear_costmap'],['rotate_recovery','clear_costmap'],['backup_recovery','rotate_recovery','clear_costmap'],['aggressive_reset']] recovery_level =0whilenot rospy.is_shutdown():# 检查是否需要恢复(这里需要实现具体的检测逻辑)if self.need_recovery():if recovery_level <len(recovery_sequences): sequence = recovery_sequences[recovery_level] success = self.execute_behavior_sequence(sequence)if success: recovery_level =0# 重置恢复级别else: recovery_level +=1# 升级恢复级别else: rospy.logerr("All recovery behaviors failed!") recovery_level =0 rospy.sleep(1.0)defneed_recovery(self):"""检查是否需要恢复"""# 这里应该实现具体的检测逻辑# 例如:检查机器人是否卡住、路径是否被阻塞等returnFalse

7. 导航配置与调优

7.1 导航参数调优

#!/usr/bin/env python# -*- coding: utf-8 -*-""" 导航参数自动调优 """import rospy import dynamic_reconfigure.client import numpy as np from move_base_msgs.msg import MoveBaseActionResult from geometry_msgs.msg import PoseWithCovarianceStamped classNavigationTuner:"""导航参数调优器"""def__init__(self):# 参数范围 self.param_ranges ={'max_vel_x':(0.1,1.0),'max_vel_theta':(0.2,2.0),'acc_lim_x':(0.5,3.0),'acc_lim_theta':(0.5,4.0),'xy_goal_tolerance':(0.05,0.5),'yaw_goal_tolerance':(0.05,0.5),'path_distance_bias':(10.0,100.0),'goal_distance_bias':(10.0,100.0),'occdist_scale':(0.001,0.1),'inflation_radius':(0.3,1.0),'cost_scaling_factor':(1.0,20.0)}# 性能指标 self.metrics ={'success_rate':0,'avg_time':0,'avg_path_length':0,'collision_count':0,'oscillation_count':0}# 动态配置客户端 self.dwa_client = dynamic_reconfigure.client.Client("/move_base/DWAPlannerROS") self.costmap_client = dynamic_reconfigure.client.Client("/move_base/global_costmap/inflation_layer")defevaluate_performance(self, test_goals):"""评估导航性能""" results =[]for goal in test_goals: start_time = rospy.Time.now()# 发送目标 success = self.send_goal_and_wait(goal) end_time = rospy.Time.now() duration =(end_time - start_time).to_sec() results.append({'success': success,'time': duration,'goal': goal })# 计算指标 success_count =sum(1for r in results if r['success']) self.metrics['success_rate']= success_count /len(results)if success_count >0: successful_results =[r for r in results if r['success']] self.metrics['avg_time']= np.mean([r['time']for r in successful_results])return self.metrics defgrid_search(self, param_names, test_goals, resolution=3):"""网格搜索最优参数""" best_params ={} best_score =-float('inf')# 生成参数网格 param_grid = self.generate_param_grid(param_names, resolution)for params in param_grid:# 应用参数 self.apply_params(params)# 评估性能 metrics = self.evaluate_performance(test_goals)# 计算得分 score = self.calculate_score(metrics)# 更新最优if score > best_score: best_score = score best_params = params.copy() rospy.loginfo(f"Params: {params}, Score: {score}")return best_params, best_score defgenerate_param_grid(self, param_names, resolution):"""生成参数网格""" grid =[]# 简化:只生成每个参数的几个值for param_name in param_names:if param_name in self.param_ranges: min_val, max_val = self.param_ranges[param_name] values = np.linspace(min_val, max_val, resolution)for value in values: grid.append({param_name: value})return grid defcalculate_score(self, metrics):"""计算性能得分"""# 加权组合多个指标 score =(metrics['success_rate']*100- metrics['avg_time']*0.1- metrics['collision_count']*10- metrics['oscillation_count']*5)return score defapply_params(self, params):"""应用参数"""try:# 分离DWA参数和代价地图参数 dwa_params ={} costmap_params ={}for key, value in params.items():if key in['inflation_radius','cost_scaling_factor']: costmap_params[key]= value else: dwa_params[key]= value # 应用参数if dwa_params: self.dwa_client.update_configuration(dwa_params)if costmap_params: self.costmap_client.update_configuration(costmap_params) rospy.sleep(0.5)# 等待参数生效except Exception as e: rospy.logerr(f"Failed to apply params: {e}")defadaptive_tuning(self):"""自适应参数调整"""# 监控导航性能 performance_window =[] window_size =10whilenot rospy.is_shutdown():# 获取最新性能数据 current_performance = self.get_current_performance() performance_window.append(current_performance)iflen(performance_window)> window_size: performance_window.pop(0)# 分析性能趋势iflen(performance_window)>= window_size: trend = self.analyze_trend(performance_window)# 根据趋势调整参数if trend =='too_slow': self.increase_speed()elif trend =='too_aggressive': self.decrease_speed()elif trend =='oscillating': self.reduce_oscillation() rospy.sleep(1.0)defincrease_speed(self):"""提高速度""" current = self.dwa_client.get_configuration() new_params ={'max_vel_x':min(current['max_vel_x']*1.1, self.param_ranges['max_vel_x'][1]),'max_vel_theta':min(current['max_vel_theta']*1.1, self.param_ranges['max_vel_theta'][1])} self.dwa_client.update_configuration(new_params) rospy.loginfo("Increased speed parameters")defdecrease_speed(self):"""降低速度""" current = self.dwa_client.get_configuration() new_params ={'max_vel_x':max(current['max_vel_x']*0.9, self.param_ranges['max_vel_x'][0]),'max_vel_theta':max(current['max_vel_theta']*0.9, self.param_ranges['max_vel_theta'][0])} self.dwa_client.update_configuration(new_params) rospy.loginfo("Decreased speed parameters")defreduce_oscillation(self):"""减少振荡""" current = self.dwa_client.get_configuration() new_params ={'oscillation_reset_dist': current.get('oscillation_reset_dist',0.05)*1.2,'path_distance_bias': current.get('path_distance_bias',32.0)*1.1} self.dwa_client.update_configuration(new_params) rospy.loginfo("Adjusted oscillation parameters")

8. 导航目标发送

8.1 多目标导航

#!/usr/bin/env python# -*- coding: utf-8 -*-""" 多目标导航管理 """import rospy import actionlib from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal from geometry_msgs.msg import PoseStamped from visualization_msgs.msg import Marker, MarkerArray import yaml classMultiGoalNavigation:"""多目标导航管理器"""def__init__(self): rospy.init_node('multi_goal_navigation')# Action客户端 self.move_base_client = actionlib.SimpleActionClient('move_base', MoveBaseAction) self.move_base_client.wait_for_server()# 目标点管理 self.goals =[] self.current_goal_index =0 self.patrol_mode =False# 可视化 self.marker_pub = rospy.Publisher('/navigation_goals', MarkerArray, queue_size=1)# 目标订阅 self.goal_sub = rospy.Subscriber('/clicked_point', PointStamped, self.add_goal_callback)# 加载预设目标点 self.load_goals_from_file()defload_goals_from_file(self, filename='goals.yaml'):"""从文件加载目标点"""try:withopen(filename,'r')as f: data = yaml.safe_load(f) self.goals =[]for goal_data in data['goals']: goal = PoseStamped() goal.header.frame_id ='map' goal.pose.position.x = goal_data['x'] goal.pose.position.y = goal_data['y'] goal.pose.position.z =0# 四元数 goal.pose.orientation.x = goal_data.get('qx',0) goal.pose.orientation.y = goal_data.get('qy',0) goal.pose.orientation.z = goal_data.get('qz',0) goal.pose.orientation.w = goal_data.get('qw',1) self.goals.append(goal) rospy.loginfo(f"Loaded {len(self.goals)} goals from file")except Exception as e: rospy.logwarn(f"Failed to load goals: {e}")defsave_goals_to_file(self, filename='goals.yaml'):"""保存目标点到文件""" data ={'goals':[]}for goal in self.goals: goal_data ={'x': goal.pose.position.x,'y': goal.pose.position.y,'qx': goal.pose.orientation.x,'qy': goal.pose.orientation.y,'qz': goal.pose.orientation.z,'qw': goal.pose.orientation.w } data['goals'].append(goal_data)try:withopen(filename,'w')as f: yaml.dump(data, f) rospy.loginfo(f"Saved {len(self.goals)} goals to file")except Exception as e: rospy.logerr(f"Failed to save goals: {e}")defadd_goal(self, pose_stamped):"""添加目标点""" self.goals.append(pose_stamped) self.visualize_goals() rospy.loginfo(f"Added goal #{len(self.goals)}")defremove_goal(self, index):"""移除目标点"""if0<= index <len(self.goals): self.goals.pop(index) self.visualize_goals() rospy.loginfo(f"Removed goal #{index}")defclear_goals(self):"""清除所有目标点""" self.goals =[] self.current_goal_index =0 self.visualize_goals() rospy.loginfo("Cleared all goals")defstart_navigation(self):"""开始导航"""ifnot self.goals: rospy.logwarn("No goals to navigate")return self.current_goal_index =0 self.send_next_goal()defsend_next_goal(self):"""发送下一个目标"""if self.current_goal_index >=len(self.goals):if self.patrol_mode: self.current_goal_index =0 rospy.loginfo("Restarting patrol from first goal")else: rospy.loginfo("All goals completed")return goal = self.goals[self.current_goal_index]# 转换为MoveBaseGoal move_base_goal = MoveBaseGoal() move_base_goal.target_pose = goal move_base_goal.target_pose.header.stamp = rospy.Time.now() rospy.loginfo(f"Sending goal #{self.current_goal_index +1}/{len(self.goals)}") self.move_base_client.send_goal( move_base_goal, done_cb=self.goal_done_callback )defgoal_done_callback(self, state, result):"""目标完成回调"""if state == actionlib.GoalStatus.SUCCEEDED: rospy.loginfo(f"Goal #{self.current_goal_index +1} reached successfully") self.current_goal_index +=1 self.send_next_goal()else: rospy.logwarn(f"Failed to reach goal #{self.current_goal_index +1}")# 可选:重试或跳过 retry = rospy.get_param('~retry_failed_goals',False)ifnot retry: self.current_goal_index +=1 self.send_next_goal()defvisualize_goals(self):"""可视化目标点""" marker_array = MarkerArray()# 清除旧标记 clear_marker = Marker() clear_marker.header.frame_id ='map' clear_marker.action = Marker.DELETEALL marker_array.markers.append(clear_marker)# 添加目标标记for i, goal inenumerate(self.goals):# 目标点标记 marker = Marker() marker.header.frame_id ='map' marker.header.stamp = rospy.Time.now() marker.ns ='goals' marker.id= i marker.type= Marker.ARROW marker.action = Marker.ADD marker.pose = goal.pose marker.scale.x =0.5 marker.scale.y =0.1 marker.scale.z =0.1# 当前目标用不同颜色if i == self.current_goal_index: marker.color.r =0.0 marker.color.g =1.0 marker.color.b =0.0else: marker.color.r =1.0 marker.color.g =0.0 marker.color.b =0.0 marker.color.a =1.0 marker_array.markers.append(marker)# 文字标记 text_marker = Marker() text_marker.header = marker.header text_marker.ns ='goal_texts' text_marker.id= i +1000 text_marker.type= Marker.TEXT_VIEW_FACING text_marker.action = Marker.ADD text_marker.pose = goal.pose text_marker.pose.position.z +=0.5 text_marker.text =f"Goal {i+1}" text_marker.scale.z =0.3 text_marker.color.r =1.0 text_marker.color.g =1.0 text_marker.color.b =1.0 text_marker.color.a =1.0 marker_array.markers.append(text_marker)# 连接线iflen(self.goals)>1: line_marker = Marker() line_marker.header.frame_id ='map' line_marker.header.stamp = rospy.Time.now() line_marker.ns ='path' line_marker.id=2000 line_marker.type= Marker.LINE_STRIP line_marker.action = Marker.ADD for goal in self.goals: line_marker.points.append(goal.pose.position)if self.patrol_mode:# 闭合路径 line_marker.points.append(self.goals[0].pose.position) line_marker.scale.x =0.05 line_marker.color.r =0.0 line_marker.color.g =0.0 line_marker.color.b =1.0 line_marker.color.a =0.5 marker_array.markers.append(line_marker) self.marker_pub.publish(marker_array)defset_patrol_mode(self, enabled):"""设置巡逻模式""" self.patrol_mode = enabled mode ="enabled"if enabled else"disabled" rospy.loginfo(f"Patrol mode {mode}") self.visualize_goals()

9. 多机器人导航

9.1 多机器人协调

#!/usr/bin/env python# -*- coding: utf-8 -*-""" 多机器人导航协调 """import rospy from geometry_msgs.msg import PoseStamped, Twist from nav_msgs.msg import Path import threading classMultiRobotCoordinator:"""多机器人协调器"""def__init__(self, robot_names): rospy.init_node('multi_robot_coordinator') self.robots ={}# 为每个机器人创建接口for name in robot_names: self.robots[name]={'namespace': name,'pose':None,'path':None,'goal':None,'status':'idle','priority':0}# 订阅位姿 pose_sub = rospy.Subscriber(f'/{name}/amcl_pose', PoseWithCovarianceStamped,lambda msg, n=name: self.pose_callback(msg, n))# 订阅路径 path_sub = rospy.Subscriber(f'/{name}/move_base/NavfnROS/plan', Path,lambda msg, n=name: self.path_callback(msg, n))# 目标发布 self.robots[name]['goal_pub']= rospy.Publisher(f'/{name}/move_base_simple/goal', PoseStamped, queue_size=1)# 速度控制 self.robots[name]['cmd_vel_pub']= rospy.Publisher(f'/{name}/cmd_vel', Twist, queue_size=1)# 冲突检测线程 self.conflict_thread = threading.Thread(target=self.conflict_detection_loop) self.conflict_thread.start()defpose_callback(self, msg, robot_name):"""位姿回调""" self.robots[robot_name]['pose']= msg.pose.pose defpath_callback(self, msg, robot_name):"""路径回调""" self.robots[robot_name]['path']= msg defsend_goal(self, robot_name, goal):"""发送目标"""if robot_name in self.robots: self.robots[robot_name]['goal']= goal self.robots[robot_name]['goal_pub'].publish(goal) self.robots[robot_name]['status']='navigating' rospy.loginfo(f"Sent goal to {robot_name}")defstop_robot(self, robot_name):"""停止机器人"""if robot_name in self.robots: stop_cmd = Twist() self.robots[robot_name]['cmd_vel_pub'].publish(stop_cmd) self.robots[robot_name]['status']='stopped' rospy.loginfo(f"Stopped {robot_name}")defconflict_detection_loop(self):"""冲突检测循环""" rate = rospy.Rate(10)whilenot rospy.is_shutdown(): self.detect_and_resolve_conflicts() rate.sleep()defdetect_and_resolve_conflicts(self):"""检测并解决冲突""" robot_names =list(self.robots.keys())for i inrange(len(robot_names)):for j inrange(i +1,len(robot_names)): robot1 = robot_names[i] robot2 = robot_names[j]# 检查路径冲突if self.check_path_conflict(robot1, robot2): self.resolve_conflict(robot1, robot2)defcheck_path_conflict(self, robot1, robot2):"""检查路径冲突"""if(self.robots[robot1]['path']isNoneor self.robots[robot2]['path']isNone):returnFalse path1 = self.robots[robot1]['path'].poses path2 = self.robots[robot2]['path'].poses # 检查路径交叉 min_distance =0.5# 最小安全距离for pose1 in path1[:20]:# 只检查近期路径for pose2 in path2[:20]: dx = pose1.pose.position.x - pose2.pose.position.x dy = pose1.pose.position.y - pose2.pose.position.y distance =(dx**2+ dy**2)**0.5if distance < min_distance:returnTruereturnFalsedefresolve_conflict(self, robot1, robot2):"""解决冲突""" rospy.logwarn(f"Conflict detected between {robot1} and {robot2}")# 基于优先级解决if self.robots[robot1]['priority']> self.robots[robot2]['priority']:# robot1优先,robot2等待 self.stop_robot(robot2) rospy.loginfo(f"{robot2} yielding to {robot1}")elif self.robots[robot2]['priority']> self.robots[robot1]['priority']:# robot2优先,robot1等待 self.stop_robot(robot1) rospy.loginfo(f"{robot1} yielding to {robot2}")else:# 相同优先级,基于距离决定if self.get_goal_distance(robot1)< self.get_goal_distance(robot2): self.stop_robot(robot2)else: self.stop_robot(robot1)defget_goal_distance(self, robot_name):"""获取到目标的距离""" robot = self.robots[robot_name]if robot['pose']isNoneor robot['goal']isNone:returnfloat('inf') dx = robot['goal'].pose.position.x - robot['pose'].position.x dy = robot['goal'].pose.position.y - robot['pose'].position.y return(dx**2+ dy**2)**0.5defcoordinate_formation(self, formation_type='line'):"""编队控制""" robot_names =list(self.robots.keys())if formation_type =='line':# 直线编队 spacing =1.0for i, name inenumerate(robot_names): offset_x = i * spacing offset_y =0# 计算编队位置...elif formation_type =='triangle':# 三角编队passelif formation_type =='square':# 方形编队pass

10. 实战案例:完整导航系统

10.1 完整导航系统实现

#!/usr/bin/env python# -*- coding: utf-8 -*-""" 完整的自主导航系统 """import rospy import actionlib from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped from nav_msgs.msg import OccupancyGrid, Path from sensor_msgs.msg import LaserScan from std_srvs.srv import Empty import tf2_ros import numpy as np classAutonomousNavigationSystem:"""自主导航系统"""def__init__(self): rospy.init_node('autonomous_navigation_system')# 系统组件 self.components ={'localization':False,'map':False,'move_base':False,'sensors':False}# 初始化组件 self.setup_localization() self.setup_navigation() self.setup_sensors() self.setup_recovery()# 状态机 self.state ='idle' self.state_machine ={'idle': self.idle_state,'localizing': self.localizing_state,'planning': self.planning_state,'navigating': self.navigating_state,'recovering': self.recovering_state,'reached': self.reached_state }# 主循环 self.main_loop()defsetup_localization(self):"""设置定位"""# 初始位姿发布 self.initial_pose_pub = rospy.Publisher('/initialpose', PoseWithCovarianceStamped, queue_size=1)# AMCL位姿订阅 self.amcl_pose_sub = rospy.Subscriber('/amcl_pose', PoseWithCovarianceStamped, self.amcl_pose_callback)# TF self.tf_buffer = tf2_ros.Buffer() self.tf_listener = tf2_ros.TransformListener(self.tf_buffer) self.current_pose =None self.localization_confidence =0defsetup_navigation(self):"""设置导航"""# Move base客户端 self.move_base_client = actionlib.SimpleActionClient('move_base', MoveBaseAction) rospy.loginfo("Waiting for move_base...")if self.move_base_client.wait_for_server(rospy.Duration(5)): self.components['move_base']=True rospy.loginfo("Connected to move_base")else: rospy.logwarn("move_base not available")# 地图订阅 self.map_sub = rospy.Subscriber('/map', OccupancyGrid, self.map_callback)# 路径订阅 self.global_path_sub = rospy.Subscriber('/move_base/NavfnROS/plan', Path, self.global_path_callback) self.local_path_sub = rospy.Subscriber('/move_base/DWAPlannerROS/local_plan', Path, self.local_path_callback) self.map_data =None self.global_path =None self.local_path =Nonedefsetup_sensors(self):"""设置传感器"""# 激光扫描 self.scan_sub = rospy.Subscriber('/scan', LaserScan, self.scan_callback) self.latest_scan =None# 传感器状态检查 rospy.Timer(rospy.Duration(1.0), self.check_sensors)defsetup_recovery(self):"""设置恢复行为"""# 清除代价地图服务 self.clear_costmaps_srv = rospy.ServiceProxy('/move_base/clear_costmaps', Empty)# 恢复行为计数 self.recovery_attempts =0 self.max_recovery_attempts =3defamcl_pose_callback(self, msg):"""AMCL位姿回调""" self.current_pose = msg.pose.pose # 计算定位置信度 covariance = np.array(msg.pose.covariance).reshape(6,6) position_variance = covariance[0,0]+ covariance[1,1]if position_variance <0.1: self.localization_confidence =1.0elif position_variance <0.5: self.localization_confidence =0.5else: self.localization_confidence =0.1if self.localization_confidence >0.5: self.components['localization']=Truedefmap_callback(self, msg):"""地图回调""" self.map_data = msg self.components['map']=True rospy.loginfo("Map received")defscan_callback(self, msg):"""激光扫描回调""" self.latest_scan = msg self.components['sensors']=Truedefglobal_path_callback(self, msg):"""全局路径回调""" self.global_path = msg deflocal_path_callback(self, msg):"""局部路径回调""" self.local_path = msg defcheck_sensors(self, event):"""检查传感器状态"""if self.latest_scan isNone: self.components['sensors']=False rospy.logwarn("No laser scan data")# 检查扫描数据质量if self.latest_scan: valid_ranges =[r for r in self.latest_scan.ranges if r > self.latest_scan.range_min and r < self.latest_scan.range_max]iflen(valid_ranges)<len(self.latest_scan.ranges)*0.5: rospy.logwarn("Poor scan quality")defsystem_check(self):"""系统检查""" all_ready =all(self.components.values())ifnot all_ready: rospy.logwarn("System not ready:")for component, status in self.components.items():ifnot status: rospy.logwarn(f" - {component}: NOT READY")return all_ready defnavigate_to_goal(self, goal_pose):"""导航到目标"""ifnot self.system_check(): rospy.logerr("System not ready for navigation")returnFalse# 创建目标 goal = MoveBaseGoal() goal.target_pose.header.frame_id ='map' goal.target_pose.header.stamp = rospy.Time.now() goal.target_pose.pose = goal_pose # 发送目标 self.move_base_client.send_goal(goal) self.state ='navigating'returnTruedefcancel_navigation(self):"""取消导航""" self.move_base_client.cancel_all_goals() self.state ='idle' rospy.loginfo("Navigation cancelled")# 状态机函数defidle_state(self):"""空闲状态"""passdeflocalizing_state(self):"""定位状态"""if self.localization_confidence >0.8: self.state ='idle' rospy.loginfo("Localization complete")defplanning_state(self):"""规划状态"""if self.global_path isnotNone: self.state ='navigating'defnavigating_state(self):"""导航状态""" state = self.move_base_client.get_state()if state == actionlib.GoalStatus.SUCCEEDED: self.state ='reached' self.recovery_attempts =0 rospy.loginfo("Goal reached!")elif state == actionlib.GoalStatus.ABORTED: self.state ='recovering' rospy.logwarn("Navigation aborted, attempting recovery")defrecovering_state(self):"""恢复状态"""if self.recovery_attempts < self.max_recovery_attempts:# 执行恢复行为 self.execute_recovery() self.recovery_attempts +=1 self.state ='navigating'else: self.state ='idle' rospy.logerr("Max recovery attempts reached")defreached_state(self):"""到达状态""" rospy.sleep(1.0) self.state ='idle'defexecute_recovery(self):"""执行恢复行为""" rospy.loginfo(f"Executing recovery behavior {self.recovery_attempts +1}")# 清除代价地图try: self.clear_costmaps_srv() rospy.loginfo("Costmaps cleared")except: rospy.logwarn("Failed to clear costmaps")# 旋转恢复if self.recovery_attempts >0: self.rotate_recovery()defrotate_recovery(self):"""旋转恢复""" cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)# 旋转360度 cmd = Twist() cmd.angular.z =0.5 duration =2* np.pi /0.5# 完整旋转时间 start_time = rospy.Time.now() rate = rospy.Rate(10)while(rospy.Time.now()- start_time).to_sec()< duration: cmd_vel_pub.publish(cmd) rate.sleep()# 停止 cmd_vel_pub.publish(Twist())defmain_loop(self):"""主循环""" rate = rospy.Rate(10)whilenot rospy.is_shutdown():# 执行当前状态if self.state in self.state_machine: self.state_machine[self.state]() rate.sleep()defmain():"""主函数""" nav_system = AutonomousNavigationSystem()# 示例:导航到目标 goal_pose = Pose() goal_pose.position.x =5.0 goal_pose.position.y =5.0 goal_pose.orientation.w =1.0 nav_system.navigate_to_goal(goal_pose) rospy.spin()if __name__ =='__main__': main()

11. 总结与最佳实践

11.1 本文总结

通过本文的学习,你已经掌握了:

  1. ROS导航栈架构:理解导航系统的整体结构
  2. move_base框架:掌握导航核心节点的使用
  3. 代价地图机制:配置和优化代价地图
  4. 全局路径规划:实现A*、Dijkstra等算法
  5. 局部路径规划:掌握DWA等局部规划器
  6. 恢复行为:处理导航失败情况
  7. 参数调优:优化导航性能
  8. 多目标导航:管理多个导航目标
  9. 多机器人协调:实现多机器人导航
  10. 完整系统:构建完整的导航系统

11.2 最佳实践

方面建议原因
参数调优从保守参数开始确保安全性
代价地图适当的膨胀半径平衡安全和通过性
规划频率全局1Hz,局部10Hz平衡计算负载
恢复行为逐级升级策略提高成功率
传感器多传感器融合提高鲁棒性
目标容差合理设置容差避免无法到达

11.3 常见问题解决

  1. 机器人原地打转
    • 检查代价地图配置
    • 调整振荡参数
    • 增加路径跟踪权重
  2. 无法到达目标
    • 检查目标是否在障碍物中
    • 调整目标容差
    • 检查代价地图膨胀
  3. 路径规划失败
    • 确认地图完整性
    • 检查起点和终点
    • 调整规划器参数
  4. 导航速度过慢
    • 增加最大速度限制
    • 优化加速度参数
    • 减少安全裕度

11.4 下一步学习

在下一篇文章中,我们将学习:

  • SLAM建图:同时定位与地图构建
  • gmapping:基于粒子滤波的SLAM
  • cartographer:Google的SLAM解决方案
  • 建图优化:提高地图质量

版权声明:本文为原创文章,转载请注明出处

💡 学习建议:导航是移动机器人最核心的功能之一。建议先在仿真环境中充分测试,熟悉各个参数的影响。实际部署时,要根据具体的机器人和环境特点进行细致的调优。记住,没有通用的最佳参数,需要根据实际情况不断调整。

下一篇预告:《【ROS1从入门到精通】第13篇:SLAM建图(同时定位与地图构建)》

敬请期待!

Read more

AI 大模型应用后端开发,2026 年最新零基础入门路线,少走 3 年弯路

导语:别让「算法陷阱」,毁掉你的AI入行路 2026年,AI大模型的浪潮已经从技术圈彻底席卷至全行业,92%的科技企业已完成大模型架构的业务嵌入,大模型应用后端开发岗位的市场需求较2023年暴增340%,20K起步的校招薪资、50W+的社招年薪早已成为行业常态。 但我接触过上千名想入行的零基础学习者,90%的人都倒在了入门的路上,白白浪费了1-3年的时间,核心原因只有一个:从一开始就走错了路,掉进了「上来就学算法」的新手致命陷阱里。 很多人天然觉得,做AI大模型开发,就得先啃高等数学、线性代数、概率论,就得手撕Transformer源码、搞懂反向传播、学预训练和模型微调。结果学了半年,连一个最简单的AI对话接口都写不出来,更别说做能上线的企业级产品,最后只能自我怀疑「我是不是不是这块料」,直接放弃。 今天这篇文章,我结合2026年最新的行业招聘标准、一线大厂技术落地实践、开源社区技术演进趋势,给你一套零基础可复制、零算法门槛、少走3年弯路的大模型应用后端开发入门路线。 全文会严格区分「必学/选学」内容,拆解最科学的技术栈学习顺序,给你精确到每周的3个月落地学习计划,只要你会用

大语言模型LLM解决AI幻觉方法的深度分析

大语言模型LLM解决AI幻觉方法的深度分析

LLM解决AI幻觉方法的深度分析 引言:AI幻觉的定义与研究背景 AI 幻觉作为大型语言模型(LLM)部署的核心挑战,其学术价值体现于对模型"概率生成天性"的机制探索(如 OpenAI 2025 年论文《Why Language Models Hallucinate》揭示的底层逻辑),产业意义则关乎医疗、金融等关键领域的安全应用[1]。当前研究显示,即使开发团队对 LLM 内部运作的理解仍局限于 10%~20%(Anthropic 团队研究),但该现象已引发信息污染、信任危机等风险,同时在科学发现等领域展现创造力价值,成为 AI 可靠性研究的焦点[2][3][4]。 AI 幻觉的权威分类: * 事实性幻觉:生成内容与客观事实冲突,例如错误声称"蜂蜜可帮助糖尿病患者稳定血糖"[2]

百度搜索AI开放计划:助力开发者通过MCP Server连接用户和应用

百度搜索AI开放计划:助力开发者通过MCP Server连接用户和应用

百度搜索AI开放计划:助力开发者通过MCP Server连接用户和应用 一、背景 2025年4月25日,百度在Create开发者大会上发布了全新的AI开放计划。这一计划的核心目的是实现用户和AI应用、MCP Server的高效链接,提供更流畅的互动体验,推动技术服务闭环。百度通过免费、开放的方式邀请全球开发者共同打造AI应用与服务,提供流量支持和商业化路径,为开发者创造更多机会。对于用户来说,可以快速找到满足需求的AI应用,快速完成任务闭环。 打开百度搜索开放平台:https://sai.baidu.com,我们可以看到平台主要分为三大板块:应用与MCP广场,以及帮助文档中心。其中,MCP广场支持智能搜索与推荐功能,让我们一起来详细了解。 二、百度搜索打造更懂开发者的MCP Server检索工具 近期,MCP生态迎来了大爆发,海量的MCP Server汇聚在一起。要找到一款真正解决问题的MCP Server,需要筛选大量信息才能做出决策。然而,我发现在Create开发者大会上发布的MCP广场拥有深度搜索功能,这对开发者来说是一个巨大的便利。 百度搜索通过全新的AI开放计

【AI实战教程】Nanobot实战教程:基于vLLM部署的智能QQ聊天机器人

【AI实战教程】Nanobot实战教程:基于vLLM部署的智能QQ聊天机器人

🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流 📝个人主页-Sonhhxg_柒的博客_ZEEKLOG博客 📃 🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝 📣系列专栏 - 机器学习【ML】 自然语言处理【NLP】  深度学习【DL】  🖍foreword ✔说明⇢本人讲解主要包括Python、机器学习(ML)、深度学习(DL)、自然语言处理(NLP)等内容。 如果你对这个系列感兴趣的话,可以关注订阅哟👋 一、前言 在大模型时代,拥有一个个人专属的AI助手已经不再是遥不可及的梦想。今天,我将为大家带来一份详尽的实战教程,教你如何使用Nanobot构建一个基于vLLM部署的智能QQ聊天机器人。 Nanobot是由香港大学数据科学研究所开发的一款超轻量级AI智能体,灵感来自OpenClaw。它的设计理念是"让AI触手可及",通过极简的代码实现强大的功能。Nanobot的关键特性包括: