""" yolo_graffiti_lane_with_vectors.py 思路与架构(中文说明 + 简易架构图) 整体思路: - 使用 YOLO 跟踪(Ultralytics 的 model.track(..., stream=True)),获取每帧检测与 track id。 - 每辆车的有效移动段(去抖后)会在画面上用矩形带(vehicle_width)在热力图上留下涂鸦(heat 0..MAX_HEAT),同时累积方向向量(dir_x/dir_y)。 - 判断逆向:当车辆经过已存在热区,计算区域平均方向与当前车辆方向的余弦相似度,结合热度与重叠比例判定 SUSPECT / CROSS_LINE / REVERSE。 - 为了性能,采用 ROI 局部操作和延迟(lazy)衰减:对涂鸦只在其最小包围矩形(ROI)内进行掩码计算与热力/方向更新,且热力的衰减通过“按帧懒处理”来减少整图操作。 架构(简化 ASCII 图): +----------------------------+ | Video frames (stream) | <-- ultralytics model.track +------------+---------------+ | v +------------+---------------+ +----------------+ | Detection & Tracking (YOLO)| -----> | per-track hist | +------------+---------------+ +----------------+ | | v v +--------------------------------------+ update when | Graffiti Manager (heat, dir maps) |<-- vehicle passes | - lazy ROI decay | | - apply/remove paint in ROI | +--------------------------------------+ | v +--------------------------------------+ | Visualizer: | | - draw bbox & track vectors | | - draw polygon & local arrow | | - draw grid-sampled persistent arrows (performance friendly) +--------------------------------------+ 性能关键点改进: 1. 避免每帧对整张 heat/dir 做逐像素衰减;改为:只有在需要更新的 ROI 上按累计帧数做懒衰减(lazy_decay),并且每 N 帧做一次整图清理(full decay)。 2. 对多边形掩码只在 ROI(boundingRect)内执行,减少 fillPoly 的像素数。 3. 将热区方向的持久热箭头以网格采样绘制,且可设置每 N 帧绘制一次,减少绘制开销。 下面为优化后的代码,已全部改为中文注释并包含性能优化。 """ import cv2 import numpy as np from collections import deque, defaultdict from ultralytics import YOLO import math # -------------------------- # 可调参数(按需调整) # -------------------------- MAX_HEAT = 10.0 # 热力最大值 INC_PER_PASS = 1.0 # 每次经过热力增加量 DEC_PER_OPPOSITE = 1.0 # 逆向经过时减小热力的基准量 TRUST_THRESHOLD = 3.0 # 热力 >= 3 视为可信区域 PARTIAL_OVERLAP_THRESHOLD = 0.1 MAJORITY_OVERLAP_THRESHOLD = 0.5 MIN_MOVE_DIST = 6.0 # 抑制抖动:一帧移动小于此值不计入轨迹 MIN_CUM_DIST = 20.0 # 累计移动距离阈值,超过才认为驶过 HISTORY_LEN = 16 # 每个 track 保留历史点数 HEAT_DECAY_PER_FRAME = 0.002 # 每帧的线性衰减比例(小) OPPOSITE_ANGLE_COS_THRESH = -0.9397 # 余弦 < -0.5(120°~180°) 认为明显相反方向 VEHICLE_CLASSES = {2, 3, 5, 7} # COCO 的车辆类别(car,motorcycle,bus,truck) ARROW_THICK = 2 ARROW_LEN_MIN = 10.0 # 懒衰减与整图清理参数 FULL_DECAY_EVERY = 30 # 每 N 帧对整图做一次完整衰减(把 pending decay 应用到全图) # 网格采样用于在热力图上绘制持久方向箭头(性能友好) GRID_STEP = 40 GRID_HEAT_THRESHOLD = 0.6 # 网格平均热度占最大热度的阈值 GRID_DRAW_EVERY = 2 # 每 N 帧绘制一次网格箭头 # -------------------------- # Graffiti 数据结构与方法(含懒衰减) # -------------------------- class GraffitiLane: def __init__(self, frame_w, frame_h): self.w = frame_w self.h = frame_h # heat、dir_x、dir_y 为整图数组(float32) self.heat = np.zeros((self.h, self.w), dtype=np.float32) self.dir_x = np.zeros((self.h, self.w), dtype=np.float32) self.dir_y = np.zeros((self.h, self.w), dtype=np.float32) # 记录上次全局衰减帧编号(用于懒衰减计算) self.last_decay_frame = 0 def _decay_factor(self, frames): """计算在 frames 帧内累计衰减因子""" if frames <= 0: return 1.0 return (1.0 - HEAT_DECAY_PER_FRAME) ** frames def lazy_decay_roi(self, bbox, current_frame): """ 对给定 ROI(bbox)应用从 self.last_decay_frame 到 current_frame 的累计衰减。 这样我们不需要每帧对整图做衰减,只在需要写入/读取 ROI 时才把 pending 衰减应用到 ROI 上。 bbox: (x, y, w, h) """ x, y, w, h = bbox if w <= 0 or h <= 0: return frames = current_frame - self.last_decay_frame if frames <= 0: return factor = self._decay_factor(frames) # 只在 ROI 上乘以因子(就地修改) self.heat[y:y+h, x:x+w] *= factor self.dir_x[y:y+h, x:x+w] *= factor self.dir_y[y:y+h, x:x+w] *= factor def full_decay(self, current_frame): """ 在整图上强制应用 pending 衰减(用于周期性清理,避免长期堆积误差)。 """ frames = current_frame - self.last_decay_frame if frames <= 0: return factor = self._decay_factor(frames) if factor == 1.0: self.last_decay_frame = current_frame return # 对整图应用因子 self.heat *= factor self.dir_x *= factor self.dir_y *= factor self.last_decay_frame = current_frame # 以下方法均采用 ROI 操作以提高性能 def apply_paint_roi(self, bbox, mask_roi, direction_vector, inc=INC_PER_PASS, current_frame=0): # 先对 ROI 应用懒衰减(把 pending decay 应用在将要修改的区域上) self.lazy_decay_roi(bbox, current_frame) x, y, w, h = bbox add = inc * mask_roi.astype(np.float32) # 对 ROI 做写入(加热并累加方向) self.heat[y:y+h, x:x+w] = np.minimum(MAX_HEAT, self.heat[y:y+h, x:x+w] + add) dx, dy = direction_vector self.dir_x[y:y+h, x:x+w] += dx * add self.dir_y[y:y+h, x:x+w] += dy * add def remove_paint_roi(self, bbox, mask_roi, amount=DEC_PER_OPPOSITE, current_frame=0): # 先对 ROI 应用懒衰减 self.lazy_decay_roi(bbox, current_frame) x, y, w, h = bbox dec = amount * mask_roi.astype(np.float32) roi_heat_before = self.heat[y:y+h, x:x+w].copy() roi_heat_after = np.maximum(0.0, roi_heat_before - dec) # 计算缩放比并更新方向累积 eps = 1e-6 ratio = np.ones_like(roi_heat_before) nonzero_mask = roi_heat_before > eps ratio[nonzero_mask] = roi_heat_after[nonzero_mask] / (roi_heat_before[nonzero_mask] + eps) self.dir_x[y:y+h, x:x+w] *= ratio self.dir_y[y:y+h, x:x+w] *= ratio self.heat[y:y+h, x:x+w] = roi_heat_after def region_avg_direction_roi(self, bbox, mask_roi): x, y, w, h = bbox mask = mask_roi.astype(bool) if mask.sum() == 0: return None sx = self.dir_x[y:y+h, x:x+w][mask].sum() sy = self.dir_y[y:y+h, x:x+w][mask].sum() vec = np.array([sx, sy], dtype=np.float32) norm = np.linalg.norm(vec) if norm == 0: return None return vec / norm def region_avg_heat_roi(self, bbox, mask_roi): x, y, w, h = bbox mask = mask_roi.astype(bool) if mask.sum() == 0: return 0.0 return float(self.heat[y:y+h, x:x+w][mask].mean()) # -------------------------- # 辅助函数(中文注释) # -------------------------- def polygon_from_segment(center_prev, center_cur, veh_width_px): """从两点构造矩形带(作为车辆经过的涂鸦段)""" x0, y0 = center_prev x1, y1 = center_cur vx = x1 - x0 vy = y1 - y0 dist = math.hypot(vx, vy) if dist == 0: return None ux, uy = vx / dist, vy / dist px, py = -uy, ux half_w = veh_width_px / 2.0 p0 = (int(round(x0 + px * half_w)), int(round(y0 + py * half_w))) p1 = (int(round(x0 - px * half_w)), int(round(y0 - py * half_w))) p2 = (int(round(x1 - px * half_w)), int(round(y1 - py * half_w))) p3 = (int(round(x1 + px * half_w)), int(round(y1 + py * half_w))) return np.array([p0, p1, p2, p3], dtype=np.int32) def polygon_mask_roi_from_pts(pts, img_shape): """在多边形的最小包围矩形内生成局部掩码并返回 bbox, mask_roi""" if pts is None or len(pts) == 0: return (0,0,0,0), np.zeros((0,0), dtype=np.uint8) x, y, w, h = cv2.boundingRect(pts) x2 = min(x+w, img_shape[1]) y2 = min(y+h, img_shape[0]) w = x2 - x h = y2 - y if w <= 0 or h <= 0: return (0,0,0,0), np.zeros((0,0), dtype=np.uint8) pts_roi = pts.copy() pts_roi[:,0] -= x pts_roi[:,1] -= y mask = np.zeros((h, w), dtype=np.uint8) cv2.fillPoly(mask, [pts_roi], 1) return (x, y, w, h), mask def center_from_bbox(x1, y1, x2, y2): return ((x1 + x2) / 2.0, (y1 + y2) / 2.0) def normalize_vec(v): v = np.array(v, dtype=np.float32) norm = np.linalg.norm(v) if norm == 0: return np.array([0.0, 0.0], dtype=np.float32) return v / norm def cos_between(a, b): an = normalize_vec(a) bn = normalize_vec(b) return float(np.dot(an, bn)) def draw_arrow(img, start, vec, color=(0,255,0), thickness=2, tip_length=0.3): """绘制箭头(保证最小像素长度以提高可见性)""" sx, sy = int(round(start[0])), int(round(start[1])) vx, vy = float(vec[0]), float(vec[1]) norm = math.hypot(vx, vy) if norm < 1e-3: return scale = 1.0 if norm < ARROW_LEN_MIN: scale = ARROW_LEN_MIN / (norm + 1e-6) ex = int(round(sx + vx * scale)) ey = int(round(sy + vy * scale)) cv2.arrowedLine(img, (sx, sy), (ex, ey), color, thickness, tipLength=tip_length) # -------------------------- # 主处理逻辑(含中文注释) # -------------------------- def process_video_with_graffiti(video_path, model_path, out_path=None, show=True, device="0"): """主流程:读取视频流 -> YOLO 跟踪 -> 处理每帧检测 -> 可视化并保存""" model = YOLO(model_path) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise RuntimeError(f"无法打开视频:{video_path}") fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() graffiti = GraffitiLane(w, h) # 存储每个 track 的历史中心点与累计距离(用于抑制抖动与判断驶过) track_hist = defaultdict(lambda: deque(maxlen=HISTORY_LEN)) track_cum_dist = defaultdict(float) track_last_frame = {} frame_idx = 0 writer = None if out_path: fourcc = cv2.VideoWriter_fourcc(*'mp4v') writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h)) # 使用 stream=True 逐帧获取结果 results_stream = model.track(source=video_path, tracker="botsort.yaml", device=device, stream=True) for res in results_stream: frame_idx += 1 frame = getattr(res, 'orig_img', None) if frame is None: frame = getattr(res, 'img', None) if frame is None: continue # 周期性对整图做完整衰减(清理 pending decay,避免值长时间不更新) if frame_idx % FULL_DECAY_EVERY == 0: graffiti.full_decay(frame_idx) boxes = getattr(res, "boxes", None) vis = frame.copy() if boxes is not None: for box in boxes: # 解析 box,兼容不同 ultralytics 版本的返回格式 try: xyxy = getattr(box, "xyxy", None) if xyxy is None: xyxy = box.data if hasattr(box, "data") else None if xyxy is None: continue x1, y1, x2, y2 = map(float, xyxy[0].tolist()) if hasattr(xyxy, "__len__") and hasattr(xyxy[0], "tolist") else xyxy cls = int(getattr(box, "cls", -1)) track_id = int(getattr(box, "id", -1)) except Exception: try: x1, y1, x2, y2 = box cls = -1 track_id = -1 except: continue # 仅处理车辆类 if cls not in VEHICLE_CLASSES and len(VEHICLE_CLASSES) > 0: continue # 绘制车辆 bbox(黄色边框) cv2.rectangle(vis, (int(x1), int(y1)), (int(x2), int(y2)), (0,255,255), 2) cx, cy = center_from_bbox(x1, y1, x2, y2) # 更新并抑制抖动 prev_centers = track_hist[track_id] if len(prev_centers) > 0: last = prev_centers[-1] move_dist = math.hypot(cx - last[0], cy - last[1]) else: move_dist = 0.0 if len(prev_centers) > 0: track_cum_dist[track_id] += move_dist else: track_cum_dist[track_id] = 0.0 if move_dist >= MIN_MOVE_DIST: prev_centers.append((cx, cy)) else: if len(prev_centers) == 0: prev_centers.append((cx, cy)) # 当累计距离达到阈值时,构造一段涂鸦并更新 graffiti if track_cum_dist[track_id] >= MIN_CUM_DIST and len(prev_centers) >= 2: cp_prev = prev_centers[-2] cp_cur = prev_centers[-1] veh_width_px = max(8.0, x2 - x1) poly = polygon_from_segment(cp_prev, cp_cur, veh_width_px) if poly is None: continue # 在多边形最小包围矩形生成局部掩码并返回 ROI bbox_roi, mask_roi = polygon_mask_roi_from_pts(poly, frame.shape) if mask_roi.size == 0: continue #当前车辆方向(单位向量) dir_vec = normalize_vec((cp_cur[0] - cp_prev[0], cp_cur[1] - cp_prev[1])) dir_vec_scaled = dir_vec.astype(np.float32) * 1.0 # 在 ROI 上查询区域信息(方法内部不做整图衰减,使用 lazy_decay) region_heat = graffiti.region_avg_heat_roi(bbox_roi, mask_roi) region_dir = graffiti.region_avg_direction_roi(bbox_roi, mask_roi) # 计算在 ROI 内与现有热区的重叠比例 x, y, w_roi, h_roi = bbox_roi overlap_pixels = np.logical_and(mask_roi > 0, graffiti.heat[y:y+h_roi, x:x+w_roi] > 0) overlap_ratio = float(overlap_pixels.sum()) / max(1.0, mask_roi.sum()) # 根据规则判断是否为逆向 / 可疑 / 正常并更新热力 if region_heat >= 1.0 and region_dir is not None: c = cos_between(region_dir, dir_vec_scaled) if c < OPPOSITE_ANGLE_COS_THRESH: if region_heat >= TRUST_THRESHOLD: if overlap_ratio >= MAJORITY_OVERLAP_THRESHOLD: event = "REVERSE" elif overlap_ratio >= PARTIAL_OVERLAP_THRESHOLD: event = "CROSS_LINE" else: event = "SUSPECT" graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * overlap_ratio, current_frame=frame_idx) else: event = "SUSPECT" graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * 0.5, current_frame=frame_idx) else: event = None graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx) else: event = None graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx) # 重置累计距离以避免重复涂鸦 track_cum_dist[track_id] = 0.0 track_last_frame[track_id] = (frame_idx, event) # 立即绘制当前段的多边形边界与方向箭头(局部) try: cv2.polylines(vis, [poly], True, (0,255,0), 2) poly_center = np.mean(poly, axis=0) draw_arrow(vis, (poly_center[0], poly_center[1]), (dir_vec_scaled[0]*20, dir_vec_scaled[1]*20), color=(0,255,0), thickness=2) except Exception: pass # 始终绘制车辆当前方向向量(红色) if len(prev_centers) >= 2: p0 = prev_centers[-2] p1 = prev_centers[-1] v = (p1[0]-p0[0], p1[1]-p0[1]) draw_arrow(vis, (cx, cy), (v[0], v[1]), color=(0,0,255), thickness=2) # 绘制车辆 id 文本 cv2.putText(vis, f"id:{track_id}", (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1) # 可视化热力图:红色通道叠加(注意:热力在 ROI 更新时已应用懒衰减) heat_norm = np.clip(graffiti.heat / MAX_HEAT, 0.0, 1.0) heat_u8 = (heat_norm * 255).astype(np.uint8) red_overlay = np.zeros_like(frame) red_overlay[:, :, 2] = heat_u8 alpha = 0.45 vis = cv2.addWeighted(vis, 1.0, red_overlay, alpha, 0) # 在网格上绘制方向箭头以展示持久热区方向(降低频率以提升性能) if frame_idx % GRID_DRAW_EVERY == 0: gh, gw = graffiti.h, graffiti.w for gy in range(0, gh, GRID_STEP): for gx in range(0, gw, GRID_STEP): x0 = gx y0 = gy x1 = min(gx + GRID_STEP, gw) y1 = min(gy + GRID_STEP, gh) win = graffiti.heat[y0:y1, x0:x1] if win.size == 0: continue avg_heat = float(win.mean()) if avg_heat / MAX_HEAT >= GRID_HEAT_THRESHOLD: avg_dx = float(graffiti.dir_x[y0:y1, x0:x1].sum()) avg_dy = float(graffiti.dir_y[y0:y1, x0:x1].sum()) if avg_dx == 0 and avg_dy == 0: continue cell_center = (x0 + (x1-x0)/2.0, y0 + (y1-y0)/2.0) draw_arrow(vis, cell_center, (avg_dx * 5.0, avg_dy * 5.0), color=(0,128,255), thickness=2) # 绘制轨迹与事件标签 for tid, hist in track_hist.items(): if len(hist) == 0: continue pts = np.array(hist, dtype=np.int32) for i in range(1, len(pts)): cv2.line(vis, tuple(pts[i-1]), tuple(pts[i]), (255, 255, 255), 1) cx, cy = hist[-1] cv2.circle(vis, (int(cx), int(cy)), 3, (255, 255, 255), -1) if tid in track_last_frame and track_last_frame[tid] is not None: last_frame_idx, event = track_last_frame[tid] if event is not None and frame_idx - last_frame_idx <= 5: txt = f"id:{tid} {event}" cv2.putText(vis, txt, (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 2) # 显示帧序号与调试信息 cv2.putText(vis, f"Frame:{frame_idx}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200,200,200), 2) cv2.putText(vis, f"HeatMax:{MAX_HEAT} TrustThr:{TRUST_THRESHOLD}", (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200,200,200), 1) if show: cv2.imshow("graffiti_lane_vectors", vis) key = cv2.waitKey(1) if key == 27: break if writer is not None: writer.write(vis) if writer is not None: writer.release() cv2.destroyAllWindows() # -------------------------- # 运行入口(示例) # -------------------------- if __name__ == "__main__": video_path = "./123.mp4" model_path = "yolo11s.pt" out_path = "out_graffiti_vectors_opt2.mp4" process_video_with_graffiti(video_path, model_path, out_path=out_path, show=True, device="0")