fix(pipeline): 修复管道处理器异常处理机制

- 在同步处理器中添加异常捕获和错误提示
- 修复异步处理器异常处理的日志输出格式
- 添加数据复制机制避免异步处理时的数据竞争
- 优化数据流转逻辑确保处理后的数据正确返回
This commit is contained in:
zimoyin
2026-01-10 10:40:54 +08:00
parent 5f2e856ad7
commit 564037d705
2 changed files with 10 additions and 4 deletions
View File
+10 -4
View File
@@ -6,6 +6,7 @@ from .pipeline_data import PipelineData
class Pipeline: class Pipeline:
"""管道核心类,管理多个处理器,负责数据流转""" """管道核心类,管理多个处理器,负责数据流转"""
def __init__(self): def __init__(self):
self.processors: List[BaseProcessor] = [] # 同步处理器列表 self.processors: List[BaseProcessor] = [] # 同步处理器列表
self.async_processors: List[BaseProcessor] = [] # 异步处理器列表 self.async_processors: List[BaseProcessor] = [] # 异步处理器列表
@@ -43,6 +44,7 @@ class Pipeline:
:param data_iterator: 数据源迭代器(如视频+YOLO的迭代器) :param data_iterator: 数据源迭代器(如视频+YOLO的迭代器)
:return: 处理后的数据包迭代器 :return: 处理后的数据包迭代器
""" """
# 创建异步任务列表,用于管理异步处理器 # 创建异步任务列表,用于管理异步处理器
async def run_async_processors(data: PipelineData): async def run_async_processors(data: PipelineData):
tasks = [] tasks = []
@@ -55,8 +57,12 @@ class Pipeline:
for data in data_iterator: for data in data_iterator:
# 依次执行每个同步处理器的处理逻辑 # 依次执行每个同步处理器的处理逻辑
for processor in self.processors: for processor in self.processors:
data = processor.process(data) try:
data = processor.process(data)
yield data
except Exception as e:
print(f"同步处理器 {processor.name} 执行出错: {e}")
# 异步执行异步处理器,不阻塞数据流 # 异步执行异步处理器,不阻塞数据流
if self.async_processors: if self.async_processors:
# 在独立的事件循环中运行异步处理器(如果可用) # 在独立的事件循环中运行异步处理器(如果可用)
@@ -68,7 +74,7 @@ class Pipeline:
except RuntimeError: except RuntimeError:
# 如果没有运行中的事件循环,则创建一个新的 # 如果没有运行中的事件循环,则创建一个新的
asyncio.run(run_async_processors(data.copy() if hasattr(data, 'copy') else data)) asyncio.run(run_async_processors(data.copy() if hasattr(data, 'copy') else data))
yield data # 返回处理后的数据包 yield data # 返回处理后的数据包
async def _process_async(self, processor, data): async def _process_async(self, processor, data):
@@ -81,4 +87,4 @@ class Pipeline:
# 如果处理器不支持异步处理,则同步处理 # 如果处理器不支持异步处理,则同步处理
processor.process(data) processor.process(data)
except Exception as e: except Exception as e:
print(f"异步处理器 {processor.name} 执行出错: {e}") print(f"异步处理器 {processor.name} 执行出错: {e}")