From 963e8583411008d76a67ed8484d5e6b24075dd83 Mon Sep 17 00:00:00 2001 From: zimoyin <2556608754@qq.com> Date: Sat, 10 Jan 2026 09:40:02 +0800 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20=E6=B7=BB=E5=8A=A0Pipeline?= =?UTF-8?q?=E4=B8=8EHandle=E8=AE=BE=E8=AE=A1=E6=A1=86=E6=9E=B6=20-=20doc:?= =?UTF-8?q?=20=E5=90=84=E5=A4=84=E7=90=86=E5=99=A8=E7=8B=AC=E7=AB=8B?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E7=89=B9=E5=AE=9A=E5=8A=9F=E8=83=BD=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=A7=A3=E8=80=A6=E5=90=88=E5=92=8C=E5=A4=8D?= =?UTF-8?q?=E7=94=A8=E3=80=82=E5=90=8C=E6=AD=A5=E5=A4=84=E7=90=86=E4=BF=9D?= =?UTF-8?q?=E8=AF=81=E4=BE=9D=E8=B5=96=E6=80=A7=EF=BC=8C=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E5=A4=84=E7=90=86=E6=8F=90=E5=8D=87=E6=80=A7=E8=83=BD=EF=BC=8C?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E5=A4=84=E7=90=86=E4=BD=9C=E4=B8=BA=E7=AE=A1?= =?UTF-8?q?=E9=81=93=E7=BB=88=E7=AB=AF=E6=93=8D=E4=BD=9C=E5=90=8E=E7=BB=AD?= =?UTF-8?q?=E5=B0=86=E5=BC=95=E5=85=A5BUS=E6=9C=BA=E5=88=B6=EF=BC=8C?= =?UTF-8?q?=E4=BD=9C=E4=B8=BA=E4=BA=8B=E4=BB=B6=E7=9A=84=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E8=80=85=E3=80=82=E7=BB=9F=E4=B8=80=E7=9A=84=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=AD=98=E5=8F=96=E6=8E=A5=E5=8F=A3=EF=BC=8C=E5=86=85=E7=BD=AE?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E8=BD=AC=E6=8D=A2=E5=92=8C=E9=AA=8C=E8=AF=81?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=20-=20=E5=88=9B=E5=BB=BABaseProcessor?= =?UTF-8?q?=E6=8A=BD=E8=B1=A1=E5=9F=BA=E7=B1=BB=E5=AE=9A=E4=B9=89=E7=BB=9F?= =?UTF-8?q?=E4=B8=80=E5=A4=84=E7=90=86=E6=8E=A5=E5=8F=A3=20-=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0video=5Fyolo=5Fiterator=E5=92=8Cvideo=5Fyolo=5Fdetect?= =?UTF-8?q?=5Fiterator=E6=95=B0=E6=8D=AE=E6=BA=90=20-=20=E6=9E=84=E5=BB=BA?= =?UTF-8?q?Pipeline=E6=A0=B8=E5=BF=83=E7=B1=BB=E7=AE=A1=E7=90=86=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=92=8C=E5=BC=82=E6=AD=A5=E5=A4=84=E7=90=86=E5=99=A8?= =?UTF-8?q?=20-=20=E8=AE=BE=E8=AE=A1PipelineData=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=8C=85=E6=89=BF=E8=BD=BD=E6=A3=80=E6=B5=8B=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E5=92=8C=E7=BC=93=E5=AD=98=E4=BF=A1=E6=81=AF=20-=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=90=8C=E6=AD=A5=E5=92=8C=E5=BC=82=E6=AD=A5=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8=E7=9A=84=E6=B7=B7=E5=90=88=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=20-=20=E6=8F=90=E4=BE=9B=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E7=AE=A1=E7=90=86=E5=92=8C=E5=86=85=E9=83=A8?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=AD=98=E5=82=A8=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipeline/base_processor.py | 29 ++++++++++++ pipeline/data_source.py | 97 ++++++++++++++++++++++++++++++++++++++ pipeline/pipeline.py | 84 +++++++++++++++++++++++++++++++++ pipeline/pipeline_data.py | 78 ++++++++++++++++++++++++++++++ 4 files changed, 288 insertions(+) create mode 100644 pipeline/base_processor.py create mode 100644 pipeline/data_source.py create mode 100644 pipeline/pipeline.py create mode 100644 pipeline/pipeline_data.py diff --git a/pipeline/base_processor.py b/pipeline/base_processor.py new file mode 100644 index 0000000..5581f50 --- /dev/null +++ b/pipeline/base_processor.py @@ -0,0 +1,29 @@ +from abc import ABC, abstractmethod +import asyncio +from .pipeline_data import PipelineData + + +class BaseProcessor(ABC): + """所有处理器的抽象基类,定义统一的处理接口""" + def __init__(self, name: str): + self.name = name # 处理器名称(便于日志/调试) + + @abstractmethod + def process(self, data: PipelineData) -> PipelineData: + """ + 核心处理方法(必须由子类实现) + :param data: 管道数据包 + :return: 处理后的数据包(可修改原数据或返回新数据) + """ + pass + + async def process_async(self, data: PipelineData): + """ + 异步处理方法(可由子类实现) + :param data: 管道数据包 + """ + # 默认实现:调用同步处理方法 + self.process(data) + + def __repr__(self) -> str: + return f"Processor[{self.name}]" \ No newline at end of file diff --git a/pipeline/data_source.py b/pipeline/data_source.py new file mode 100644 index 0000000..11327a7 --- /dev/null +++ b/pipeline/data_source.py @@ -0,0 +1,97 @@ +from typing import Iterator +import cv2 +from ultralytics import YOLO +from .pipeline_data import PipelineData + + +def video_yolo_iterator(video_path: str, model_path: str = "yolov8n.pt", tracker: str = "botsort.yaml", + device: str = "0") -> Iterator[PipelineData]: + """ + 从视频读取帧,经YOLO跟踪检测,返回PipelineData迭代器 + :param video_path: 视频文件路径(或0表示摄像头) + :param model_path: YOLO模型路径(默认使用yolov8n) + :param tracker: 跟踪器配置文件(默认使用botsort) + :param device: 设备标识(默认使用GPU 0) + :return: 包含YOLO跟踪结果的PipelineData迭代器 + """ + # 加载YOLO模型 + model = YOLO(model_path) + # 打开视频 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise ValueError(f"无法打开视频: {video_path}") + + frame_idx = 0 + try: + # 使用 model.track 替代 model 进行跟踪检测 + results_stream = model.track(source=video_path, tracker=tracker, device=device, stream=True) + + last_data = PipelineData() + for r in results_stream: # 迭代跟踪结果 + # 读取对应的帧 + ret, frame = cap.read() + if not ret: + break # 视频读取完毕 + + + # 构建数据包 + data = PipelineData() + data.set_to_cache(last_data.result_cache) + data.current_result = r + data.frame = frame + data.frame_idx = frame_idx + # 获取时间戳(毫秒) + data.timestamp = cap.get(cv2.CAP_PROP_POS_MSEC) + # 将结果加入缓存 + data.add_to_cache(r) + + yield data + last_data = data + frame_idx += 1 + finally: + # 确保视频流关闭 + cap.release() + + +def video_yolo_detect_iterator(video_path: str, model_path: str = "yolov8n.pt", device: str = "0") -> Iterator[ + PipelineData]: + """ + 从视频读取帧,经YOLO检测(非跟踪),返回PipelineData迭代器 + :param video_path: 视频文件路径(或0表示摄像头) + :param model_path: YOLO模型路径(默认使用yolov8n) + :param device: 设备标识(默认使用GPU 0) + :return: 包含YOLO检测结果的PipelineData迭代器 + """ + # 加载YOLO模型 + model = YOLO(model_path) + # 打开视频 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise ValueError(f"无法打开视频: {video_path}") + + frame_idx = 0 + try: + while cap.isOpened(): + # 读取一帧 + ret, frame = cap.read() + if not ret: + break # 视频读取完毕 + + # YOLO检测(stream=True表示流式处理,提升效率) + results = model(frame, device=device, stream=True) + for r in results: # 迭代结果(单帧只有一个结果) + # 构建数据包 + data = PipelineData() + data.current_result = r + data.frame = frame + data.frame_idx = frame_idx + # 获取时间戳(毫秒) + data.timestamp = cap.get(cv2.CAP_PROP_POS_MSEC) + # 将结果加入缓存(默认最多30帧) + data.add_to_cache(r) + + yield data + frame_idx += 1 + finally: + # 确保视频流关闭 + cap.release() diff --git a/pipeline/pipeline.py b/pipeline/pipeline.py new file mode 100644 index 0000000..1789986 --- /dev/null +++ b/pipeline/pipeline.py @@ -0,0 +1,84 @@ +from typing import List, Iterator +import asyncio +from .base_processor import BaseProcessor +from .pipeline_data import PipelineData + + +class Pipeline: + """管道核心类,管理多个处理器,负责数据流转""" + def __init__(self): + self.processors: List[BaseProcessor] = [] # 同步处理器列表 + self.async_processors: List[BaseProcessor] = [] # 异步处理器列表 + + def add_processor(self, processor: BaseProcessor) -> None: + """向管道添加同步处理器(按添加顺序执行)""" + self.processors.append(processor) + print(f"管道已添加同步处理器: {processor}") + + def add_processor_async(self, processor: BaseProcessor) -> None: + """向管道添加异步处理器(异步执行,不阻塞数据流)""" + self.async_processors.append(processor) + print(f"管道已添加异步处理器: {processor}") + + def remove_processor(self, processor_name: str) -> bool: + """移除指定名称的处理器""" + # 检查同步处理器列表 + for idx, p in enumerate(self.processors): + if p.name == processor_name: + self.processors.pop(idx) + print(f"管道已移除同步处理器: {p}") + return True + # 检查异步处理器列表 + for idx, p in enumerate(self.async_processors): + if p.name == processor_name: + self.async_processors.pop(idx) + print(f"管道已移除异步处理器: {p}") + return True + print(f"未找到处理器: {processor_name}") + return False + + def run(self, data_iterator: Iterator[PipelineData]) -> Iterator[PipelineData]: + """ + 运行管道:迭代输入数据,依次经过所有同步处理器,同时异步执行异步处理器 + :param data_iterator: 数据源迭代器(如视频+YOLO的迭代器) + :return: 处理后的数据包迭代器 + """ + # 创建异步任务列表,用于管理异步处理器 + async def run_async_processors(data: PipelineData): + tasks = [] + for async_processor in self.async_processors: + task = asyncio.create_task(self._process_async(async_processor, data)) + tasks.append(task) + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + for data in data_iterator: + # 依次执行每个同步处理器的处理逻辑 + for processor in self.processors: + data = processor.process(data) + + # 异步执行异步处理器,不阻塞数据流 + if self.async_processors: + # 在独立的事件循环中运行异步处理器(如果可用) + try: + # 尝试获取当前事件循环 + loop = asyncio.get_running_loop() + # 在当前事件循环中调度异步处理器 + asyncio.create_task(run_async_processors(data.copy() if hasattr(data, 'copy') else data)) + except RuntimeError: + # 如果没有运行中的事件循环,则创建一个新的 + asyncio.run(run_async_processors(data.copy() if hasattr(data, 'copy') else data)) + + yield data # 返回处理后的数据包 + + async def _process_async(self, processor, data): + """异步执行单个处理器""" + try: + # 对数据进行异步处理(如果处理器支持) + if hasattr(processor, 'process_async'): + await processor.process_async(data) + else: + # 如果处理器不支持异步处理,则同步处理 + processor.process(data) + except Exception as e: + print(f"异步处理器 {processor.name} 执行出错: {e}") \ No newline at end of file diff --git a/pipeline/pipeline_data.py b/pipeline/pipeline_data.py new file mode 100644 index 0000000..5b336f0 --- /dev/null +++ b/pipeline/pipeline_data.py @@ -0,0 +1,78 @@ +from typing import List, Optional, Dict, Any +import copy +import cv2 +from ultralytics.engine.results import Results + + +class PipelineData: + """管道传输的数据包,承载YOLO检测结果、缓存及辅助信息""" + def __init__(self): + # 当前帧的YOLO检测结果 + self.current_result: Optional[Results] = None + # 检测结果缓存(list[Results]) + self.result_cache: List[Results] = [] + # 辅助信息:帧数据、时间戳、帧序号(便于调试/扩展) + self.frame: Optional[cv2.Mat] = None + self.timestamp: Optional[float] = None + self.frame_idx: int = 0 + # 内部数据存储map + self.internal_map: Dict[str, Any] = {} + + def add_to_cache(self, result: Results, max_cache_size: int = 125) -> None: + """将结果加入缓存,控制缓存最大长度""" + self.result_cache.append(result) + if len(self.result_cache) > max_cache_size: + self.result_cache.pop(0) # 移除最旧的缓存 + + def set_to_cache(self, result_cache: List[Results]) -> None: + """设置缓存""" + self.result_cache = result_cache + + def clear_cache(self) -> None: + """清空缓存""" + self.result_cache.clear() + + def copy(self): + """创建当前PipelineData的副本""" + new_data = PipelineData() + new_data.current_result = self.current_result + new_data.result_cache = self.result_cache.copy() + new_data.frame = self.frame.copy() if self.frame is not None else None + new_data.timestamp = self.timestamp + new_data.frame_idx = self.frame_idx + new_data.internal_map = self.internal_map.copy() + return new_data + + def put_data(self, data_type: str, key: str, value: Any) -> None: + """存储数据到内部map + + Args: + data_type: 数据类型标识 + key: 数据键名 + value: 数据值 + """ + map_key = f"{data_type}:{key}" + self.internal_map[map_key] = value + + def get_data(self, data_label: str, key: str, target_type=None): + """从内部map获取数据 + + Args: + data_label: 数据来源标识 + key: 数据键名 + target_type: 目标类型,用于类型转换 + + Returns: + 获取的数据,如果不存在则返回None + """ + map_key = f"{data_label}:{key}" + value = self.internal_map.get(map_key) + + if value is not None and target_type is not None: + try: + value = target_type(value) + except (ValueError, TypeError): + # 如果类型转换失败,返回原始值 + pass + + return value \ No newline at end of file