from loguru import logger from typing import Iterator import cv2 from ultralytics import YOLO from .pipeline_data import PipelineData def video_yolo_iterator(video_path: str, model_path: str = "yolov8n.pt", tracker: str = "botsort.yaml", device: str = "0") -> Iterator[PipelineData]: """ 从视频读取帧,经YOLO跟踪检测,返回PipelineData迭代器 :param video_path: 视频文件路径(或0表示摄像头) :param model_path: YOLO模型路径(默认使用yolov8n) :param tracker: 跟踪器配置文件(默认使用botsort) :param device: 设备标识(默认使用GPU 0) :return: 包含YOLO跟踪结果的PipelineData迭代器 """ # 加载YOLO模型 model = YOLO(model_path) logger.info(f"已加载YOLO模型: {model_path}") # 打开视频 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error(f"无法打开视频: {video_path}") raise ValueError(f"无法打开视频: {video_path}") frame_idx = 0 try: # 使用 model.track 替代 model 进行跟踪检测 results_stream = model.track(source=video_path, tracker=tracker, device=device, stream=True) logger.info(f"已开始跟踪视频: {video_path}") last_data = PipelineData() for r in results_stream: # 迭代跟踪结果 # 读取对应的帧 ret, frame = cap.read() if not ret: break # 视频读取完毕 # 构建数据包 data = PipelineData() data.set_to_cache(last_data.result_cache) data.current_result = r data.frame = frame data.frame_idx = frame_idx # 获取时间戳(毫秒) data.timestamp = cap.get(cv2.CAP_PROP_POS_MSEC) # 将结果加入缓存 data.add_to_cache(r) yield data last_data = data frame_idx += 1 finally: # 确保视频流关闭 cap.release() def video_yolo_detect_iterator(video_path: str, model_path: str = "yolov8n.pt", device: str = "0") -> Iterator[ PipelineData]: """ 从视频读取帧,经YOLO检测(非跟踪),返回PipelineData迭代器 :param video_path: 视频文件路径(或0表示摄像头) :param model_path: YOLO模型路径(默认使用yolov8n) :param device: 设备标识(默认使用GPU 0) :return: 包含YOLO检测结果的PipelineData迭代器 """ # 加载YOLO模型 model = YOLO(model_path) # 打开视频 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error(f"无法打开视频: {video_path}") raise ValueError(f"无法打开视频: {video_path}") frame_idx = 0 try: while cap.isOpened(): # 读取一帧 ret, frame = cap.read() if not ret: break # 视频读取完毕 # YOLO检测(stream=True表示流式处理,提升效率) results = model(frame, device=device, stream=True) for r in results: # 迭代结果(单帧只有一个结果) # 构建数据包 data = PipelineData() data.current_result = r data.frame = frame data.frame_idx = frame_idx # 获取时间戳(毫秒) data.timestamp = cap.get(cv2.CAP_PROP_POS_MSEC) # 将结果加入缓存(默认最多30帧) data.add_to_cache(r) yield data frame_idx += 1 finally: # 确保视频流关闭 cap.release()