From 564037d705c277112d8bbc36294913a372134686 Mon Sep 17 00:00:00 2001 From: zimoyin <2556608754@qq.com> Date: Sat, 10 Jan 2026 10:40:54 +0800 Subject: [PATCH] =?UTF-8?q?fix(pipeline):=20=E4=BF=AE=E5=A4=8D=E7=AE=A1?= =?UTF-8?q?=E9=81=93=E5=A4=84=E7=90=86=E5=99=A8=E5=BC=82=E5=B8=B8=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在同步处理器中添加异常捕获和错误提示 - 修复异步处理器异常处理的日志输出格式 - 添加数据复制机制避免异步处理时的数据竞争 - 优化数据流转逻辑确保处理后的数据正确返回 --- pipeline/__init__.py | 0 pipeline/pipeline.py | 14 ++++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) create mode 100644 pipeline/__init__.py diff --git a/pipeline/__init__.py b/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline/pipeline.py b/pipeline/pipeline.py index 1789986..c4243df 100644 --- a/pipeline/pipeline.py +++ b/pipeline/pipeline.py @@ -6,6 +6,7 @@ from .pipeline_data import PipelineData class Pipeline: """管道核心类,管理多个处理器,负责数据流转""" + def __init__(self): self.processors: List[BaseProcessor] = [] # 同步处理器列表 self.async_processors: List[BaseProcessor] = [] # 异步处理器列表 @@ -43,6 +44,7 @@ class Pipeline: :param data_iterator: 数据源迭代器(如视频+YOLO的迭代器) :return: 处理后的数据包迭代器 """ + # 创建异步任务列表,用于管理异步处理器 async def run_async_processors(data: PipelineData): tasks = [] @@ -55,8 +57,12 @@ class Pipeline: for data in data_iterator: # 依次执行每个同步处理器的处理逻辑 for processor in self.processors: - data = processor.process(data) - + try: + data = processor.process(data) + yield data + except Exception as e: + print(f"同步处理器 {processor.name} 执行出错: {e}") + # 异步执行异步处理器,不阻塞数据流 if self.async_processors: # 在独立的事件循环中运行异步处理器(如果可用) @@ -68,7 +74,7 @@ class Pipeline: except RuntimeError: # 如果没有运行中的事件循环,则创建一个新的 asyncio.run(run_async_processors(data.copy() if hasattr(data, 'copy') else data)) - + yield data # 返回处理后的数据包 async def _process_async(self, processor, data): @@ -81,4 +87,4 @@ class Pipeline: # 如果处理器不支持异步处理,则同步处理 processor.process(data) except Exception as e: - print(f"异步处理器 {processor.name} 执行出错: {e}") \ No newline at end of file + print(f"异步处理器 {processor.name} 执行出错: {e}")