Files
zimoyin cdf228fe56 feat(app): 集成loguru日志系统并优化错误处理
- 在app.py中引入loguru并配置日志轮转、异步输出等功能
- 添加全局日志初始化函数和程序启动/退出日志记录
- 将所有print语句替换为logger.info/error/debug/warning方法
- 在data_source.py中添加模型加载和视频打开的日志记录
- 在各个处理器中集成日志记录器实例并记录处理状态
- 修改处理器模块导入路径以符合相对导入规范
- 在requirements.txt中添加loguru依赖包
- 统一异常处理的日志记录方式,便于调试和监控
2026-01-10 18:03:18 +08:00

245 lines
8.7 KiB
Python

import cv2
import numpy as np
from ..base_processor import BaseProcessor
from ..pipeline_data import PipelineData
import math
class RetrogradeProcessor(BaseProcessor):
    """Retrograde-driving processor: flags vehicles moving against the painted direction.

    For every frame the processor reads the "graffiti" entries of the pipeline
    data (a heat map, per-pixel x/y direction maps and per-track point
    histories), compares each tracked vehicle's latest displacement with the
    average direction painted under its path, and publishes the resulting
    events under the "retrograde" key.

    Event labels produced by this class: "REVERSE", "CROSS_LINE", "SUSPECT"
    and "NORMAL" (or ``None`` when the region carries too little paint).
    """

    def __init__(self, name: str = "逆行检测处理器"):
        """Create the processor and set its tunable thresholds."""
        super().__init__(name)

        # Tunable parameters.
        # Minimum average heat for a painted region to be trusted.
        self.TRUST_THRESHOLD = 3.0
        # Overlap ratio at/above which an opposite move is "CROSS_LINE".
        self.PARTIAL_OVERLAP_THRESHOLD = 0.1
        # Overlap ratio at/above which an opposite move is "REVERSE".
        self.MAJORITY_OVERLAP_THRESHOLD = 0.5
        # Heat removed per opposite-direction pass (scaled by the overlap ratio).
        self.DEC_PER_OPPOSITE = 1.0
        # A move counts as "opposite" when cos(angle) is BELOW this value.
        # NOTE(review): cos(1 deg) ~= 0.99985, so any heading that deviates by
        # more than ~1 degree from the painted direction is treated as
        # opposite. The value previously left commented out here was -0.9397
        # (~cos 160 deg). Confirm this is intended and not a debugging leftover.
        self.OPPOSITE_ANGLE_COS_THRESH = math.cos(math.radians(1))
        # Detector class ids treated as vehicles.
        # Presumably COCO ids (car, motorcycle, bus, truck) - TODO confirm
        # against the detector's label map.
        self.VEHICLE_CLASSES = {2, 3, 5, 7}

        # Last event seen per track: track_id -> (frame_idx, event).
        self.track_last_event = {}
        self.frame_idx = 0

    def process(self, data: PipelineData) -> PipelineData:
        """Detect retrograde behaviour for every vehicle box of the current frame.

        Reads ``data.current_result.boxes`` and the "graffiti" maps, mutates the
        heat/direction maps in place for retrograde vehicles, and stores
        "current_events" and "all_events" under the "retrograde" key.

        Args:
            data: Pipeline payload for the current frame.

        Returns:
            The same ``data`` object. It is returned untouched when there is no
            detection result, no boxes, or any required graffiti map is missing.
        """
        if data.current_result is None:
            return data

        self.frame_idx = data.frame_idx

        # Graffiti data this processor depends on.
        heat_map = data.get_data("graffiti", "heat_map")
        dir_x_map = data.get_data("graffiti", "dir_x_map")
        dir_y_map = data.get_data("graffiti", "dir_y_map")
        track_hist = data.get_data("graffiti", "track_hist")
        # The direction maps are sliced for every box, so they are as mandatory
        # as the heat map; bail out once instead of failing on each box.
        if heat_map is None or dir_x_map is None or dir_y_map is None or track_hist is None:
            return data

        boxes = getattr(data.current_result, "boxes", None)
        if boxes is None:
            return data

        frame_events = {}      # track_id -> event, truthy events only
        evaluated_events = {}  # track_id -> event, including None (logging only)

        for box in boxes:
            try:
                evaluation = self._evaluate_box(box, heat_map, dir_x_map, dir_y_map, track_hist)
                if evaluation is None:
                    continue
                track_id, event, bbox_roi, mask_roi, overlap_ratio = evaluation

                evaluated_events[track_id] = event
                if event:
                    frame_events[track_id] = event
                    # Only retrograde-type events erode the paint; a vehicle
                    # that follows the painted direction must not reduce it.
                    if event != "NORMAL":
                        self._reduce_paint(heat_map, dir_x_map, dir_y_map,
                                           bbox_roi, mask_roi, overlap_ratio)
            except Exception as exc:
                # Best effort: one malformed box must not abort the frame, but
                # the failure is logged instead of being silently discarded.
                self.logger.warning(
                    f"【逆行检测器】帧{self.frame_idx} - 处理检测框失败: {exc!r}"
                )
                continue

        # Publish the results. "all_events" is stored before this frame's
        # update below, exactly as before.
        # NOTE(review): whether consumers see this frame's entries depends on
        # put_data keeping a reference to the dict - confirm.
        data.put_data("retrograde", "current_events", frame_events)
        data.put_data("retrograde", "all_events", self.track_last_event)

        # Report every evaluated track.
        for track_id, event in evaluated_events.items():
            self.logger.info(f"【逆行检测器】帧{self.frame_idx} - 检测到事件:{event} - 轨迹ID: {track_id}")

        # Remember the latest event per track.
        for track_id, event in frame_events.items():
            self.track_last_event[track_id] = (self.frame_idx, event)

        return data

    def _evaluate_box(self, box, heat_map, dir_x_map, dir_y_map, track_hist):
        """Classify a single detection box against the painted maps.

        Args:
            box: Detection box exposing ``xyxy``, ``cls`` and ``id``.
            heat_map: Heat map indexed as ``[row, col]``.
            dir_x_map: X component of the painted direction, same indexing.
            dir_y_map: Y component of the painted direction, same indexing.
            track_hist: Mapping ``track_id -> sequence of (x, y) points``.

        Returns:
            ``None`` when the box is skipped (no coordinates, not a vehicle, no
            track id, fewer than two history points, zero displacement, or a
            path entirely outside the map); otherwise the tuple
            ``(track_id, event, bbox_roi, mask_roi, overlap_ratio)``.
        """
        xyxy = getattr(box, "xyxy", None)
        if xyxy is None:
            return None
        x1, _, x2, _ = map(float, xyxy[0].tolist())

        # Vehicles only.
        cls = int(getattr(box, "cls", -1))
        if cls not in self.VEHICLE_CLASSES:
            return None

        # An id of None means the detection is not tracked; without a track
        # there is no history to derive a heading from.
        raw_id = getattr(box, "id", -1)
        if raw_id is None:
            return None
        track_id = int(raw_id)

        hist = track_hist.get(track_id, [])
        if len(hist) < 2:
            return None

        # Heading from the last two history points.
        prev_pt, cur_pt = hist[-2], hist[-1]
        dir_vec = self._normalize_vec((cur_pt[0] - prev_pt[0], cur_pt[1] - prev_pt[1]))

        # Band swept by the vehicle between the two points (at least 8 px wide).
        vehicle_width = max(8.0, x2 - x1)
        poly = self._polygon_from_segment(prev_pt, cur_pt, vehicle_width)
        if poly is None:
            return None

        bbox_roi, mask_roi = self._polygon_mask_roi_from_pts(poly, heat_map.shape[:2])
        if mask_roi.size == 0:
            return None

        # Fraction of the swept band that already carries paint.
        x, y, w, h = bbox_roi
        rows, cols = slice(y, y + h), slice(x, x + w)
        heat_roi = heat_map[rows, cols]
        painted = np.logical_and(mask_roi > 0, heat_roi > 0)
        overlap_ratio = float(painted.sum()) / max(1.0, mask_roi.sum())

        # Average heat and direction under the band.
        region_heat = self._region_avg_heat(heat_roi, mask_roi)
        region_dir = self._region_avg_direction(dir_x_map[rows, cols],
                                                dir_y_map[rows, cols],
                                                mask_roi)

        event = self._detect_retrograde_event(region_heat, region_dir, dir_vec, overlap_ratio)
        return track_id, event, bbox_roi, mask_roi, overlap_ratio

    def _detect_retrograde_event(self, region_heat, region_dir, current_dir, overlap_ratio):
        """Map region statistics to an event label.

        Args:
            region_heat: Average heat under the vehicle's path.
            region_dir: Unit vector of the painted direction, or ``None``.
            current_dir: Unit vector of the vehicle's heading.
            overlap_ratio: Fraction of the path that overlaps painted pixels.

        Returns:
            ``None`` when the region has average heat below 1.0 or no direction;
            "NORMAL" when the heading is not opposite; otherwise "REVERSE",
            "CROSS_LINE" or "SUSPECT" depending on trust and overlap.
        """
        if not (region_heat >= 1.0 and region_dir is not None):
            return None

        cos_sim = self._cos_between(region_dir, current_dir)
        if not (cos_sim < self.OPPOSITE_ANGLE_COS_THRESH):
            return "NORMAL"

        # Opposite heading on weakly painted ground is only a suspicion.
        if not (region_heat >= self.TRUST_THRESHOLD):
            return "SUSPECT"
        if overlap_ratio >= self.MAJORITY_OVERLAP_THRESHOLD:
            return "REVERSE"
        if overlap_ratio >= self.PARTIAL_OVERLAP_THRESHOLD:
            return "CROSS_LINE"
        return "SUSPECT"

    def _reduce_paint(self, heat_map, dir_x_map, dir_y_map, bbox_roi, mask_roi, overlap_ratio):
        """Lower heat and direction strength inside the masked ROI (retrograde penalty).

        All three maps are modified in place. Heat drops by
        ``DEC_PER_OPPOSITE * overlap_ratio`` on masked pixels (floored at 0) and
        the direction components are scaled by the same after/before ratio.

        Args:
            heat_map: Heat map to update.
            dir_x_map: X direction map to update.
            dir_y_map: Y direction map to update.
            bbox_roi: ``(x, y, w, h)`` of the ROI inside the maps.
            mask_roi: ``(h, w)`` mask, non-zero where the penalty applies.
            overlap_ratio: Overlap fraction used to scale the penalty.
        """
        x, y, w, h = bbox_roi
        rows, cols = slice(y, y + h), slice(x, x + w)

        amount = self.DEC_PER_OPPOSITE * overlap_ratio
        dec = amount * mask_roi.astype(np.float32)

        heat_before = heat_map[rows, cols].copy()
        heat_after = np.maximum(0.0, heat_before - dec)

        # Scale the direction strength by the same proportion as the heat.
        # The ratio takes heat_after's float dtype so it is never truncated,
        # even if the heat map itself has an integer dtype.
        eps = 1e-6
        ratio = np.ones_like(heat_after)
        nonzero = heat_before > eps
        ratio[nonzero] = heat_after[nonzero] / (heat_before[nonzero] + eps)

        dir_x_map[rows, cols] *= ratio
        dir_y_map[rows, cols] *= ratio
        heat_map[rows, cols] = heat_after

    def _polygon_from_segment(self, center_prev, center_cur, veh_width_px):
        """Build the rectangular band of a given width around a segment.

        Args:
            center_prev: ``(x, y)`` start point.
            center_cur: ``(x, y)`` end point.
            veh_width_px: Band width in pixels.

        Returns:
            ``(4, 2)`` int32 array of corner points, or ``None`` when both
            points coincide.
        """
        x0, y0 = center_prev
        x1, y1 = center_cur
        vx, vy = x1 - x0, y1 - y0
        dist = math.hypot(vx, vy)
        if dist == 0:
            return None

        # Unit normal of the segment, scaled to half the band width.
        half_w = veh_width_px / 2.0
        off_x = (-vy / dist) * half_w
        off_y = (vx / dist) * half_w

        corners = [
            (x0 + off_x, y0 + off_y),
            (x0 - off_x, y0 - off_y),
            (x1 - off_x, y1 - off_y),
            (x1 + off_x, y1 + off_y),
        ]
        return np.array([(int(round(cx)), int(round(cy))) for cx, cy in corners],
                        dtype=np.int32)

    def _polygon_mask_roi_from_pts(self, pts, img_shape):
        """Rasterise a polygon into a mask limited to its in-image bounding box.

        The bounding box is clamped to the image on all four sides, so the
        returned ROI can always be used directly as a slice into a map of shape
        ``img_shape``. Parts of the polygon outside the ROI are clipped.

        Args:
            pts: ``(N, 2)`` integer polygon vertices in image coordinates.
            img_shape: Shape whose first two entries are ``(height, width)``.

        Returns:
            ``((x, y, w, h), mask)`` where ``mask`` is an ``(h, w)`` uint8 array
            with 1 inside the polygon; ``((0, 0, 0, 0), empty mask)`` when the
            polygon is missing or lies completely outside the image.
        """
        empty = ((0, 0, 0, 0), np.zeros((0, 0), dtype=np.uint8))
        if pts is None or len(pts) == 0:
            return empty

        pts = np.asarray(pts, dtype=np.int32)
        x, y, w, h = cv2.boundingRect(pts)

        # Clamp every edge: a negative origin would otherwise turn into a
        # wrapping negative slice index and a mask/ROI shape mismatch.
        left, top = max(x, 0), max(y, 0)
        right = min(x + w, img_shape[1])
        bottom = min(y + h, img_shape[0])
        roi_w, roi_h = right - left, bottom - top
        if roi_w <= 0 or roi_h <= 0:
            return empty

        # Shift the polygon into ROI-local coordinates.
        local_pts = pts.copy()
        local_pts[:, 0] -= left
        local_pts[:, 1] -= top

        mask = np.zeros((roi_h, roi_w), dtype=np.uint8)
        cv2.fillPoly(mask, [local_pts], 1)
        return (left, top, roi_w, roi_h), mask

    def _region_avg_heat(self, heat_roi, mask_roi) -> float:
        """Return the mean heat over the masked pixels (0.0 for an empty mask)."""
        mask = mask_roi.astype(bool)
        if not mask.any():
            return 0.0
        return float(heat_roi[mask].mean())

    def _region_avg_direction(self, dir_x_roi, dir_y_roi, mask_roi):
        """Return the unit vector of the summed direction over the masked pixels.

        Returns:
            Float32 array ``[x, y]`` of unit length, or ``None`` when the mask
            is empty or the summed direction has zero length.
        """
        mask = mask_roi.astype(bool)
        if not mask.any():
            return None

        total = np.array([dir_x_roi[mask].sum(), dir_y_roi[mask].sum()], dtype=np.float32)
        norm = np.linalg.norm(total)
        if norm == 0:
            return None
        return total / norm

    def _normalize_vec(self, v):
        """Return ``v`` as a float32 unit vector (a zero vector stays zero)."""
        v = np.array(v, dtype=np.float32)
        norm = np.linalg.norm(v)
        if norm == 0:
            return np.array([0.0, 0.0], dtype=np.float32)
        return v / norm

    def _cos_between(self, a, b) -> float:
        """Return the cosine similarity of two vectors."""
        return float(np.dot(self._normalize_vec(a), self._normalize_vec(b)))