测试添加视觉代码
This commit is contained in:
175
main.py
Normal file
175
main.py
Normal file
@@ -0,0 +1,175 @@
|
||||
import cv2
|
||||
import time
|
||||
from TTLCommunicator import TTLCommunicator # 导入入模块化的TTL通信类
|
||||
from camera_utils import Camera
|
||||
from image_preprocessor import ImagePreprocessor
|
||||
from armor_detector import ArmorDetector
|
||||
from tracker import KalmanFilter
|
||||
from visualizer import Visualizer
|
||||
from Center_Predicted import BallisticPredictor
|
||||
|
||||
|
||||
def output_control_data(ballistic_point, target_color, frame_counter, ttl_communicator, img_center, use_ttl):
|
||||
"""输出控制数据到TTL设备(带启用控制)"""
|
||||
# 仅当启用TTL且且满足帧间隔时发送数据
|
||||
if use_ttl and frame_counter % 5 == 0:
|
||||
if ballistic_point:
|
||||
# 计算偏移量(基于实际图像中心)
|
||||
ballistic_offset_x = int(ballistic_point[0] - img_center[0])
|
||||
ballistic_offset_y = int(ballistic_point[1] - img_center[1])
|
||||
|
||||
# 颜色简化映射
|
||||
color_map = {"red": "r", "blue": "b"}
|
||||
simplified_color = color_map.get(target_color, target_color)
|
||||
|
||||
# 构造发送字符串
|
||||
send_str = f"s {simplified_color} {ballistic_offset_x} {ballistic_offset_y}"
|
||||
else:
|
||||
# 无检测结果时的格式
|
||||
send_str = "s u 0 0"
|
||||
|
||||
# 发送数据
|
||||
ttl_communicator.send_data(send_str)
|
||||
|
||||
|
||||
def set_camera_resolution(cap, width, height):
|
||||
"""设置摄像头分辨率"""
|
||||
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
|
||||
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
|
||||
# 验证实际设置的分辨率
|
||||
actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
return actual_width, actual_height
|
||||
|
||||
|
||||
def main(target_color="red", cam_id=0, default_resolution=(640, 480), use_ttl=True):
|
||||
"""主函数:整合模块运行检测流程与TTL通信(带TTL启用控制)"""
|
||||
# 定义可选分辨率列表(根据摄像头支持情况调整)
|
||||
resolutions = [
|
||||
(320, 240), # 低分辨率,高帧率
|
||||
(640, 480), # 标准分辨率
|
||||
(1280, 720), # HD分辨率
|
||||
(1920, 1080) # 全高清分辨率
|
||||
]
|
||||
res_index = resolutions.index(default_resolution) if default_resolution in resolutions else 1
|
||||
|
||||
# 初始化TTL通信(仅当启用时)
|
||||
ttl = None
|
||||
if use_ttl:
|
||||
ttl = TTLCommunicator(baudrate=115200)
|
||||
if not ttl.connect():
|
||||
print("警告:无法建立TTL连接,将继续运行但不发送数据")
|
||||
else:
|
||||
print("TTL通讯已禁用")
|
||||
|
||||
# 初始化视觉处理模块
|
||||
camera = Camera(cam_id, target_color)
|
||||
|
||||
# 初始化弹道预测器(可调节弹速,如16m/s)
|
||||
ballistic_predictor = BallisticPredictor(bullet_speed=16.0)
|
||||
# 新增:用于记录目标速度(从卡尔曼滤波器获取)
|
||||
target_speed = (0.0, 0.0)
|
||||
|
||||
# 设置初始分辨率
|
||||
current_res = set_camera_resolution(camera.cap, resolutions[res_index][0], resolutions[res_index][1])
|
||||
print(f"初始摄像头分辨率: {current_res[0]}x{current_res[1]}")
|
||||
|
||||
preprocessor = ImagePreprocessor()
|
||||
detector = ArmorDetector()
|
||||
kalman_tracker = KalmanFilter()
|
||||
visualizer = Visualizer()
|
||||
|
||||
frame_counter = 0 # 控制输出频率的计数器
|
||||
max_consecutive_predicts = 20 # 最大连续预测次数
|
||||
consecutive_predicts = 0 # 当前连续预测计数
|
||||
|
||||
try:
|
||||
while True:
|
||||
ret, frame = camera.read_frame()
|
||||
if not ret:
|
||||
print("无法读取摄像头,退出!")
|
||||
break
|
||||
|
||||
# 获取图像实际尺寸并计算中心
|
||||
img_height, img_width = frame.shape[:2]
|
||||
img_center = (img_width // 2, img_height // 2)
|
||||
|
||||
# 预处理
|
||||
mask, color_only_frame = preprocessor.get_mask(frame, target_color)
|
||||
|
||||
# 装甲板检测
|
||||
valid_light_bars, armor_plates = detector.detect(mask, target_color)
|
||||
|
||||
# 卡尔曼滤波跟踪(修改部分:获取目标速度)
|
||||
current_center = armor_plates[0]["center"] if (armor_plates and len(armor_plates) > 0) else None
|
||||
|
||||
if current_center:
|
||||
# 有检测结果:更新卡尔曼滤波器,重置连续预测计数
|
||||
kalman_tracker.update(current_center)
|
||||
predicted_center = kalman_tracker.predict()
|
||||
# 获取卡尔曼滤波器中的速度状态(dx, dy)
|
||||
# 假设KalmanFilter类有state属性,包含[x, y, dx, dy]
|
||||
if hasattr(kalman_tracker, 'kf'):
|
||||
state = kalman_tracker.kf.statePost
|
||||
target_speed = (state[2, 0], state[3, 0]) # dx, dy(像素/秒)
|
||||
consecutive_predicts = 0
|
||||
else:
|
||||
# 无检测结果:仅用卡尔曼预测,限制连续预测次数
|
||||
consecutive_predicts += 1
|
||||
if consecutive_predicts < max_consecutive_predicts:
|
||||
predicted_center = kalman_tracker.predict()
|
||||
else:
|
||||
predicted_center = None
|
||||
target_speed = (0.0, 0.0)
|
||||
|
||||
# 确定显示的中心
|
||||
display_center = current_center if current_center else predicted_center
|
||||
is_predicted = (display_center is not None) and (current_center is None)
|
||||
|
||||
# 计算弹道预测点
|
||||
ballistic_point = ballistic_predictor.predict_ballistic_point(
|
||||
predicted_center, img_center, target_speed
|
||||
)
|
||||
|
||||
current_time = time.time()
|
||||
|
||||
# 可视化
|
||||
frame = visualizer.draw_light_bars(frame, valid_light_bars, target_color)
|
||||
if armor_plates and len(armor_plates) > 0:
|
||||
frame = visualizer.draw_armor_plate(frame, armor_plates[0])
|
||||
frame = visualizer.draw_offset_text(frame, display_center, target_color, is_predicted)
|
||||
|
||||
# 输出控制数据到TTL(传入use_ttl控制是否发送)
|
||||
output_control_data(display_center, target_color, frame_counter, ttl, img_center, use_ttl)
|
||||
|
||||
# 显示窗口
|
||||
cv2.imshow("Armor Detection", frame)
|
||||
cv2.imshow(f"{target_color.capitalize()} Mask", mask)
|
||||
cv2.imshow(f"{target_color.capitalize()} Only", color_only_frame)
|
||||
|
||||
# 仅保留基础退出功能(按q退出)
|
||||
if cv2.waitKey(1) == ord("q"):
|
||||
break
|
||||
|
||||
frame_counter += 1
|
||||
# 控制最大帧率(100 FPS)
|
||||
time.sleep(max(0.0, 0.01 - (time.time() - current_time)))
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n用户终止程序")
|
||||
finally:
|
||||
camera.release()
|
||||
cv2.destroyAllWindows()
|
||||
# 仅当启用且初始化成功时关闭TTL连接
|
||||
if use_ttl and ttl is not None:
|
||||
ttl.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 可在此处设置是否启用TTL通讯(use_ttl=True/False)
|
||||
main(
|
||||
target_color="red",
|
||||
cam_id=1,
|
||||
default_resolution=(640, 480),
|
||||
use_ttl=False # 设为False禁用TTL通讯
|
||||
)
|
||||
Reference in New Issue
Block a user