import cv2 import time from TTLCommunicator import TTLCommunicator # 导入入模块化的TTL通信类 from 视觉代码python.camera_utils import Camera from 视觉代码python.image_preprocessor import ImagePreprocessor from 视觉代码python.armor_detector import ArmorDetector from tracker import KalmanFilter from visualizer import Visualizer from 视觉代码python.Center_Predicted import BallisticPredictor def output_control_data(ballistic_point, target_color, frame_counter, ttl_communicator, img_center, use_ttl): """输出控制数据到TTL设备(带启用控制)""" # 仅当启用TTL且且满足帧间隔时发送数据 if use_ttl and frame_counter % 5 == 0: if ballistic_point: # 计算偏移量(基于实际图像中心) ballistic_offset_x = int(ballistic_point[0] - img_center[0]) ballistic_offset_y = int(ballistic_point[1] - img_center[1]) # 颜色简化映射 color_map = {"red": "r", "blue": "b"} simplified_color = color_map.get(target_color, target_color) # 构造发送字符串 send_str = f"s {simplified_color} {ballistic_offset_x} {ballistic_offset_y}" else: # 无检测结果时的格式 send_str = "s u 0 0" # 发送数据 ttl_communicator.send_data(send_str) def set_camera_resolution(cap, width, height): """设置摄像头分辨率""" cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) # 验证实际设置的分辨率 actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) return actual_width, actual_height def main(target_color="red", cam_id=0, default_resolution=(640, 480), use_ttl=True): """主函数:整合模块运行检测流程与TTL通信(带TTL启用控制)""" # 定义可选分辨率列表(根据摄像头支持情况调整) resolutions = [ (320, 240), # 低分辨率,高帧率 (640, 480), # 标准分辨率 (1280, 720), # HD分辨率 (1920, 1080) # 全高清分辨率 ] res_index = resolutions.index(default_resolution) if default_resolution in resolutions else 1 # 初始化TTL通信(仅当启用时) ttl = None if use_ttl: ttl = TTLCommunicator(baudrate=115200) if not ttl.connect(): print("警告:无法建立TTL连接,将继续运行但不发送数据") else: print("TTL通讯已禁用") # 初始化视觉处理模块 camera = Camera(cam_id, target_color) # 初始化弹道预测器(可调节弹速,如16m/s) ballistic_predictor = BallisticPredictor(bullet_speed=16.0) # 新增:用于记录目标速度(从卡尔曼滤波器获取) target_speed = (0.0, 0.0) # 设置初始分辨率 current_res = set_camera_resolution(camera.cap, resolutions[res_index][0], resolutions[res_index][1]) print(f"初始摄像头分辨率: {current_res[0]}x{current_res[1]}") preprocessor = ImagePreprocessor() detector = ArmorDetector() kalman_tracker = KalmanFilter() visualizer = Visualizer() frame_counter = 0 # 控制输出频率的计数器 max_consecutive_predicts = 20 # 最大连续预测次数 consecutive_predicts = 0 # 当前连续预测计数 try: while True: ret, frame = camera.read_frame() if not ret: print("无法读取摄像头,退出!") break # 获取图像实际尺寸并计算中心 img_height, img_width = frame.shape[:2] img_center = (img_width // 2, img_height // 2) # 预处理 mask, color_only_frame = preprocessor.get_mask(frame, target_color) # 装甲板检测 valid_light_bars, armor_plates = detector.detect(mask, target_color) # 卡尔曼滤波跟踪(修改部分:获取目标速度) current_center = armor_plates[0]["center"] if (armor_plates and len(armor_plates) > 0) else None if current_center: # 有检测结果:更新卡尔曼滤波器,重置连续预测计数 kalman_tracker.update(current_center) predicted_center = kalman_tracker.predict() # 获取卡尔曼滤波器中的速度状态(dx, dy) # 假设KalmanFilter类有state属性,包含[x, y, dx, dy] if hasattr(kalman_tracker, 'kf'): state = kalman_tracker.kf.statePost target_speed = (state[2, 0], state[3, 0]) # dx, dy(像素/秒) consecutive_predicts = 0 else: # 无检测结果:仅用卡尔曼预测,限制连续预测次数 consecutive_predicts += 1 if consecutive_predicts < max_consecutive_predicts: predicted_center = kalman_tracker.predict() else: predicted_center = None target_speed = (0.0, 0.0) # 确定显示的中心 display_center = current_center if current_center else predicted_center is_predicted = (display_center is not None) and (current_center is None) # 计算弹道预测点 ballistic_point = ballistic_predictor.predict_ballistic_point( predicted_center, img_center, target_speed ) current_time = time.time() # 可视化 frame = visualizer.draw_light_bars(frame, valid_light_bars, target_color) if armor_plates and len(armor_plates) > 0: frame = visualizer.draw_armor_plate(frame, armor_plates[0]) frame = visualizer.draw_offset_text(frame, display_center, target_color, is_predicted) # 输出控制数据到TTL(传入use_ttl控制是否发送) output_control_data(display_center, target_color, frame_counter, ttl, img_center, use_ttl) # 显示窗口 cv2.imshow("Armor Detection", frame) cv2.imshow(f"{target_color.capitalize()} Mask", mask) cv2.imshow(f"{target_color.capitalize()} Only", color_only_frame) # 仅保留基础退出功能(按q退出) if cv2.waitKey(1) == ord("q"): break frame_counter += 1 # 控制最大帧率(100 FPS) time.sleep(max(0.0, 0.01 - (time.time() - current_time))) except KeyboardInterrupt: print("\n用户终止程序") finally: camera.release() cv2.destroyAllWindows() # 仅当启用且初始化成功时关闭TTL连接 if use_ttl and ttl is not None: ttl.close() if __name__ == "__main__": # 可在此处设置是否启用TTL通讯(use_ttl=True/False) main( target_color="red", cam_id=1, default_resolution=(640, 480), use_ttl=False # 设为False禁用TTL通讯 )