import cv2 import numpy as np from yolo_gs.pipeline.base_processor import BaseProcessor from yolo_gs.pipeline.pipeline_data import PipelineData import math class RetrogradeProcessor(BaseProcessor): """逆行检测处理器:检测逆向行驶的车辆""" def __init__(self, name: str = "逆行检测处理器"): super().__init__(name) # 可调参数 self.TRUST_THRESHOLD = 3.0 self.PARTIAL_OVERLAP_THRESHOLD = 0.1 self.MAJORITY_OVERLAP_THRESHOLD = 0.5 self.DEC_PER_OPPOSITE = 1.0 # self.OPPOSITE_ANGLE_COS_THRESH = -0.9397 self.OPPOSITE_ANGLE_COS_THRESH = math.cos(math.radians(1)) self.VEHICLE_CLASSES = {2, 3, 5, 7} # 存储上一帧的事件信息 self.track_last_event = {} self.frame_idx = 0 def process(self, data: PipelineData) -> PipelineData: """检测逆行行为""" if data.current_result is None: return data self.frame_idx = data.frame_idx # 获取涂鸦数据 heat_map = data.get_data("graffiti", "heat_map") dir_x_map = data.get_data("graffiti", "dir_x_map") dir_y_map = data.get_data("graffiti", "dir_y_map") track_hist = data.get_data("graffiti", "track_hist") if heat_map is None or track_hist is None: return data boxes = getattr(data.current_result, "boxes", None) if boxes is None: return data frame_events = {} events_all0 = {} # 处理每个检测框 for box in boxes: try: # 解析检测框 xyxy = getattr(box, "xyxy", None) if xyxy is None: continue x1, y1, x2, y2 = map(float, xyxy[0].tolist()) cls = int(getattr(box, "cls", -1)) track_id = int(getattr(box, "id", -1)) # 仅处理车辆类 if cls not in self.VEHICLE_CLASSES: continue # 获取车辆历史轨迹 hist = track_hist.get(track_id, []) if len(hist) < 2: continue # 计算当前方向 cp_prev = hist[-2] cp_cur = hist[-1] dir_vec = self._normalize_vec((cp_cur[0] - cp_prev[0], cp_cur[1] - cp_prev[1])) # 计算车辆经过的区域 vehicle_width = max(8.0, x2 - x1) poly = self._polygon_from_segment(cp_prev, cp_cur, vehicle_width) if poly is None: continue # 创建ROI掩码 frame_shape = heat_map.shape[:2] if len(heat_map.shape) == 2 else heat_map.shape bbox_roi, mask_roi = self._polygon_mask_roi_from_pts(poly, frame_shape) if mask_roi.size == 0: continue # 计算与现有涂鸦的重叠 x, y, w_roi, h_roi = bbox_roi heat_roi = heat_map[y:y + h_roi, x:x + w_roi] overlap_pixels = np.logical_and(mask_roi > 0, heat_roi > 0) overlap_ratio = float(overlap_pixels.sum()) / max(1.0, mask_roi.sum()) # 计算区域平均热力和方向 region_heat = self._region_avg_heat(heat_roi, mask_roi) region_dir = self._region_avg_direction(dir_x_map[y:y + h_roi, x:x + w_roi], dir_y_map[y:y + h_roi, x:x + w_roi], mask_roi) # 判断事件类型 event = self._detect_retrograde_event( region_heat, region_dir, dir_vec, overlap_ratio ) events_all0[track_id] = event if event: frame_events[track_id] = event # 减少涂鸦区域热力(逆向行驶会减少涂鸦可信度) self._reduce_paint(heat_map, dir_x_map, dir_y_map, bbox_roi, mask_roi, overlap_ratio) except Exception as e: continue # 存储事件信息 data.put_data("retrograde", "current_events", frame_events) data.put_data("retrograde", "all_events", self.track_last_event) # 输出逆行信息 for track_id, event in events_all0.items(): print(f"【逆行检测器】帧{self.frame_idx} - 检测到事件:{event} - 轨迹ID: {track_id}") # 更新事件历史 for track_id, event in frame_events.items(): self.track_last_event[track_id] = (self.frame_idx, event) return data def _detect_retrograde_event(self, region_heat, region_dir, current_dir, overlap_ratio): """检测逆行事件类型""" if region_heat >= 1.0 and region_dir is not None: cos_sim = self._cos_between(region_dir, current_dir) if cos_sim < self.OPPOSITE_ANGLE_COS_THRESH: if region_heat >= self.TRUST_THRESHOLD: if overlap_ratio >= self.MAJORITY_OVERLAP_THRESHOLD: return "REVERSE" elif overlap_ratio >= self.PARTIAL_OVERLAP_THRESHOLD: return "CROSS_LINE" else: return "SUSPECT" else: return "SUSPECT" else: return "NORMAL" return None def _reduce_paint(self, heat_map, dir_x_map, dir_y_map, bbox_roi, mask_roi, overlap_ratio): """减少涂鸦区域热力(逆向行驶惩罚)""" x, y, w, h = bbox_roi amount = self.DEC_PER_OPPOSITE * overlap_ratio dec = amount * mask_roi.astype(np.float32) # 减少热力 heat_roi = heat_map[y:y + h, x:x + w] heat_roi_before = heat_roi.copy() heat_roi_after = np.maximum(0.0, heat_roi_before - dec) # 按比例减少方向强度 eps = 1e-6 ratio = np.ones_like(heat_roi_before) nonzero_mask = heat_roi_before > eps ratio[nonzero_mask] = heat_roi_after[nonzero_mask] / (heat_roi_before[nonzero_mask] + eps) dir_x_map[y:y + h, x:x + w] *= ratio dir_y_map[y:y + h, x:x + w] *= ratio heat_map[y:y + h, x:x + w] = heat_roi_after def _polygon_from_segment(self, center_prev, center_cur, veh_width_px): """从两点构造矩形带""" x0, y0 = center_prev x1, y1 = center_cur vx = x1 - x0 vy = y1 - y0 dist = math.hypot(vx, vy) if dist == 0: return None ux, uy = vx / dist, vy / dist px, py = -uy, ux half_w = veh_width_px / 2.0 p0 = (int(round(x0 + px * half_w)), int(round(y0 + py * half_w))) p1 = (int(round(x0 - px * half_w)), int(round(y0 - py * half_w))) p2 = (int(round(x1 - px * half_w)), int(round(y1 - py * half_w))) p3 = (int(round(x1 + px * half_w)), int(round(y1 + py * half_w))) return np.array([p0, p1, p2, p3], dtype=np.int32) def _polygon_mask_roi_from_pts(self, pts, img_shape): """在多边形最小包围矩形内生成局部掩码""" if pts is None or len(pts) == 0: return (0, 0, 0, 0), np.zeros((0, 0), dtype=np.uint8) x, y, w, h = cv2.boundingRect(pts) x2 = min(x + w, img_shape[1]) y2 = min(y + h, img_shape[0]) w = x2 - x h = y2 - y if w <= 0 or h <= 0: return (0, 0, 0, 0), np.zeros((0, 0), dtype=np.uint8) pts_roi = pts.copy() pts_roi[:, 0] -= x pts_roi[:, 1] -= y mask = np.zeros((h, w), dtype=np.uint8) cv2.fillPoly(mask, [pts_roi], 1) return (x, y, w, h), mask def _region_avg_heat(self, heat_roi, mask_roi): """计算区域平均热力""" mask = mask_roi.astype(bool) if mask.sum() == 0: return 0.0 return float(heat_roi[mask].mean()) def _region_avg_direction(self, dir_x_roi, dir_y_roi, mask_roi): """计算区域平均方向""" mask = mask_roi.astype(bool) if mask.sum() == 0: return None sx = dir_x_roi[mask].sum() sy = dir_y_roi[mask].sum() vec = np.array([sx, sy], dtype=np.float32) norm = np.linalg.norm(vec) if norm == 0: return None return vec / norm def _normalize_vec(self, v): """归一化向量""" v = np.array(v, dtype=np.float32) norm = np.linalg.norm(v) if norm == 0: return np.array([0.0, 0.0], dtype=np.float32) return v / norm def _cos_between(self, a, b): """计算两个向量的余弦相似度""" an = self._normalize_vec(a) bn = self._normalize_vec(b) return float(np.dot(an, bn))