Files
2025-11-05 17:13:01 +08:00

175 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import time
from TTLCommunicator import TTLCommunicator # 导入入模块化的TTL通信类
from 视觉代码python.camera_utils import Camera
from 视觉代码python.image_preprocessor import ImagePreprocessor
from 视觉代码python.armor_detector import ArmorDetector
from tracker import KalmanFilter
from visualizer import Visualizer
from 视觉代码python.Center_Predicted import BallisticPredictor
def output_control_data(ballistic_point, target_color, frame_counter, ttl_communicator, img_center, use_ttl):
"""输出控制数据到TTL设备带启用控制"""
# 仅当启用TTL且且满足帧间隔时发送数据
if use_ttl and frame_counter % 5 == 0:
if ballistic_point:
# 计算偏移量(基于实际图像中心)
ballistic_offset_x = int(ballistic_point[0] - img_center[0])
ballistic_offset_y = int(ballistic_point[1] - img_center[1])
# 颜色简化映射
color_map = {"red": "r", "blue": "b"}
simplified_color = color_map.get(target_color, target_color)
# 构造发送字符串
send_str = f"s {simplified_color} {ballistic_offset_x} {ballistic_offset_y}"
else:
# 无检测结果时的格式
send_str = "s u 0 0"
# 发送数据
ttl_communicator.send_data(send_str)
def set_camera_resolution(cap, width, height):
"""设置摄像头分辨率"""
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
# 验证实际设置的分辨率
actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
return actual_width, actual_height
def main(target_color="red", cam_id=0, default_resolution=(640, 480), use_ttl=True):
"""主函数整合模块运行检测流程与TTL通信带TTL启用控制"""
# 定义可选分辨率列表(根据摄像头支持情况调整)
resolutions = [
(320, 240), # 低分辨率,高帧率
(640, 480), # 标准分辨率
(1280, 720), # HD分辨率
(1920, 1080) # 全高清分辨率
]
res_index = resolutions.index(default_resolution) if default_resolution in resolutions else 1
# 初始化TTL通信仅当启用时
ttl = None
if use_ttl:
ttl = TTLCommunicator(baudrate=115200)
if not ttl.connect():
print("警告无法建立TTL连接将继续运行但不发送数据")
else:
print("TTL通讯已禁用")
# 初始化视觉处理模块
camera = Camera(cam_id, target_color)
# 初始化弹道预测器可调节弹速如16m/s
ballistic_predictor = BallisticPredictor(bullet_speed=16.0)
# 新增:用于记录目标速度(从卡尔曼滤波器获取)
target_speed = (0.0, 0.0)
# 设置初始分辨率
current_res = set_camera_resolution(camera.cap, resolutions[res_index][0], resolutions[res_index][1])
print(f"初始摄像头分辨率: {current_res[0]}x{current_res[1]}")
preprocessor = ImagePreprocessor()
detector = ArmorDetector()
kalman_tracker = KalmanFilter()
visualizer = Visualizer()
frame_counter = 0 # 控制输出频率的计数器
max_consecutive_predicts = 20 # 最大连续预测次数
consecutive_predicts = 0 # 当前连续预测计数
try:
while True:
ret, frame = camera.read_frame()
if not ret:
print("无法读取摄像头,退出!")
break
# 获取图像实际尺寸并计算中心
img_height, img_width = frame.shape[:2]
img_center = (img_width // 2, img_height // 2)
# 预处理
mask, color_only_frame = preprocessor.get_mask(frame, target_color)
# 装甲板检测
valid_light_bars, armor_plates = detector.detect(mask, target_color)
# 卡尔曼滤波跟踪(修改部分:获取目标速度)
current_center = armor_plates[0]["center"] if (armor_plates and len(armor_plates) > 0) else None
if current_center:
# 有检测结果:更新卡尔曼滤波器,重置连续预测计数
kalman_tracker.update(current_center)
predicted_center = kalman_tracker.predict()
# 获取卡尔曼滤波器中的速度状态dx, dy
# 假设KalmanFilter类有state属性包含[x, y, dx, dy]
if hasattr(kalman_tracker, 'kf'):
state = kalman_tracker.kf.statePost
target_speed = (state[2, 0], state[3, 0]) # dx, dy像素/秒)
consecutive_predicts = 0
else:
# 无检测结果:仅用卡尔曼预测,限制连续预测次数
consecutive_predicts += 1
if consecutive_predicts < max_consecutive_predicts:
predicted_center = kalman_tracker.predict()
else:
predicted_center = None
target_speed = (0.0, 0.0)
# 确定显示的中心
display_center = current_center if current_center else predicted_center
is_predicted = (display_center is not None) and (current_center is None)
# 计算弹道预测点
ballistic_point = ballistic_predictor.predict_ballistic_point(
predicted_center, img_center, target_speed
)
current_time = time.time()
# 可视化
frame = visualizer.draw_light_bars(frame, valid_light_bars, target_color)
if armor_plates and len(armor_plates) > 0:
frame = visualizer.draw_armor_plate(frame, armor_plates[0])
frame = visualizer.draw_offset_text(frame, display_center, target_color, is_predicted)
# 输出控制数据到TTL传入use_ttl控制是否发送
output_control_data(display_center, target_color, frame_counter, ttl, img_center, use_ttl)
# 显示窗口
cv2.imshow("Armor Detection", frame)
cv2.imshow(f"{target_color.capitalize()} Mask", mask)
cv2.imshow(f"{target_color.capitalize()} Only", color_only_frame)
# 仅保留基础退出功能按q退出
if cv2.waitKey(1) == ord("q"):
break
frame_counter += 1
# 控制最大帧率100 FPS
time.sleep(max(0.0, 0.01 - (time.time() - current_time)))
except KeyboardInterrupt:
print("\n用户终止程序")
finally:
camera.release()
cv2.destroyAllWindows()
# 仅当启用且初始化成功时关闭TTL连接
if use_ttl and ttl is not None:
ttl.close()
if __name__ == "__main__":
# 可在此处设置是否启用TTL通讯use_ttl=True/False
main(
target_color="red",
cam_id=1,
default_resolution=(640, 480),
use_ttl=False # 设为False禁用TTL通讯
)