diff --git a/Center_Predicted.py b/Center_Predicted.py new file mode 100644 index 0000000..cdcc1d3 --- /dev/null +++ b/Center_Predicted.py @@ -0,0 +1,66 @@ +import numpy as np +from config import ARMOR_WIDTH, ARMOR_HEIGHT # 装甲板尺寸(毫米) + + +class BallisticPredictor: + """弹道预测器:结合卡尔曼预测位置、距离和弹速计算击中预测点""" + + def __init__(self, bullet_speed=16.0): # 出弹速度默认16m/s,可调节 + self.bullet_speed = bullet_speed # 子弹速度(米/秒) + self.armor_half_width = ARMOR_WIDTH / 2000.0 # 装甲板半宽(转换为米) + self.armor_half_height = ARMOR_HEIGHT / 2000.0 # 装甲板半高(转换为米) + self.last_ballistic_point = None # 记录上一次弹道预测点 + + def _calculate_distance(self, armor_center, img_center, focal_length=800): + """ + 估算目标距离(简化模型,需根据实际相机内参调整) + 原理:基于装甲板实际尺寸和图像中尺寸的比例关系 + """ + # 假设图像中装甲板宽度约为灯条间距(简化计算,实际可根据检测的灯条尺寸优化) + # 此处使用中心偏移量和焦距估算,更精确的方式需结合实际检测的装甲板像素尺寸 + dx = abs(armor_center[0] - img_center[0]) + dy = abs(armor_center[1] - img_center[1]) + pixel_distance = np.sqrt(dx ** 2 + dy ** 2) + + # 若目标在中心附近,避免除以零 + if pixel_distance < 10: + return 5.0 # 默认5米(可根据实际场景调整) + + # 距离估算公式:(实际尺寸 × 焦距) / 像素尺寸 + distance = (self.armor_half_width * focal_length) / (pixel_distance * 0.5) + return max(0.5, distance) # 限制最小距离为0.5米 + + def predict_ballistic_point(self, predicted_center, img_center, target_speed): + """ + 计算弹道预测点 + :param predicted_center: 卡尔曼预测的目标中心(图像坐标,(x,y)) + :param img_center: 图像中心坐标((x,y)) + :param target_speed: 目标移动速度(由卡尔曼滤波器获取,(vx, vy)像素/秒) + :return: 弹道修正后的预测点(图像坐标,(x,y)) + """ + if predicted_center is None: + self.last_ballistic_point = None + return None + + # 1. 估算目标距离(米) + distance = self._calculate_distance(predicted_center, img_center) + + # 2. 计算子弹飞行时间(秒):距离 / 子弹速度 + flight_time = distance / self.bullet_speed if self.bullet_speed > 0 else 0.0 + + # 3. 计算飞行时间内目标的移动量(像素) + # 目标速度(pixel/s)× 飞行时间(s)= 预测移动像素 + delta_x = target_speed[0] * flight_time + delta_y = target_speed[1] * flight_time + + # 4. 弹道预测点 = 卡尔曼预测中心 + 目标移动补偿 + ballistic_x = predicted_center[0] + delta_x + ballistic_y = predicted_center[1] + delta_y + ballistic_point = (ballistic_x, ballistic_y) + + self.last_ballistic_point = ballistic_point + return ballistic_point + + def get_last_ballistic_point(self): + """获取上一次弹道预测点""" + return self.last_ballistic_point \ No newline at end of file diff --git a/TTLCommunicator.py b/TTLCommunicator.py new file mode 100644 index 0000000..0052cab --- /dev/null +++ b/TTLCommunicator.py @@ -0,0 +1,70 @@ +import serial +import serial.tools.list_ports + + +class TTLCommunicator: + def __init__(self, baudrate=115200, timeout=0.1): + """初始化TTL通信对象""" + self.baudrate = baudrate + self.timeout = timeout + self.ser = None + self.connected = False + + @staticmethod # 声明为静态方法 + def find_usb_ttl_port(): + """自动查找USB转TTL设备端口(静态方法,不依赖实例状态)""" + ports = list(serial.tools.list_ports.comports()) + for port in ports: + if "USB Serial" in port.description or "CH340" in port.description or "PL2303" in port.description: + return port.device + return None + + def connect(self, port=None): + """连接到USB转TTL设备""" + try: + if not port: + # 调用静态方法时,可直接通过类名或实例调用(此处保持原有逻辑) + port = self.find_usb_ttl_port() + if not port: + print("未找到USB转TTL设备") + return False + + self.ser = serial.Serial( + port=port, + baudrate=self.baudrate, + timeout=self.timeout, + parity=serial.PARITY_NONE, + stopbits=1, + bytesize=serial.EIGHTBITS + ) + + if self.ser.is_open: + self.connected = True + print(f"已连接TTL设备:{port}(波特率:{self.baudrate})") + return True + return False + except Exception as e: + print(f"连接失败:{str(e)}") + return False + + def send_data(self, data_str): + """发送数据到TTL设备""" + if not self.connected or not self.ser: + print("未连接TTL设备,请先调用connect()") + return False + + try: + data = (data_str + "\n").encode("utf-8") # 加换行符作为结束标志 + self.ser.write(data) + self.ser.flush() + return True + except Exception as e: + print(f"发送失败:{str(e)}") + return False + + def close(self): + """关闭串口连接""" + if self.ser and self.ser.is_open: + self.ser.close() + self.connected = False + print("TTL连接已关闭") \ No newline at end of file diff --git a/armor_detector.py b/armor_detector.py new file mode 100644 index 0000000..84d8885 --- /dev/null +++ b/armor_detector.py @@ -0,0 +1,223 @@ +import cv2 +import numpy as np +from config import ( + HORIZONTAL_ANGLE_THRESHOLD, NEARBY_LIGHT_BAR_THRESHOLD, + LIGHT_BAR_IOU_THRESHOLD, ARMOR_DISTANCE_RATIO_RANGE, + ARMOR_LENGTH_DIFF_RATIO, ARMOR_ANGLE_DIFF_THRESHOLD +) + + +class ArmorDetector: + """装甲板检测类:灯条提取、配对""" + + def __init__(self): + self.angle_threshold_rad = np.radians(HORIZONTAL_ANGLE_THRESHOLD) + + def _extract_initial_light_bars(self, mask): + """从掩码中提取初始灯条(轮廓+最小外接矩形)""" + contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + initial_light_bars = [] + + for contour in contours: + rect = cv2.minAreaRect(contour) + box = cv2.boxPoints(rect) # 获取矩形四个顶点 + if len(box) == 0: # 确保顶点数有效 + continue + box = np.int32(box) # 转为int32(OpenCV要求的类型) + (x, y), (w, h), angle = rect + contour_area = cv2.contourArea(contour) + rect_area = w * h + + # 筛选:面积足够 + 填充度高 + if rect_area > 30 and (contour_area / rect_area if rect_area > 0 else 0) > 0.4: + initial_light_bars.append({"rect": rect, "box": box, "area": rect_area}) + + return initial_light_bars + + def _merge_nearby_light_bars(self, initial_light_bars): + """合并临近灯条(避免同一灯条被拆分)""" + processed_boxes = [] + visited = [False] * len(initial_light_bars) + + for i in range(len(initial_light_bars)): + if visited[i]: + continue + + nearby_indices = [i] + (x1, y1), _, _ = initial_light_bars[i]["rect"] + + for j in range(i + 1, len(initial_light_bars)): + if visited[j]: + continue + (x2, y2), _, _ = initial_light_bars[j]["rect"] + distance = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) + + # 距离+IOU筛选 + if distance < NEARBY_LIGHT_BAR_THRESHOLD: + box_i = initial_light_bars[i]["box"] + box_j = initial_light_bars[j]["box"] + if len(box_i) == 0 or len(box_j) == 0: + continue + b1 = cv2.boundingRect(box_i) + b2 = cv2.boundingRect(box_j) + inter_area = self._calculate_iou(b1, b2) + if inter_area > LIGHT_BAR_IOU_THRESHOLD: + nearby_indices.append(j) + visited[j] = True + + # 合并临近灯条的顶点,生成新矩形 + all_valid_points = [] + for k in nearby_indices: + box_k = initial_light_bars[k]["box"] + if len(box_k) > 0: + all_valid_points.append(box_k) + if not all_valid_points: + continue + all_points = np.vstack(all_valid_points) + merged_rect = cv2.minAreaRect(all_points) + merged_box = np.int32(cv2.boxPoints(merged_rect)) # 转为int32 + processed_boxes.append(merged_box) + visited[i] = True + + return processed_boxes + + def _calculate_iou(self, b1, b2): + """计算两个bounding box的IOU""" + b1 = [b1[0], b1[1], b1[0] + b1[2], b1[1] + b1[3]] + b2 = [b2[0], b2[1], b2[0] + b2[2], b2[1] + b2[3]] + + inter_x1 = max(b1[0], b2[0]) + inter_y1 = max(b1[1], b2[1]) + inter_x2 = min(b1[2], b2[2]) + inter_y2 = min(b1[3], b2[3]) + if inter_x1 >= inter_x2 or inter_y1 >= inter_y2: + return 0.0 + + inter_area = (inter_x2 - inter_x1) * (inter_y2 - inter_y1) + b1_area = (b1[2] - b1[0]) * (b1[3] - b1[1]) + b2_area = (b2[2] - b2[0]) * (b2[3] - b2[1]) + union_area = b1_area + b2_area - inter_area + return inter_area / union_area if union_area > 0 else 0.0 + + def _filter_valid_light_bars(self, processed_boxes): + """筛选有效灯条(排除水平灯条)""" + valid_light_bars = [] + + for box in processed_boxes: + if len(box) == 0: + continue + rect = cv2.minAreaRect(box) + (x, y), (w, h), angle = rect + + length = w if w > h else h + width = h if w > h else w + main_angle = angle if w > h else angle + 90 + main_angle = main_angle % 180 + if main_angle > 90: + main_angle -= 180 + main_angle_rad = np.radians(main_angle) + + center = (x, y) + end1 = ( + center[0] + (length / 2) * np.cos(main_angle_rad), + center[1] + (length / 2) * np.sin(main_angle_rad) + ) + end2 = ( + center[0] - (length / 2) * np.cos(main_angle_rad), + center[1] - (length / 2) * np.sin(main_angle_rad) + ) + + rect_area = length * width + contour_area = cv2.contourArea(box) + area_ratio = contour_area / rect_area if rect_area > 0 else 0 + # 筛选:面积足够 + 填充度高 + 非水平 + if (rect_area > 40 and area_ratio > 0.4 and + abs(main_angle_rad) > self.angle_threshold_rad): + valid_light_bars.append({ + "center": center, + "size": (length, width), + "angle": main_angle, + "angle_rad": main_angle_rad, + "area": rect_area, + "center_line": [end1, end2], + "center_line_length": length, + "box": box + }) + + return valid_light_bars + + def _pair_light_bars_to_armor(self, light_bars, target_color): + """灯条配对为装甲板(按置信度排序)""" + armor_plates = [] + used_indices = set() + + for i in range(len(light_bars)): + if i in used_indices: + continue + lb1 = light_bars[i] + + for j in range(i + 1, len(light_bars)): + if j in used_indices: + continue + lb2 = light_bars[j] + + # 角度差异筛选 + angle_diff = abs(lb1["angle_rad"] - lb2["angle_rad"]) + angle_diff = min(angle_diff, 2 * np.pi - angle_diff) + if angle_diff > ARMOR_ANGLE_DIFF_THRESHOLD: + continue + + # 距离比例筛选 + dx = lb2["center"][0] - lb1["center"][0] + dy = lb2["center"][1] - lb1["center"][1] + distance = np.sqrt(dx ** 2 + dy ** 2) + avg_length = (lb1["center_line_length"] + lb2["center_line_length"]) / 2 + distance_ratio = distance / avg_length + if not (ARMOR_DISTANCE_RATIO_RANGE[0] < distance_ratio < ARMOR_DISTANCE_RATIO_RANGE[1]): + continue + + # 长度差异筛选 + length_diff_ratio = abs(lb1["center_line_length"] - lb2["center_line_length"]) / avg_length + if length_diff_ratio > ARMOR_LENGTH_DIFF_RATIO: + continue + + armor_center = ( + (lb1["center"][0] + lb2["center"][0]) / 2, + (lb1["center"][1] + lb2["center"][1]) / 2 + ) + confidence = (lb1["area"] + lb2["area"]) / (distance + 1) + + armor_plates.append({ + "color": target_color, + "center": armor_center, + "confidence": confidence, + "pair": (lb1, lb2), + "corners_2d": None + }) + + used_indices.add(i) + used_indices.add(j) + break + + return sorted(armor_plates, key=lambda x: x["confidence"], reverse=True) + + def detect(self, mask, target_color): + """完整检测流程:灯条提取→合并→筛选→配对→3D估计""" + initial_light_bars = self._extract_initial_light_bars(mask) + if not initial_light_bars: + return [], [] + + processed_boxes = self._merge_nearby_light_bars(initial_light_bars) + if not processed_boxes: + return [], [] + + valid_light_bars = self._filter_valid_light_bars(processed_boxes) + if len(valid_light_bars) < 2: + return valid_light_bars, [] + + armor_plates = self._pair_light_bars_to_armor(valid_light_bars, target_color) + if not armor_plates: + return valid_light_bars, [] + + + return valid_light_bars, armor_plates \ No newline at end of file diff --git a/camera_utils.py b/camera_utils.py new file mode 100644 index 0000000..bed1f35 --- /dev/null +++ b/camera_utils.py @@ -0,0 +1,54 @@ +import cv2 + + +class Camera: + """摄像头初始化与帧读取(支持颜色参数)""" + + def __init__(self, cam_id=0, target_color="red"): + + self.cap = cv2.VideoCapture(cam_id) + self.is_opened = self.cap.isOpened() + if not self.is_opened: + raise RuntimeError("无法打开摄像头!") + + self.target_color = target_color.lower() + if self.target_color not in ["red", "blue"]: + raise ValueError("仅支持 'red' 或 'blue'") + + self._set_cam_params() + + def _set_cam_params(self): + """根据颜色设置摄像头参数""" + self.cap.set(cv2.CAP_PROP_AUTOFOCUS, 0) # 关闭自动对焦 + self.cap.set(cv2.CAP_PROP_AUTO_WB, 0) # 关闭自动白平衡 + self.cap.set(cv2.CAP_PROP_ISO_SPEED, 0) # 自动ISO + + if self.target_color == "red": + self.cap.set(cv2.CAP_PROP_CONTRAST, 10) + self.cap.set(cv2.CAP_PROP_EXPOSURE, -10) + self.cap.set(cv2.CAP_PROP_WB_TEMPERATURE, 5) + elif self.target_color == "blue": + self.cap.set(cv2.CAP_PROP_CONTRAST, 10) + self.cap.set(cv2.CAP_PROP_EXPOSURE, -9) + self.cap.set(cv2.CAP_PROP_WB_TEMPERATURE, 5) + + def read_frame(self): + """读取一帧""" + if not self.is_opened: + return False, None + return self.cap.read() + + def release(self): + """释放资源""" + if self.is_opened: + self.cap.release() + self.is_opened = False + + def switch_color(self, target_color): + """切换目标颜色并更新参数""" + target_color = target_color.lower() + if target_color in ["red", "blue"] and target_color != self.target_color: + self.target_color = target_color + self._set_cam_params() + return True + return False \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..3db5d9c --- /dev/null +++ b/config.py @@ -0,0 +1,36 @@ +import numpy as np + +# 相机内参(需实际标定) +CAMERA_MATRIX = np.array([ + [600, 0, 320], + [0, 600, 240], + [0, 0, 1] +], dtype=np.float32) +DIST_COEFFS = np.zeros(5) # 畸变系数(无畸变设为0) + +# 装甲板尺寸(毫米) +ARMOR_WIDTH = 135 +ARMOR_HEIGHT = 125 +ARMOR_THICKNESS = 20 +ARMOR_3D_POINTS = np.array([ + [-ARMOR_WIDTH / 2, -ARMOR_HEIGHT / 2, 0], + [ARMOR_WIDTH / 2, -ARMOR_HEIGHT / 2, 0], + [ARMOR_WIDTH / 2, ARMOR_HEIGHT / 2, 0], + [-ARMOR_WIDTH / 2, ARMOR_HEIGHT / 2, 0] +], dtype=np.float32) +ARMOR_BOX_3D_POINTS = np.vstack([ + ARMOR_3D_POINTS, + ARMOR_3D_POINTS + np.array([0, 0, -ARMOR_THICKNESS]) +]) + +# 检测阈值 +HORIZONTAL_ANGLE_THRESHOLD = 25 # 灯条水平角度阈值(度) +NEARBY_LIGHT_BAR_THRESHOLD = 500 # 灯条合并距离阈值(像素) +LIGHT_BAR_IOU_THRESHOLD = 0.05 # 灯条合并IOU阈值 +ARMOR_DISTANCE_RATIO_RANGE = (0.6, 3) # 装甲板灯条距离比例范围 +ARMOR_LENGTH_DIFF_RATIO = 0.5 # 装甲板灯条长度差异比例 +ARMOR_ANGLE_DIFF_THRESHOLD = np.radians(20) # 装甲板灯条角度差异阈值(弧度) + +# 卡尔曼滤波参数 +KF_PROCESS_NOISE = 0.02 # 过程噪声协方差 +KF_MEASUREMENT_NOISE = 0.5 # 测量噪声协方差 \ No newline at end of file diff --git a/image_preprocessor.py b/image_preprocessor.py new file mode 100644 index 0000000..dde8e9c --- /dev/null +++ b/image_preprocessor.py @@ -0,0 +1,57 @@ +import cv2 +import numpy as np + +class ImagePreprocessor: + """图像预处理:分离目标颜色并生成掩码""" + @staticmethod + def process_red_light(frame): + """红色灯条预处理""" + red_only = frame.copy() + red_only[:, :, 0] = 0 # B=0 + red_only[:, :, 1] = 0 # G=0 + + hsv = cv2.cvtColor(red_only, cv2.COLOR_BGR2HSV) + v_mask = cv2.inRange(hsv[:, :, 2], 70, 190) + + kernel_close = np.ones((7, 7), np.uint8) + kernel_dilate = np.ones((2, 2), np.uint8) + mask = cv2.morphologyEx(v_mask, cv2.MORPH_CLOSE, kernel_close) + mask = cv2.dilate(mask, kernel_dilate, iterations=1) + + kernel = np.ones((4, 4), np.uint8) + mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=1) + mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel, iterations=1) + mask = cv2.dilate(mask, kernel, iterations=1) + + return mask, red_only + + @staticmethod + def process_blue_light(frame): + """蓝色灯条预处理""" + blue_only = frame.copy() + blue_only[:, :, 1] = 0 # G=0 + blue_only[:, :, 2] = 0 # R=0 + + hsv = cv2.cvtColor(blue_only, cv2.COLOR_BGR2HSV) + v_mask = cv2.inRange(hsv[:, :, 2], 120, 230) + + kernel_close = np.ones((7, 7), np.uint8) + kernel_dilate = np.ones((2, 2), np.uint8) + mask = cv2.morphologyEx(v_mask, cv2.MORPH_CLOSE, kernel_close) + mask = cv2.dilate(mask, kernel_dilate, iterations=1) + + kernel = np.ones((4, 4), np.uint8) + mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=1) + mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel, iterations=1) + mask = cv2.dilate(mask, kernel, iterations=1) + + return mask, blue_only + + def get_mask(self, frame, target_color): + """根据颜色选择预处理逻辑""" + if target_color == "red": + return self.process_red_light(frame) + elif target_color == "blue": + return self.process_blue_light(frame) + else: + raise ValueError("仅支持 'red' 或 'blue'") \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..8583a0e --- /dev/null +++ b/main.py @@ -0,0 +1,175 @@ +import cv2 +import time +from TTLCommunicator import TTLCommunicator # 导入入模块化的TTL通信类 +from camera_utils import Camera +from image_preprocessor import ImagePreprocessor +from armor_detector import ArmorDetector +from tracker import KalmanFilter +from visualizer import Visualizer +from Center_Predicted import BallisticPredictor + + +def output_control_data(ballistic_point, target_color, frame_counter, ttl_communicator, img_center, use_ttl): + """输出控制数据到TTL设备(带启用控制)""" + # 仅当启用TTL且且满足帧间隔时发送数据 + if use_ttl and frame_counter % 5 == 0: + if ballistic_point: + # 计算偏移量(基于实际图像中心) + ballistic_offset_x = int(ballistic_point[0] - img_center[0]) + ballistic_offset_y = int(ballistic_point[1] - img_center[1]) + + # 颜色简化映射 + color_map = {"red": "r", "blue": "b"} + simplified_color = color_map.get(target_color, target_color) + + # 构造发送字符串 + send_str = f"s {simplified_color} {ballistic_offset_x} {ballistic_offset_y}" + else: + # 无检测结果时的格式 + send_str = "s u 0 0" + + # 发送数据 + ttl_communicator.send_data(send_str) + + +def set_camera_resolution(cap, width, height): + """设置摄像头分辨率""" + cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + # 验证实际设置的分辨率 + actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + return actual_width, actual_height + + +def main(target_color="red", cam_id=0, default_resolution=(640, 480), use_ttl=True): + """主函数:整合模块运行检测流程与TTL通信(带TTL启用控制)""" + # 定义可选分辨率列表(根据摄像头支持情况调整) + resolutions = [ + (320, 240), # 低分辨率,高帧率 + (640, 480), # 标准分辨率 + (1280, 720), # HD分辨率 + (1920, 1080) # 全高清分辨率 + ] + res_index = resolutions.index(default_resolution) if default_resolution in resolutions else 1 + + # 初始化TTL通信(仅当启用时) + ttl = None + if use_ttl: + ttl = TTLCommunicator(baudrate=115200) + if not ttl.connect(): + print("警告:无法建立TTL连接,将继续运行但不发送数据") + else: + print("TTL通讯已禁用") + + # 初始化视觉处理模块 + camera = Camera(cam_id, target_color) + + # 初始化弹道预测器(可调节弹速,如16m/s) + ballistic_predictor = BallisticPredictor(bullet_speed=16.0) + # 新增:用于记录目标速度(从卡尔曼滤波器获取) + target_speed = (0.0, 0.0) + + # 设置初始分辨率 + current_res = set_camera_resolution(camera.cap, resolutions[res_index][0], resolutions[res_index][1]) + print(f"初始摄像头分辨率: {current_res[0]}x{current_res[1]}") + + preprocessor = ImagePreprocessor() + detector = ArmorDetector() + kalman_tracker = KalmanFilter() + visualizer = Visualizer() + + frame_counter = 0 # 控制输出频率的计数器 + max_consecutive_predicts = 20 # 最大连续预测次数 + consecutive_predicts = 0 # 当前连续预测计数 + + try: + while True: + ret, frame = camera.read_frame() + if not ret: + print("无法读取摄像头,退出!") + break + + # 获取图像实际尺寸并计算中心 + img_height, img_width = frame.shape[:2] + img_center = (img_width // 2, img_height // 2) + + # 预处理 + mask, color_only_frame = preprocessor.get_mask(frame, target_color) + + # 装甲板检测 + valid_light_bars, armor_plates = detector.detect(mask, target_color) + + # 卡尔曼滤波跟踪(修改部分:获取目标速度) + current_center = armor_plates[0]["center"] if (armor_plates and len(armor_plates) > 0) else None + + if current_center: + # 有检测结果:更新卡尔曼滤波器,重置连续预测计数 + kalman_tracker.update(current_center) + predicted_center = kalman_tracker.predict() + # 获取卡尔曼滤波器中的速度状态(dx, dy) + # 假设KalmanFilter类有state属性,包含[x, y, dx, dy] + if hasattr(kalman_tracker, 'kf'): + state = kalman_tracker.kf.statePost + target_speed = (state[2, 0], state[3, 0]) # dx, dy(像素/秒) + consecutive_predicts = 0 + else: + # 无检测结果:仅用卡尔曼预测,限制连续预测次数 + consecutive_predicts += 1 + if consecutive_predicts < max_consecutive_predicts: + predicted_center = kalman_tracker.predict() + else: + predicted_center = None + target_speed = (0.0, 0.0) + + # 确定显示的中心 + display_center = current_center if current_center else predicted_center + is_predicted = (display_center is not None) and (current_center is None) + + # 计算弹道预测点 + ballistic_point = ballistic_predictor.predict_ballistic_point( + predicted_center, img_center, target_speed + ) + + current_time = time.time() + + # 可视化 + frame = visualizer.draw_light_bars(frame, valid_light_bars, target_color) + if armor_plates and len(armor_plates) > 0: + frame = visualizer.draw_armor_plate(frame, armor_plates[0]) + frame = visualizer.draw_offset_text(frame, display_center, target_color, is_predicted) + + # 输出控制数据到TTL(传入use_ttl控制是否发送) + output_control_data(display_center, target_color, frame_counter, ttl, img_center, use_ttl) + + # 显示窗口 + cv2.imshow("Armor Detection", frame) + cv2.imshow(f"{target_color.capitalize()} Mask", mask) + cv2.imshow(f"{target_color.capitalize()} Only", color_only_frame) + + # 仅保留基础退出功能(按q退出) + if cv2.waitKey(1) == ord("q"): + break + + frame_counter += 1 + # 控制最大帧率(100 FPS) + time.sleep(max(0.0, 0.01 - (time.time() - current_time))) + + except KeyboardInterrupt: + print("\n用户终止程序") + finally: + camera.release() + cv2.destroyAllWindows() + # 仅当启用且初始化成功时关闭TTL连接 + if use_ttl and ttl is not None: + ttl.close() + + +if __name__ == "__main__": + # 可在此处设置是否启用TTL通讯(use_ttl=True/False) + main( + target_color="red", + cam_id=1, + default_resolution=(640, 480), + use_ttl=False # 设为False禁用TTL通讯 + ) \ No newline at end of file diff --git a/tracker.py b/tracker.py new file mode 100644 index 0000000..4e8941e --- /dev/null +++ b/tracker.py @@ -0,0 +1,56 @@ +import cv2 +import numpy as np +from config import KF_PROCESS_NOISE, KF_MEASUREMENT_NOISE + +class KalmanFilter: + """卡尔曼滤波器:平滑并预测装甲板位置""" + def __init__(self): + self.kf = cv2.KalmanFilter(4, 2) # 4状态(x,y,dx,dy),2测量(x,y) + self._init_params() + self.initialized = False + self.last_measurement = None + self.last_prediction = None + + def _init_params(self): + """初始化卡尔曼参数""" + self.kf.transitionMatrix = np.array([ + [1, 0, 1, 0], + [0, 1, 0, 1], + [0, 0, 1, 0], + [0, 0, 0, 1] + ], np.float32) + + self.kf.measurementMatrix = np.array([ + [1, 0, 0, 0], + [0, 1, 0, 0] + ], np.float32) + + self.kf.processNoiseCov = np.eye(4, dtype=np.float32) * KF_PROCESS_NOISE + self.kf.measurementNoiseCov = np.eye(2, dtype=np.float32) * KF_MEASUREMENT_NOISE + self.kf.errorCovPost = np.eye(4, dtype=np.float32) + + def update(self, measurement): + """用测量值更新滤波器""" + measurement = np.array([[np.float32(measurement[0])], [np.float32(measurement[1])]]) + + if not self.initialized: + self.kf.statePost = np.array([ + [measurement[0, 0]], + [measurement[1, 0]], + [0.0], + [0.0] + ], np.float32) + self.initialized = True + else: + self.kf.correct(measurement) + + self.last_measurement = (measurement[0, 0], measurement[1, 0]) + + def predict(self): + """预测下一帧位置""" + if not self.initialized: + return None + + prediction = self.kf.predict() + self.last_prediction = (prediction[0, 0], prediction[1, 0]) + return self.last_prediction \ No newline at end of file diff --git a/visualizer.py b/visualizer.py new file mode 100644 index 0000000..82922b2 --- /dev/null +++ b/visualizer.py @@ -0,0 +1,80 @@ +import cv2 +import numpy as np + +class Visualizer: + """可视化:绘制灯条、装甲板、预测位置等""" + def __init__(self): + self.colors = { + "red": (0, 0, 255), + "blue": (255, 0, 0), + "light_bar": (0, 255, 255), + "armor": (0, 255, 0), + "prediction": (0, 165, 255), + "text": (255, 255, 255) + } + + def draw_light_bars(self, frame, light_bars, target_color): + """绘制灯条(矩形+中心线+标签)""" + bar_color = self.colors[target_color] + line_color = self.colors["light_bar"] + + for lb in light_bars: + cv2.drawContours(frame, [lb["box"]], 0, bar_color, 2) + end1 = tuple(map(int, lb["center_line"][0])) + end2 = tuple(map(int, lb["center_line"][1])) + cv2.line(frame, end1, end2, line_color, 2) + center = tuple(map(int, lb["center"])) + cv2.putText( + frame, target_color, (center[0], center[1] - 8), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, bar_color, 1 + ) + + return frame + + def draw_armor_plate(self, frame, armor): + """绘制装甲板(2D框+3D信息)""" + corners = armor.get("corners_2d") + if corners is not None: + # 关键修改:np.int0 替换为 np.int32 + cv2.polylines(frame, [np.int32(corners)], True, self.colors["armor"], 2) + + center = tuple(map(int, armor["center"])) + cv2.circle(frame, center, 3, self.colors["armor"], -1) + cv2.putText( + frame, "Armor", (center[0] + 5, center[1] - 5), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors["armor"], 2 + ) + + return frame + + def draw_prediction(self, frame, predicted_pos, label="Predicted"): + """绘制预测位置(橙色圆点+标签)""" + if predicted_pos is None: + return frame + + pos = tuple(map(int, predicted_pos)) + cv2.circle(frame, pos, 5, self.colors["prediction"], -1) + cv2.putText( + frame, label, (pos[0] + 10, pos[1] - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.colors["prediction"], 2 + ) + + return frame + + def draw_offset_text(self, frame, display_center, target_color, is_predicted): + """绘制装甲板相对图像中心的偏移量""" + if display_center is None: + text = "Armor: unknown | Offset: (0, 0)" + else: + img_center = (frame.shape[1] // 2, frame.shape[0] // 2) + offset_x = int(display_center[0] - img_center[0]) + offset_y = int(display_center[1] - img_center[1]) + color_text = f"{target_color} (predicted)" if is_predicted else target_color + text = f"Armor: {color_text} | Offset: ({offset_x}, {offset_y})" + + cv2.putText( + frame, text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.8, self.colors["text"], 2 + ) + + return frame \ No newline at end of file