全国大学生智能车竞赛智慧医疗机器人惯导与避障思路分享
前言 在第 20 届全国大学生智能车竞赛(智慧医疗机器人创意赛)中,我们团队获得了国一。作为队长兼技术主力,我想分享一下在备赛过程中的一些思路。为了保持比赛的公平性和竞争性,部分核心代码(如惯导和避障思路)未开源,但主要技术逻辑如下。 记录了备赛过程中的关键经验,包括网络问题优化、上位机辅助处理、半场扫码策略、P 点准确返回方法、STM32 源码修改以及数据处理脚本。 网络问题 第一年参赛时,…

前言 在第 20 届全国大学生智能车竞赛(智慧医疗机器人创意赛)中,我们团队获得了国一。作为队长兼技术主力,我想分享一下在备赛过程中的一些思路。为了保持比赛的公平性和竞争性,部分核心代码(如惯导和避障思路)未开源,但主要技术逻辑如下。 记录了备赛过程中的关键经验,包括网络问题优化、上位机辅助处理、半场扫码策略、P 点准确返回方法、STM32 源码修改以及数据处理脚本。 网络问题 第一年参赛时,…

在第 20 届全国大学生智能车竞赛(智慧医疗机器人创意赛)中,我们团队获得了国一。作为队长兼技术主力,我想分享一下在备赛过程中的一些思路。为了保持比赛的公平性和竞争性,部分核心代码(如惯导和避障思路)未开源,但主要技术逻辑如下。
本文记录了备赛过程中的关键经验,包括网络问题优化、上位机辅助处理、半场扫码策略、P 点准确返回方法、STM32 源码修改以及数据处理脚本。
第一年参赛时,我们遇到了严重的网络延迟问题。第二年备赛,我们非常重视网络稳定性。
初期使用华为 AX3 Pro,在校赛时表现良好。校赛夺冠后,我们升级了设备,最终选用华为 BE7 Pro。在实验室调试时,该路由器无延迟。赛场上我们使用了 165 信道,该信道干扰较少。虽然区域赛前调试发现部分设备(如 ROG 路由器)在 165 信道有延迟,但整体表现稳定。

图 1 华为 BE7 Pro 路由器

图 2 华为 BE7 Pro 路由器
建议上位机使用网线连接路由器,避免使用板载无线网卡。在区域赛调试时,曾尝试使用路由器的中继模式上网,但开启中继后无法修改信道,导致第 3 轮出现严重延迟。最终放弃云端 API,改用本地部署的大模型作为备用方案。
现场网络问题严重影响比赛,建议提前准备本地部署方案,避免依赖云端 API。
在上位机视角中,桶和 P 点底部会有红线。这是通过 bridge_client.py 单独开启的 Python 脚本接收 YOLO 结果,并使用 tkinter 库绘制出来的。画出障碍物位置可以帮助机师快速确定障碍物位置。
import tkinter as tk
import time
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from ai_msgs.msg import PerceptionTargets
class LLM2Origincar:
def __init__(self, host, port):
self.ros = None
self.host = host
self.port = port
self.roadblock_list = []
self.end_list = []
self.init_ros()
self.init_topic()
self.init_thread()
self.keep()
def init_topic(self):
# 初始化 ROS 订阅
self.yolo_sub = Topic(self.ros, '/hobot_dnn_detection', 'ai_msgs/msg/PerceptionTargets', latch=True)
self.yolo_sub.subscribe(self.yolo_sub_callback)
def yolo_sub_callback(self, msg):
self.roadblock_list.clear()
self.end_list.clear()
for target in msg['targets']:
if target['type'] == 'roadblock':
rect = target['rois'][0]['rect']
self.roadblock_list.append({
'x': rect['x_offset'],
'w': rect['width'],
'b': rect['y_offset'] + rect['height'],
})
elif target['type'] == 'end':
rect = target['rois'][0]['rect']
self.end_list.append({
'x': rect['x_offset'],
'y': rect['y_offset'],
'w': rect['width'],
'b': rect['y_offset'] + rect['height'],
'c': target['rois'][0]['confidence'],
})
def keep(self):
root = tk.Tk()
root.overrideredirect(True)
root.geometry("900x680+192+215")
root.attributes("-topmost", True)
root.attributes("-transparentcolor", "white")
canvas = tk.Canvas(root, width=900, height=680, bg="white", highlightthickness=0)
canvas.pack()
try:
while True:
canvas.delete("all")
canvas.create_line(141, 0, 141, 680, fill="red", width=1)
canvas.create_line(689, 0, 689, 680, fill="red", width=1)
if self.roadblock_list:
for obst in self.roadblock_list:
b = int(obst['b'] * 1.42)
canvas.create_line(
int(obst['x'] * 1.41),
b,
int((obst['x'] + obst['w']) * 1.41),
b,
fill="red", width=2
)
if self.end_list:
for end in self.end_list:
x1 = int(end['x'] * 1.41)
y1 = int(end['y'] * 1.41)
x2 = int((end['x'] + end['w']) * 1.41)
y2 = int(end['b'] * 1.42)
canvas.create_line(x1, y2, x2, y2, fill="blue", width=1)
canvas.create_text(
int((x1 + x2) / 2),
(y1 - 20) if (y1 - 20) > 0 else 0,
text="conf:{:.2f}".format(end['c']),
fill='cyan'
)
time.sleep(0.05)
except KeyboardInterrupt:
root.destroy()
除了画出障碍物作为辅助,我们还加了几个按键来辅助任务之间的切换以及调用 API。
def keyboard_thread(self):
while True:
time.sleep(0.05)
if keyboard.is_pressed('b') or keyboard.is_pressed('B'):
self.sign4return_pub.publish(self.sign4return_data)
time.sleep(0.5)
if keyboard.is_pressed('r') or keyboard.is_pressed('R'):
self.sign4return_data['data'] = 5
self.sign4return_pub.publish(self.sign4return_data)
self.sign4return_data['data'] = 0
time.sleep(0.5)
if keyboard.is_pressed('p') or keyboard.is_pressed('P'):
self.sign4return_data['data'] = 6
self.sign4return_pub.publish(self.sign4return_data)
self.sign4return_data['data'] = 0
time.sleep(0.5)
if keyboard.is_pressed('j') or keyboard.is_pressed('J'):
self.llm_data['data'] = 1
self.llm_pub.publish(self.llm_data)
time.sleep(1)
小粉和小橙前面的 USB 相机拍出来的照片较模糊。小橙的优势在于深度相机,不仅能获取深度信息,成像清晰度也远高于 USB 相机。

图 3 深度相机扫码

图 4 USB 相机扫码
扫码节点不要一直开启,特别耗 CPU。扫码条件是:任务状态为任务一,且小车过了半场(全局坐标 x 超过 2m)。
import rclpy
from rclpy.node import Node
import cv2
import numpy as np
from sensor_msgs.msg import Image
from std_msgs.msg import String, Int32
from nav_msgs.msg import Odometry
from origincar_msg.msg import Sign
from cv_bridge import CvBridge
TASK1 = 1
TASK2_WAITFOR_CMD = 2
TASK2 = 3
TASK3 = 4
TASK_STOP = 5
class QrCodeDetection(Node):
def __init__(self):
super().__init__('QRcodeSub')
self.Sign4ReturnSub = self.create_subscription(Int32, 'sign4return', self.sign4return_callback, 10)
self.ImageSub = self.create_subscription(Image, '/aurora/rgb/image_raw', self.image_callback, 10)
self.OdomSub = self.create_subscription(Odometry, '/odom_combined', self.Odom_callback, 10)
self.qrcode_publisher = self.create_publisher(String, "/qrcode_information", 10)
self.info_result = String()
self.sign_publisher = self.create_publisher(Sign, '/sign_switch', 10)
self.sign_msg = Sign()
self.detector = cv2.wechat_qrcode_WeChatQRCode(
"/userdata/WorkSpace/codes/src/qrcode/qrcode/model/detect.prototxt",
"/userdata/WorkSpace/codes/src/qrcode/qrcode/model/detect.caffemodel",
"/userdata/WorkSpace/codes/src/qrcode/qrcode/model/sr.prototxt",
"/userdata/WorkSpace/codes/src/qrcode/qrcode/model/sr.caffemodel"
)
self.bridge = CvBridge()
self.node_run = False
self.task = TASK1
def image_callback(self, msg):
if self.node_run and (self.task == TASK1 or self.task == TASK2):
cv2_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='mono8')[155:, :]
res = self.detector.detectAndDecode(cv2_image)[0]
if res:
self.node_run = False
for r in res:
self.info_result.data = str(r)
self.qrcode_publisher.publish(self.info_result)
self.get_logger().info("\033[94m{}\033[0m".format(self.info_result.data))
if self.info_result.data == "AntiClockWise":
self.sign_msg.sign_data = 4
elif self.info_result.data == "ClockWise":
self.sign_msg.sign_data = 3
else:
try:
data = int(r)
if data % 2:
self.sign_msg.sign_data = 3
else:
self.sign_msg.sign_data = 4
except:
pass
self.sign_publisher.publish(self.sign_msg)
self.info_result.data = "None"
self.sign_msg.sign_data = 0
else:
return
def sign4return_callback(self, msg):
if msg.data == 0 or msg.data == -1:
self.task = TASK1
self.node_run = False
elif msg.data == 5:
self.task = TASK2
elif msg.data == 6:
self.task = TASK3
def Odom_callback(self, msg):
if self.task == TASK1 and msg.pose.pose.position.x > 2:
self.node_run = True
def main(args=None):
rclpy.init(args=args)
qrCodeDetection = QrCodeDetection()
while rclpy.ok():
rclpy.spin(qrCodeDetection)
qrCodeDetection.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
我们没有把整张照片丢给微信的扫码模型,而是裁掉了一部分。例如,先取最近的扫码距离,把红线上面的部分去掉,这样图像就小了很多。

图 5 裁掉一部分图片
我们准确返回 P 点的思路有 3 个,其中一个是另一队学弟分享的。他们的思路是任务二停车对准 P 点,然后退出遥操作,这样 P 点就在车的正前方,加上 YOLO 识别 P 点,也可以回去,但这很看机师的操作。
我的思路是:校准 P 点的坐标。
这个思路要重置里程计,每次都在通道重置,相当于把原点设在了这里,计算出来的相对坐标可以直接用作全局坐标。
如果小车每次从任务二出来都停在同一个位置,然后重置里程计,那么一定会有一个终点可以让小车回去。比如每次都停在这个位置,那么一定有个坐标是可以回去的(以车头方向为 x+,车左边为 y+)。

图 6 固定小橙的位置,终点是 (1.9m, -1.5m)
但是正式比赛时,机师是最紧张的,很难做到每次都停得这么准确,所以得想办法校准,用地图的固定元素。
通道出来有什么是一定可以识别到的?很明显,前面有线。线在地图的位置绝对是固定的,接下来的操作就是在线上了。
有了线,那就先求线的相对位置(相对于小车的,因为重置了里程计)。求线的相对位置可以参考逆透视变换的文章。
我们先把车固定在一个位置,比如图 6 的位置,用小橙的 USB 相机拍一张照片,然后给终点让小橙跑过去,多试几次,每次都要放回原来的位置跑过去,小橙正好回去的点就是 (1.9m, -1.5m)(记得清空里程计之后再跑)。

图 8 固定位置的视角
这个 (1.9m, -1.5m) 是关键。有了一个点,接下来就是用逆透视变换求前面 2 根线的相对位置(必须是最近的 2 根)。
这样,我们就有了 3 个点的坐标,分别是 A,B,P。有了这 3 个点,我们再任意摆放车,比如图 7 的位置,再用逆透视变换计算图 7 视角下最近的 2 根线的相对位置。这样就又有了 2 个点,分别是 A',B',一共有了 5 个点,现在就是用这 5 个点来计算 P'。
问题简化一下:在原来坐标系下,我知道 A,B,P 3 个点。原坐标系经过变换之后,我可以知道 A',B',接下来我想求 P'。
要求 P',那就得先知道前后 2 个坐标系是怎么变换的,也就是前后 2 个坐标系之间的旋转矩阵 R 和平移变量 t。
利用已知的两个对应点对 (A, A') 和 (B, B') 来求解 R 和 t。
对于点 A 和 A':
A′ = RA + t (1)
对于点 B 和 B':
B′ = RB + t (2)
将方程 (1) 和 (2) 相减:
A′ − B′ = R( A − B )
令:
ΔAB = A − B ΔA′B′ = A′ − B′
则:
ΔA′B′ = RΔAB
因此,旋转矩阵 R 可以这样求解:
R = ΔA′B′ ΔAB^(-1)
一旦得到 R,可以通过任一对点求解 t。比如,用 A 和 A':
t = A′ − RA
最后,对于 P,其在新坐标系中的坐标 P′ 为:
P′ = RP + t
这就是校准的所有步骤了,用 2 根线的坐标去校准 P 点的坐标。
实际情况是二维平面,操作可以更简单:

大家把 C 换成 P 来看就可以了。
最后计算出来这个 P 点是非常准确的,用 Python 写的代码(有些参数是固定的,大家可以先离线计算):
def end_point(x1, y1, x2, y2, x3, y3, x1_, y1_, x2_, y2_):
delta_x = x1 - x2
delta_y = y1 - y2
delta_x_ = x1_ - x2_
delta_y_ = y1_ - y2_
den = delta_x ** 2 + delta_y ** 2
a = (delta_x * delta_x_ + delta_y * delta_y_) / den
b = (delta_x * delta_y_ - delta_y * delta_x_) / den
tx = x1_ - a * x1 + b * y1
ty = y1_ - b * x1 - a * y1
x3_ = a * x3 - b * y3 + tx
y3_ = b * x3 + a * y3 + ty
print(f"(x1, y1): ({x1, y1}), (x2, y2): ({x2, y2}), (x3, y3): ({x3, y3}) delta x: {delta_x}, delta y: {delta_y}, den: {den}")
return x3_, y3_
print(f"end': {end_point(ptx1, pty1, ptx2, pty2, 1.9, -1.5, ptx3, pty3, ptx4, pty4)}")
我们比赛时候使用的就是这个思路,上一个思路要在通道处停一下,这个是不需要停下来,直接冲出去就行了。这个思路比上一个简单很多,能回去的概率非常大。
这个思路是,使用 YOLO 识别 P 点,然后还是用逆透视变换计算 P 点相对坐标,再通过小车的坐标计算这个 P 点的全局坐标。
# 单应性矩阵,计算全局坐标
H = np.array([
[-4.66389128e-04, -2.26288030e-04, -4.92300831e-02],
[7.59821540e-04, 5.20569143e-05, -2.33074608e-01],
[-6.59643252e-04, -7.15022786e-03, 1.00000000e+00],
])
def pixel2global(self, pixel_x, pixel_y):
# 计算全局坐标,从像素到局部再到全局
pixel = np.array([pixel_x, pixel_y, 1], dtype=np.float32)
# 应用逆透视变换矩阵
local = np.dot(H, pixel)
# 归一化坐标
local /= local[2]
local[0] += 0.25
# 摄像头坐标转车底盘坐标
car_cos = np.cos(self.current_pos[2])
car_sin = np.sin(self.current_pos[2])
global_x = self.current_pos[0] + car_cos * local[0] - car_sin * local[1]
global_y = self.current_pos[1] + car_sin * local[0] + car_cos * local[1]
return global_x, global_y
将 P 点中心的像素坐标丢到这个函数里面,输出的就是校正之后的终点了(车头朝向为 x+,车左边为 y+)。因为全程不重置里程计,所以未校正的终点设为出发的原点 (0.5m, 0.2m)。
这种思路对 YOLO 的要求比较高,所以必须采集非常多的数据。我们在实验室采了 7k,数据增强之后就有了差不多 2w2,训练出来的效果很好。最后在比赛现场采了 1k4,增强到 7k,一共 2w9 张,加急训练了 22 小时。其中数据集有超过一半的是 P 点的。

采集数据的时候,这种只有一个小角或被桶挡住了或比较远看起来很扁的也贴上(凡是小车可以看到 P 点,哪怕是一点点也要标上,而且尽量采集多一些),这样训练出来也是可以识别到的,回去更轻松。不过要注意一下任务三出来的位置,可能会误识成 P 点,所以在任务三出来(包括白色网格那里)也要采集一点,不需要太多。

STM32 源码我基本上都看过了,也找到了很多可以修改的地方。
最重要的是舵机转角。我找到了舵机转角的限制,但是转角对不对称跟有没有限制是没关系的,如果去掉限制,反而有可能会把前轮给跑坏。

所以,问题应该是出在了计算上面,计算舵机转角的地方也就只有这个多项式了。

它的输入是右前轮的转角,输出是舵机角度。

右前轮的转向角度的限幅是(-0.49,0.32)(0.32 是左转最大的角度,-0.49 是右转最大角度),说明右前轮的角度在 -0.49 和 0.32 的时候,小车的舵机量应该是相同的。但是,直接把这 2 个值带入原来的多项式中,得到的舵机量是这样的:

这就说明小车左转转角小,右转转角大。用上位机跑的时候,确实是左转小,右转大。
调整多项式的二次项系数之后,大概让左右转的舵机量相同。

经过这样的调整之后,小车左转和右转是差不多的角度了。在上位机跑起来,左右转是对称的。
为了提高惯导的计算频率,我把串口发送的频率提高到了 50Hz,串口波特率提高到了 921600,而且也把其他没用到的外设(比如 CAN,蓝牙)给关了,只留下串口 1 和串口 3。

这里为什么用串口 1 呢?因为我们小橙底板的串口 3 有问题,会出现断连,干脆直接换成串口 1 了,也就是烧录口。(不知道为什么,把串口 3 关掉之后,oled 会卡住,只能打开它了)
在 X5 上面,找到 /root/dev_ws/origincar/origincar_base/launch/base_serial.launch.py,波特率改成 921600,再编译一下就可以正常通信了(不用管 clear_flag)。

因为提高串口发送的频率到 50Hz,所以,小车上 EKF 的计算频率也要设为 50Hz。

我们实际用下来,发现小车的 odom_combined 挺准的,一圈下来 x 和 y 偏得都不算太大。我们也改了一下 ekf.yaml,大家可以参考一下我们改的地方。

原来的 /imu/data_raw 是原始数据,/imu/data 是经过滤波之后的。
除了删掉一些外设,我们还把电位器决定车型号的部分也删掉了,固定车型号为 Ackerman;oled 刷屏显示也把跟 Ackerman 无关的给删掉了。
忘记把处理数据的代码放上来了,就在这里补充一下吧。
我们贴标签不是傻乎乎地全部自己来贴。我们先用以前训练过的模型贴一遍,然后再人工检查一遍。这样子操作,一个人不用一天时间就可以贴差不多 6k。
除此之外,我们还写了删除无效图片和无效标签的脚本(图片没有对应同名的标签或者标签没有对应同名的图片)、数据增强的脚本(没有旋转)和将数据分批次让队友来帮忙的脚本。
最好在检查完模型贴的标签之后,再进行数据增强。
代码我都放在后面。
希望这份分享可以帮助大家接下来的比赛。
import argparse
import os
import shutil
import time
from pathlib import Path
import torch
import torch.backends.cudnn as cudnn
import cv2
from models.experimental import attempt_load
from utils.datasets import LoadImages
from utils.utils import non_max_suppression, scale_coords, xyxy2xywh
from utils.torch_utils import select_device, time_synchronized
'''
用训练过的模型贴标签
'''
def auto_annotate(source, weights, output, img_size=640, conf_thres=0.25, iou_thres=0.45, view_img=False):
""" 使用 YOLOv5 模型自动标注图像
参数:
source (str): 输入图像文件夹路径
weights (str): 模型权重路径
output (str): 输出文件夹路径
img_size (int): 推理尺寸
conf_thres (float): 置信度阈值
iou_thres (float): IOU 阈值
device (str): 使用的设备 (cpu, 0, 1, ...)
view_img (bool): 是否显示结果图像
"""
# 初始化
device = select_device(device)
half = device.type != 'cpu'
# 半精度仅在 CUDA 上支持
# 创建输出文件夹
# if os.path.exists(output):
# shutil.rmtree(output)
# os.makedirs(output)
# 创建新的输出文件夹
# os.makedirs(os.path.join(output, 'labels'))
# 创建标签文件夹
# 加载模型
model = attempt_load(weights, map_location=device)
# 加载 FP32 模型
imgsz = img_size
if half:
model.half()
# 转换为 FP16
# 获取类别名称
names = model.module.names if hasattr(model, 'module') else model.names
# 设置数据加载器
dataset = LoadImages(source, img_size=imgsz)
# 运行推理
t0 = time.time()
img = torch.zeros((1, 3, imgsz, imgsz), device=device)
# 初始化图像
_ = model(img.half() if half else img) if device.type != 'cpu' else None
# 运行一次
for path, img, im0s, _ in dataset:
img = torch.from_numpy(img).to(device)
img = img.half() if half else img.float()
# uint8 to fp16/32
img /= 255.0
# 0 - 255 to 0.0 - 1.0
if img.ndimension() == 3:
img = img.unsqueeze(0)
# 推理
t1 = time_synchronized()
pred = model(img, augment=False)[0]
# 应用 NMS
pred = non_max_suppression(pred, conf_thres, iou_thres, classes=None, agnostic=False)
t2 = time_synchronized()
# 处理检测结果
p, im0 = path, im0s.copy()
txt_path = str(Path(output) / Path(p).stem) + ('.txt')
# 标签保存路径
# 确保标签文件存在(即使为空)
open(txt_path, 'w').close()
# 创建空文件或清空现有文件
# 归一化增益
whwh gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]
# 处理检测结果(如果有)
if pred is not None:
for i, det in enumerate(pred):
# 每张图像的检测结果
if det is not None and len(det):
# 将边界框从 img_size 调整到 im0 大小
det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
# 写入结果
# 修改写入标签的部分:
with open(txt_path, 'w') as f:
if det is not None and len(det):
for *xyxy, conf, cls in reversed(det):
xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()
# 格式化输出:6 位小数,无行末空格
line = "%d %.6f %.6f %.6f %.6f" % (cls, *xywh)
f.write(line + "\n")
# 注意:换行符前无空格
else:
f.write("")
# 空文件(或按需写入占位符)
# 打印时间 (推理 + NMS)
print(f'{Path(p).name} done. ({t2 - t1:.3f}s)')
# 显示结果 (可选)
if view_img:
cv2.imshow(Path(p).name, im0)
if cv2.waitKey(1) == ord('q'):
# 按 q 退出
raise StopIteration
print(f'Done. ({time.time() - t0:.3f}s)')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--source', type=str, default='dataset_process/new1/images', help='输入图像文件夹路径')
parser.add_argument('--weights', type=str, default='runs/2025.7.28/weights/last.pt', help='模型权重路径')
parser.add_argument('--output', type=str, default='dataset_process/new1/labels', help='输出标签路径')
parser.add_argument('--img-size', type=int, default=640, help='推理尺寸 (像素)')
parser.add_argument('--conf-thres', type=float, default=0.25, help='目标置信度阈值')
parser.add_argument('--iou-thres', type=float, default=0.45, help='NMS 的 IOU 阈值')
parser.add_argument('--device', help='cuda 设备,如 0 或 0,1,2,3 或 cpu')
parser.add_argument('--view-img', action='store_true', help='显示结果')
opt = parser.parse_args()
print(opt)
with torch.no_grad():
auto_annotate(
source=opt.source,
weights=opt.weights,
output=opt.output,
img_size=opt.img_size,
conf_thres=opt.conf_thres,
iou_thres=opt.iou_thres,
device=opt.device,
view_img=opt.view_img
)
import os
from pathlib import Path
def remove_invalid_images_labels(image_dir, label_dir):
""" 删除无效的图片和标签文件(标签为空或不存在)
参数:
image_dir (str): 图片文件夹路径
label_dir (str): 标签文件夹路径
"""
deleted_images = 0
deleted_labels = 0
# 遍历图片文件夹
for image_file in os.listdir(image_dir):
if image_file.lower().endswith(('.jpg', '.png', '.jpeg')):
image_path = os.path.join(image_dir, image_file)
label_path = os.path.join(label_dir, Path(image_file).stem + '.txt')
# 检查标签文件是否存在或为空
if not os.path.exists(label_path):
os.remove(image_path)
deleted_images += 1
print(f"删除图片(无标签): {image_file}")
else:
with open(label_path, 'r') as f:
content = f.read().strip()
if not content: # 标签文件为空
os.remove(image_path)
os.remove(label_path)
deleted_images += 1
deleted_labels += 1
print(f"删除无效数据:{image_file} 和对应标签")
print(f"\n操作完成!共删除:{deleted_images} 张图片,{deleted_labels} 个标签")
if __name__ == '__main__':
# 设置路径(修改为你的实际路径)
image_dir = os.path.join(os.path.dirname(__file__), "new1/images/")
label_dir = os.path.join(os.path.dirname(__file__), "new1/labels/")
# 确认操作
print(f"即将检查:\n图片目录:{image_dir}\n标签目录:{label_dir}")
confirm = input("是否继续?(y/n): ").lower()
if confirm == 'y':
remove_invalid_images_labels(image_dir, label_dir)
else:
print("操作已取消")
import torch
import torchvision.transforms as T
import torchvision.transforms.functional as TF
from pathlib import Path
import shutil
from PIL import Image
import random
from multiprocessing import Pool
import os
'''
数据增强
'''
class YOLOAugment:
def __init__(self, output_dir):
self.output_dir = output_dir
Path(f"{output_dir}/images").mkdir(parents=True, exist_ok=True)
Path(f"{output_dir}/labels").mkdir(parents=True, exist_ok=True)
# 定义基础增强(仅影响图像)
self.img_augment = T.Compose([
T.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.2),
T.GaussianBlur(kernel_size=(3, 7))
])
def apply_augment(self, img_path, label_path, aug_id):
"""处理单张图像和对应标签"""
# 读取原始数据
img = Image.open(img_path).convert('RGB')
with open(label_path) as f:
bboxes = [list(map(float, line.strip().split())) for line in f]
# 转换为 Tensor 格式
img_tensor = TF.to_tensor(img)
bboxes_tensor = torch.tensor(bboxes)
# 应用图像增强(不影响框)
img_tensor = self.img_augment(img_tensor)
# 保存增强结果
stem = Path(img_path).stem
self._save_results(img_tensor, bboxes_tensor, stem, aug_id)
return img, bboxes
def _save_results(self, img_tensor, bboxes, stem, aug_id):
"""保存增强图像和标签"""
# 保存图像
aug_img = TF.to_pil_image(img_tensor)
aug_img.save(f"{self.output_dir}/images/{stem}_aug{aug_id}.jpg")
# 保存标签(YOLO 格式)
with open(f"{self.output_dir}/labels/{stem}_aug{aug_id}.txt", 'w') as f:
for bbox in bboxes.numpy():
line = " ".join(map(str, bbox))
f.write(line + '\n')
def process_file(args):
"""多进程处理函数"""
img_path, label_path, output_dir, aug_per_image = args
augmenter = YOLOAugment(output_dir)
for i in range(1, aug_per_image + 1):
augmenter.apply_augment(img_path, label_path, i)
# 保留原始文件
shutil.copy(img_path, f"{output_dir}/images/{Path(img_path).name}")
shutil.copy(label_path, f"{output_dir}/labels/{Path(label_path).name}")
if __name__ == "__main__":
root_path = os.path.dirname(__file__)
# 配置参数
input_dir = os.path.join(root_path, "new1") # 原始数据集路径
output_dir = os.path.join(root_path, "new1_aug") # 输出路径
aug_per_image = 3 # 每张图片生成 4 个增强版本
num_workers = 4 # 并行进程数
# 准备列表
tasks = []
for img_file in Path(f"{input_dir}/images").glob("*.*"):
if img_file.suffix.lower() in ('.jpg', '.png', '.jpeg'):
label_file = Path(f"{input_dir}/labels/{img_file.stem}.txt")
if label_file.exists():
tasks.append((str(img_file), str(label_file), output_dir, aug_per_image))
# 多进程处理
print(f"开始增强 {len(tasks)} 张图像...")
with Pool(processes=num_workers) as pool:
pool.map(process_file, tasks)
# 统计结果
orig_count = len(tasks)
aug_count = orig_count * aug_per_image
print(f"处理完成!\n"
f"- 原始图像保留:{orig_count} 张\n"
f"- 增强图像生成:{aug_count} 张\n"
f"- 总数据量:{orig_count + aug_count} 张")
import os
import zipfile
import math
from pathlib import Path
'''
将数据集分好份打包好
'''
def create_task_packs(images_dir, labels_dir, output_dir, tasks=3, label_txt=False):
"""
创建包含匹配 images 和 labels 的 task 压缩包
:param images_dir: 图片文件夹路径
:param labels_dir: 标注文件夹路径
:param output_dir: 输出目录
:param tasks: 需要划分的任务数
"""
# 获取匹配的文件对(确保严格对应)
image_files = sorted([f for f in os.listdir(images_dir) if f.endswith(('.jpg', '.png'))])
label_files = sorted([f for f in os.listdir(labels_dir) if f.endswith('.txt')])
# 验证一致性
image_stems = {Path(f).stem for f in image_files}
label_stems = {Path(f).stem for f in label_files}
unmatched = image_stems.symmetric_difference(label_stems)
if unmatched:
print(f"⚠️ 警告:发现 {len(unmatched)} 个不匹配文件(示例:{list(unmatched)[:3]})")
print("建议先运行数据校验脚本修复不一致问题!")
return
# 计算每个 task 应包含的文件数
total_pairs = len(image_files)
pairs_per_task = math.ceil(total_pairs / tasks)
print(f"数据集统计:")
print(f"- 图片数量:{len(image_files)}")
print(f"- 标注数量:{len(label_files)}")
print(f"- 将分成 {tasks} 个任务包,每个约 {pairs_per_task} 对数据\n")
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
for task_num in range(1, tasks + 1):
start_idx = (task_num - 1) * pairs_per_task
end_idx = min(start_idx + pairs_per_task, total_pairs)
task_images = image_files[start_idx:end_idx]
task_labels = [Path(f).stem + '.txt' for f in task_images]
# 自动匹配对应的 labels
zip_path = os.path.join(output_dir, f"task_{task_num}.zip")
print(f"创建任务包 {task_num}:")
print(f"- 包含图片:{len(task_images)} 张")
print(f"- 包含标注:{len(task_labels)} 个")
print(f"- 保存到:{zip_path}")
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# 添加图片
for img in task_images:
img_path = os.path.join(images_dir, img)
zipf.write(img_path, f"images/{img}")
# 添加对应的标注
for label in task_labels:
label_path = os.path.join(labels_dir, label)
if os.path.exists(label_path):
# 双重验证
zipf.write(label_path, f"labels/{label}")
else:
print(f"⚠️ 缺失标注文件:{label}")
if label_txt is not False:
label_info = Path(label_txt).open("r").read()
zipf.writestr(f"labels/labels.txt", label_info)
# 每个任务里面都放进一个 labels.txt
print("-" * 50)
print(f"\n🎉 任务包创建完成!共生成 {tasks} 个压缩包,保存在:{output_dir}")
if __name__ == "__main__":
root_path = os.path.dirname(__file__)
# 配置参数
dataset_dir = os.path.join(root_path, "new1") # 数据集根目录
output_dir = os.path.join(root_path, "package") # 输出目录
label_txt = os.path.join(root_path, "labels.txt") # 标签文件
num_tasks = 4 # 需要划分的任务数量
# 运行打包
create_task_packs(
images_dir=os.path.join(dataset_dir, "images"),
labels_dir=os.path.join(dataset_dir, "labels"),
output_dir=output_dir,
tasks=num_tasks,
# label_txt=label_txt,
)

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online