feat(pipeline): 添加Pipeline与Handle设计框架
- doc: 各处理器独立实现特定功能,支持解耦合和复用。同步处理保证依赖性,异步处理提升性能,异步处理作为管道终端操作后续将引入BUS机制,作为事件的发布者。统一的数据存取接口,内置类型转换和验证机制 - 创建BaseProcessor抽象基类定义统一处理接口 - 实现video_yolo_iterator和video_yolo_detect_iterator数据源 - 构建Pipeline核心类管理同步和异步处理器 - 设计PipelineData数据包承载检测结果和缓存信息 - 支持同步和异步处理器的混合执行模式 - 提供数据缓存管理和内部数据存储功能
This commit is contained in:
@@ -0,0 +1,29 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
import asyncio
|
||||||
|
from .pipeline_data import PipelineData
|
||||||
|
|
||||||
|
|
||||||
|
class BaseProcessor(ABC):
|
||||||
|
"""所有处理器的抽象基类,定义统一的处理接口"""
|
||||||
|
def __init__(self, name: str):
|
||||||
|
self.name = name # 处理器名称(便于日志/调试)
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def process(self, data: PipelineData) -> PipelineData:
|
||||||
|
"""
|
||||||
|
核心处理方法(必须由子类实现)
|
||||||
|
:param data: 管道数据包
|
||||||
|
:return: 处理后的数据包(可修改原数据或返回新数据)
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def process_async(self, data: PipelineData):
|
||||||
|
"""
|
||||||
|
异步处理方法(可由子类实现)
|
||||||
|
:param data: 管道数据包
|
||||||
|
"""
|
||||||
|
# 默认实现:调用同步处理方法
|
||||||
|
self.process(data)
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return f"Processor[{self.name}]"
|
||||||
@@ -0,0 +1,97 @@
|
|||||||
|
from typing import Iterator
|
||||||
|
import cv2
|
||||||
|
from ultralytics import YOLO
|
||||||
|
from .pipeline_data import PipelineData
|
||||||
|
|
||||||
|
|
||||||
|
def video_yolo_iterator(video_path: str, model_path: str = "yolov8n.pt", tracker: str = "botsort.yaml",
|
||||||
|
device: str = "0") -> Iterator[PipelineData]:
|
||||||
|
"""
|
||||||
|
从视频读取帧,经YOLO跟踪检测,返回PipelineData迭代器
|
||||||
|
:param video_path: 视频文件路径(或0表示摄像头)
|
||||||
|
:param model_path: YOLO模型路径(默认使用yolov8n)
|
||||||
|
:param tracker: 跟踪器配置文件(默认使用botsort)
|
||||||
|
:param device: 设备标识(默认使用GPU 0)
|
||||||
|
:return: 包含YOLO跟踪结果的PipelineData迭代器
|
||||||
|
"""
|
||||||
|
# 加载YOLO模型
|
||||||
|
model = YOLO(model_path)
|
||||||
|
# 打开视频
|
||||||
|
cap = cv2.VideoCapture(video_path)
|
||||||
|
if not cap.isOpened():
|
||||||
|
raise ValueError(f"无法打开视频: {video_path}")
|
||||||
|
|
||||||
|
frame_idx = 0
|
||||||
|
try:
|
||||||
|
# 使用 model.track 替代 model 进行跟踪检测
|
||||||
|
results_stream = model.track(source=video_path, tracker=tracker, device=device, stream=True)
|
||||||
|
|
||||||
|
last_data = PipelineData()
|
||||||
|
for r in results_stream: # 迭代跟踪结果
|
||||||
|
# 读取对应的帧
|
||||||
|
ret, frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
break # 视频读取完毕
|
||||||
|
|
||||||
|
|
||||||
|
# 构建数据包
|
||||||
|
data = PipelineData()
|
||||||
|
data.set_to_cache(last_data.result_cache)
|
||||||
|
data.current_result = r
|
||||||
|
data.frame = frame
|
||||||
|
data.frame_idx = frame_idx
|
||||||
|
# 获取时间戳(毫秒)
|
||||||
|
data.timestamp = cap.get(cv2.CAP_PROP_POS_MSEC)
|
||||||
|
# 将结果加入缓存
|
||||||
|
data.add_to_cache(r)
|
||||||
|
|
||||||
|
yield data
|
||||||
|
last_data = data
|
||||||
|
frame_idx += 1
|
||||||
|
finally:
|
||||||
|
# 确保视频流关闭
|
||||||
|
cap.release()
|
||||||
|
|
||||||
|
|
||||||
|
def video_yolo_detect_iterator(video_path: str, model_path: str = "yolov8n.pt", device: str = "0") -> Iterator[
|
||||||
|
PipelineData]:
|
||||||
|
"""
|
||||||
|
从视频读取帧,经YOLO检测(非跟踪),返回PipelineData迭代器
|
||||||
|
:param video_path: 视频文件路径(或0表示摄像头)
|
||||||
|
:param model_path: YOLO模型路径(默认使用yolov8n)
|
||||||
|
:param device: 设备标识(默认使用GPU 0)
|
||||||
|
:return: 包含YOLO检测结果的PipelineData迭代器
|
||||||
|
"""
|
||||||
|
# 加载YOLO模型
|
||||||
|
model = YOLO(model_path)
|
||||||
|
# 打开视频
|
||||||
|
cap = cv2.VideoCapture(video_path)
|
||||||
|
if not cap.isOpened():
|
||||||
|
raise ValueError(f"无法打开视频: {video_path}")
|
||||||
|
|
||||||
|
frame_idx = 0
|
||||||
|
try:
|
||||||
|
while cap.isOpened():
|
||||||
|
# 读取一帧
|
||||||
|
ret, frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
break # 视频读取完毕
|
||||||
|
|
||||||
|
# YOLO检测(stream=True表示流式处理,提升效率)
|
||||||
|
results = model(frame, device=device, stream=True)
|
||||||
|
for r in results: # 迭代结果(单帧只有一个结果)
|
||||||
|
# 构建数据包
|
||||||
|
data = PipelineData()
|
||||||
|
data.current_result = r
|
||||||
|
data.frame = frame
|
||||||
|
data.frame_idx = frame_idx
|
||||||
|
# 获取时间戳(毫秒)
|
||||||
|
data.timestamp = cap.get(cv2.CAP_PROP_POS_MSEC)
|
||||||
|
# 将结果加入缓存(默认最多30帧)
|
||||||
|
data.add_to_cache(r)
|
||||||
|
|
||||||
|
yield data
|
||||||
|
frame_idx += 1
|
||||||
|
finally:
|
||||||
|
# 确保视频流关闭
|
||||||
|
cap.release()
|
||||||
@@ -0,0 +1,84 @@
|
|||||||
|
from typing import List, Iterator
|
||||||
|
import asyncio
|
||||||
|
from .base_processor import BaseProcessor
|
||||||
|
from .pipeline_data import PipelineData
|
||||||
|
|
||||||
|
|
||||||
|
class Pipeline:
|
||||||
|
"""管道核心类,管理多个处理器,负责数据流转"""
|
||||||
|
def __init__(self):
|
||||||
|
self.processors: List[BaseProcessor] = [] # 同步处理器列表
|
||||||
|
self.async_processors: List[BaseProcessor] = [] # 异步处理器列表
|
||||||
|
|
||||||
|
def add_processor(self, processor: BaseProcessor) -> None:
|
||||||
|
"""向管道添加同步处理器(按添加顺序执行)"""
|
||||||
|
self.processors.append(processor)
|
||||||
|
print(f"管道已添加同步处理器: {processor}")
|
||||||
|
|
||||||
|
def add_processor_async(self, processor: BaseProcessor) -> None:
|
||||||
|
"""向管道添加异步处理器(异步执行,不阻塞数据流)"""
|
||||||
|
self.async_processors.append(processor)
|
||||||
|
print(f"管道已添加异步处理器: {processor}")
|
||||||
|
|
||||||
|
def remove_processor(self, processor_name: str) -> bool:
|
||||||
|
"""移除指定名称的处理器"""
|
||||||
|
# 检查同步处理器列表
|
||||||
|
for idx, p in enumerate(self.processors):
|
||||||
|
if p.name == processor_name:
|
||||||
|
self.processors.pop(idx)
|
||||||
|
print(f"管道已移除同步处理器: {p}")
|
||||||
|
return True
|
||||||
|
# 检查异步处理器列表
|
||||||
|
for idx, p in enumerate(self.async_processors):
|
||||||
|
if p.name == processor_name:
|
||||||
|
self.async_processors.pop(idx)
|
||||||
|
print(f"管道已移除异步处理器: {p}")
|
||||||
|
return True
|
||||||
|
print(f"未找到处理器: {processor_name}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def run(self, data_iterator: Iterator[PipelineData]) -> Iterator[PipelineData]:
|
||||||
|
"""
|
||||||
|
运行管道:迭代输入数据,依次经过所有同步处理器,同时异步执行异步处理器
|
||||||
|
:param data_iterator: 数据源迭代器(如视频+YOLO的迭代器)
|
||||||
|
:return: 处理后的数据包迭代器
|
||||||
|
"""
|
||||||
|
# 创建异步任务列表,用于管理异步处理器
|
||||||
|
async def run_async_processors(data: PipelineData):
|
||||||
|
tasks = []
|
||||||
|
for async_processor in self.async_processors:
|
||||||
|
task = asyncio.create_task(self._process_async(async_processor, data))
|
||||||
|
tasks.append(task)
|
||||||
|
if tasks:
|
||||||
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
for data in data_iterator:
|
||||||
|
# 依次执行每个同步处理器的处理逻辑
|
||||||
|
for processor in self.processors:
|
||||||
|
data = processor.process(data)
|
||||||
|
|
||||||
|
# 异步执行异步处理器,不阻塞数据流
|
||||||
|
if self.async_processors:
|
||||||
|
# 在独立的事件循环中运行异步处理器(如果可用)
|
||||||
|
try:
|
||||||
|
# 尝试获取当前事件循环
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
# 在当前事件循环中调度异步处理器
|
||||||
|
asyncio.create_task(run_async_processors(data.copy() if hasattr(data, 'copy') else data))
|
||||||
|
except RuntimeError:
|
||||||
|
# 如果没有运行中的事件循环,则创建一个新的
|
||||||
|
asyncio.run(run_async_processors(data.copy() if hasattr(data, 'copy') else data))
|
||||||
|
|
||||||
|
yield data # 返回处理后的数据包
|
||||||
|
|
||||||
|
async def _process_async(self, processor, data):
|
||||||
|
"""异步执行单个处理器"""
|
||||||
|
try:
|
||||||
|
# 对数据进行异步处理(如果处理器支持)
|
||||||
|
if hasattr(processor, 'process_async'):
|
||||||
|
await processor.process_async(data)
|
||||||
|
else:
|
||||||
|
# 如果处理器不支持异步处理,则同步处理
|
||||||
|
processor.process(data)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"异步处理器 {processor.name} 执行出错: {e}")
|
||||||
@@ -0,0 +1,78 @@
|
|||||||
|
from typing import List, Optional, Dict, Any
|
||||||
|
import copy
|
||||||
|
import cv2
|
||||||
|
from ultralytics.engine.results import Results
|
||||||
|
|
||||||
|
|
||||||
|
class PipelineData:
|
||||||
|
"""管道传输的数据包,承载YOLO检测结果、缓存及辅助信息"""
|
||||||
|
def __init__(self):
|
||||||
|
# 当前帧的YOLO检测结果
|
||||||
|
self.current_result: Optional[Results] = None
|
||||||
|
# 检测结果缓存(list[Results])
|
||||||
|
self.result_cache: List[Results] = []
|
||||||
|
# 辅助信息:帧数据、时间戳、帧序号(便于调试/扩展)
|
||||||
|
self.frame: Optional[cv2.Mat] = None
|
||||||
|
self.timestamp: Optional[float] = None
|
||||||
|
self.frame_idx: int = 0
|
||||||
|
# 内部数据存储map
|
||||||
|
self.internal_map: Dict[str, Any] = {}
|
||||||
|
|
||||||
|
def add_to_cache(self, result: Results, max_cache_size: int = 125) -> None:
|
||||||
|
"""将结果加入缓存,控制缓存最大长度"""
|
||||||
|
self.result_cache.append(result)
|
||||||
|
if len(self.result_cache) > max_cache_size:
|
||||||
|
self.result_cache.pop(0) # 移除最旧的缓存
|
||||||
|
|
||||||
|
def set_to_cache(self, result_cache: List[Results]) -> None:
|
||||||
|
"""设置缓存"""
|
||||||
|
self.result_cache = result_cache
|
||||||
|
|
||||||
|
def clear_cache(self) -> None:
|
||||||
|
"""清空缓存"""
|
||||||
|
self.result_cache.clear()
|
||||||
|
|
||||||
|
def copy(self):
|
||||||
|
"""创建当前PipelineData的副本"""
|
||||||
|
new_data = PipelineData()
|
||||||
|
new_data.current_result = self.current_result
|
||||||
|
new_data.result_cache = self.result_cache.copy()
|
||||||
|
new_data.frame = self.frame.copy() if self.frame is not None else None
|
||||||
|
new_data.timestamp = self.timestamp
|
||||||
|
new_data.frame_idx = self.frame_idx
|
||||||
|
new_data.internal_map = self.internal_map.copy()
|
||||||
|
return new_data
|
||||||
|
|
||||||
|
def put_data(self, data_type: str, key: str, value: Any) -> None:
|
||||||
|
"""存储数据到内部map
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data_type: 数据类型标识
|
||||||
|
key: 数据键名
|
||||||
|
value: 数据值
|
||||||
|
"""
|
||||||
|
map_key = f"{data_type}:{key}"
|
||||||
|
self.internal_map[map_key] = value
|
||||||
|
|
||||||
|
def get_data(self, data_label: str, key: str, target_type=None):
|
||||||
|
"""从内部map获取数据
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data_label: 数据来源标识
|
||||||
|
key: 数据键名
|
||||||
|
target_type: 目标类型,用于类型转换
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
获取的数据,如果不存在则返回None
|
||||||
|
"""
|
||||||
|
map_key = f"{data_label}:{key}"
|
||||||
|
value = self.internal_map.get(map_key)
|
||||||
|
|
||||||
|
if value is not None and target_type is not None:
|
||||||
|
try:
|
||||||
|
value = target_type(value)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
# 如果类型转换失败,返回原始值
|
||||||
|
pass
|
||||||
|
|
||||||
|
return value
|
||||||
Reference in New Issue
Block a user