diff --git a/test_vehicle_trajectory_analyzer.py b/test_vehicle_trajectory_analyzer.py new file mode 100644 index 0000000..8078761 --- /dev/null +++ b/test_vehicle_trajectory_analyzer.py @@ -0,0 +1,486 @@ +""" +yolo_graffiti_lane_with_vectors.py + +思路与架构(中文说明 + 简易架构图) + +整体思路: +- 使用 YOLO 跟踪(Ultralytics 的 model.track(..., stream=True)),获取每帧检测与 track id。 +- 每辆车的有效移动段(去抖后)会在画面上用矩形带(vehicle_width)在热力图上留下涂鸦(heat 0..MAX_HEAT),同时累积方向向量(dir_x/dir_y)。 +- 判断逆向:当车辆经过已存在热区,计算区域平均方向与当前车辆方向的余弦相似度,结合热度与重叠比例判定 SUSPECT / CROSS_LINE / REVERSE。 +- 为了性能,采用 ROI 局部操作和延迟(lazy)衰减:对涂鸦只在其最小包围矩形(ROI)内进行掩码计算与热力/方向更新,且热力的衰减通过“按帧懒处理”来减少整图操作。 + +架构(简化 ASCII 图): + + +----------------------------+ + | Video frames (stream) | <-- ultralytics model.track + +------------+---------------+ + | + v + +------------+---------------+ +----------------+ + | Detection & Tracking (YOLO)| -----> | per-track hist | + +------------+---------------+ +----------------+ + | | + v v + +--------------------------------------+ update when + | Graffiti Manager (heat, dir maps) |<-- vehicle passes + | - lazy ROI decay | + | - apply/remove paint in ROI | + +--------------------------------------+ + | + v + +--------------------------------------+ + | Visualizer: | + | - draw bbox & track vectors | + | - draw polygon & local arrow | + | - draw grid-sampled persistent arrows (performance friendly) + +--------------------------------------+ + +性能关键点改进: +1. 避免每帧对整张 heat/dir 做逐像素衰减;改为:只有在需要更新的 ROI 上按累计帧数做懒衰减(lazy_decay),并且每 N 帧做一次整图清理(full decay)。 +2. 对多边形掩码只在 ROI(boundingRect)内执行,减少 fillPoly 的像素数。 +3. 将热区方向的持久热箭头以网格采样绘制,且可设置每 N 帧绘制一次,减少绘制开销。 + +下面为优化后的代码,已全部改为中文注释并包含性能优化。 +""" + +import cv2 +import numpy as np +from collections import deque, defaultdict +from ultralytics import YOLO +import math + +# -------------------------- +# 可调参数(按需调整) +# -------------------------- +MAX_HEAT = 10.0 # 热力最大值 +INC_PER_PASS = 1.0 # 每次经过热力增加量 +DEC_PER_OPPOSITE = 1.0 # 逆向经过时减小热力的基准量 +TRUST_THRESHOLD = 3.0 # 热力 >= 3 视为可信区域 +PARTIAL_OVERLAP_THRESHOLD = 0.1 +MAJORITY_OVERLAP_THRESHOLD = 0.5 +MIN_MOVE_DIST = 6.0 # 抑制抖动:一帧移动小于此值不计入轨迹 +MIN_CUM_DIST = 20.0 # 累计移动距离阈值,超过才认为驶过 +HISTORY_LEN = 16 # 每个 track 保留历史点数 +HEAT_DECAY_PER_FRAME = 0.002 # 每帧的线性衰减比例(小) +OPPOSITE_ANGLE_COS_THRESH = -0.9397 # 余弦 < -0.5(120°~180°) 认为明显相反方向 +VEHICLE_CLASSES = {2, 3, 5, 7} # COCO 的车辆类别(car,motorcycle,bus,truck) +ARROW_THICK = 2 +ARROW_LEN_MIN = 10.0 + +# 懒衰减与整图清理参数 +FULL_DECAY_EVERY = 30 # 每 N 帧对整图做一次完整衰减(把 pending decay 应用到全图) +# 网格采样用于在热力图上绘制持久方向箭头(性能友好) +GRID_STEP = 40 +GRID_HEAT_THRESHOLD = 0.6 # 网格平均热度占最大热度的阈值 +GRID_DRAW_EVERY = 2 # 每 N 帧绘制一次网格箭头 + +# -------------------------- +# Graffiti 数据结构与方法(含懒衰减) +# -------------------------- +class GraffitiLane: + def __init__(self, frame_w, frame_h): + self.w = frame_w + self.h = frame_h + # heat、dir_x、dir_y 为整图数组(float32) + self.heat = np.zeros((self.h, self.w), dtype=np.float32) + self.dir_x = np.zeros((self.h, self.w), dtype=np.float32) + self.dir_y = np.zeros((self.h, self.w), dtype=np.float32) + # 记录上次全局衰减帧编号(用于懒衰减计算) + self.last_decay_frame = 0 + + def _decay_factor(self, frames): + """计算在 frames 帧内累计衰减因子""" + if frames <= 0: + return 1.0 + return (1.0 - HEAT_DECAY_PER_FRAME) ** frames + + def lazy_decay_roi(self, bbox, current_frame): + """ + 对给定 ROI(bbox)应用从 self.last_decay_frame 到 current_frame 的累计衰减。 + 这样我们不需要每帧对整图做衰减,只在需要写入/读取 ROI 时才把 pending 衰减应用到 ROI 上。 + bbox: (x, y, w, h) + """ + x, y, w, h = bbox + if w <= 0 or h <= 0: + return + frames = current_frame - self.last_decay_frame + if frames <= 0: + return + factor = self._decay_factor(frames) + # 只在 ROI 上乘以因子(就地修改) + self.heat[y:y+h, x:x+w] *= factor + self.dir_x[y:y+h, x:x+w] *= factor + self.dir_y[y:y+h, x:x+w] *= factor + + def full_decay(self, current_frame): + """ + 在整图上强制应用 pending 衰减(用于周期性清理,避免长期堆积误差)。 + """ + frames = current_frame - self.last_decay_frame + if frames <= 0: + return + factor = self._decay_factor(frames) + if factor == 1.0: + self.last_decay_frame = current_frame + return + # 对整图应用因子 + self.heat *= factor + self.dir_x *= factor + self.dir_y *= factor + self.last_decay_frame = current_frame + + # 以下方法均采用 ROI 操作以提高性能 + def apply_paint_roi(self, bbox, mask_roi, direction_vector, inc=INC_PER_PASS, current_frame=0): + # 先对 ROI 应用懒衰减(把 pending decay 应用在将要修改的区域上) + self.lazy_decay_roi(bbox, current_frame) + x, y, w, h = bbox + add = inc * mask_roi.astype(np.float32) + # 对 ROI 做写入(加热并累加方向) + self.heat[y:y+h, x:x+w] = np.minimum(MAX_HEAT, self.heat[y:y+h, x:x+w] + add) + dx, dy = direction_vector + self.dir_x[y:y+h, x:x+w] += dx * add + self.dir_y[y:y+h, x:x+w] += dy * add + + def remove_paint_roi(self, bbox, mask_roi, amount=DEC_PER_OPPOSITE, current_frame=0): + # 先对 ROI 应用懒衰减 + self.lazy_decay_roi(bbox, current_frame) + x, y, w, h = bbox + dec = amount * mask_roi.astype(np.float32) + roi_heat_before = self.heat[y:y+h, x:x+w].copy() + roi_heat_after = np.maximum(0.0, roi_heat_before - dec) + # 计算缩放比并更新方向累积 + eps = 1e-6 + ratio = np.ones_like(roi_heat_before) + nonzero_mask = roi_heat_before > eps + ratio[nonzero_mask] = roi_heat_after[nonzero_mask] / (roi_heat_before[nonzero_mask] + eps) + self.dir_x[y:y+h, x:x+w] *= ratio + self.dir_y[y:y+h, x:x+w] *= ratio + self.heat[y:y+h, x:x+w] = roi_heat_after + + def region_avg_direction_roi(self, bbox, mask_roi): + x, y, w, h = bbox + mask = mask_roi.astype(bool) + if mask.sum() == 0: + return None + sx = self.dir_x[y:y+h, x:x+w][mask].sum() + sy = self.dir_y[y:y+h, x:x+w][mask].sum() + vec = np.array([sx, sy], dtype=np.float32) + norm = np.linalg.norm(vec) + if norm == 0: + return None + return vec / norm + + def region_avg_heat_roi(self, bbox, mask_roi): + x, y, w, h = bbox + mask = mask_roi.astype(bool) + if mask.sum() == 0: + return 0.0 + return float(self.heat[y:y+h, x:x+w][mask].mean()) + +# -------------------------- +# 辅助函数(中文注释) +# -------------------------- + +def polygon_from_segment(center_prev, center_cur, veh_width_px): + """从两点构造矩形带(作为车辆经过的涂鸦段)""" + x0, y0 = center_prev + x1, y1 = center_cur + vx = x1 - x0 + vy = y1 - y0 + dist = math.hypot(vx, vy) + if dist == 0: + return None + ux, uy = vx / dist, vy / dist + px, py = -uy, ux + half_w = veh_width_px / 2.0 + p0 = (int(round(x0 + px * half_w)), int(round(y0 + py * half_w))) + p1 = (int(round(x0 - px * half_w)), int(round(y0 - py * half_w))) + p2 = (int(round(x1 - px * half_w)), int(round(y1 - py * half_w))) + p3 = (int(round(x1 + px * half_w)), int(round(y1 + py * half_w))) + return np.array([p0, p1, p2, p3], dtype=np.int32) + + +def polygon_mask_roi_from_pts(pts, img_shape): + """在多边形的最小包围矩形内生成局部掩码并返回 bbox, mask_roi""" + if pts is None or len(pts) == 0: + return (0,0,0,0), np.zeros((0,0), dtype=np.uint8) + x, y, w, h = cv2.boundingRect(pts) + x2 = min(x+w, img_shape[1]) + y2 = min(y+h, img_shape[0]) + w = x2 - x + h = y2 - y + if w <= 0 or h <= 0: + return (0,0,0,0), np.zeros((0,0), dtype=np.uint8) + pts_roi = pts.copy() + pts_roi[:,0] -= x + pts_roi[:,1] -= y + mask = np.zeros((h, w), dtype=np.uint8) + cv2.fillPoly(mask, [pts_roi], 1) + return (x, y, w, h), mask + + +def center_from_bbox(x1, y1, x2, y2): + return ((x1 + x2) / 2.0, (y1 + y2) / 2.0) + + +def normalize_vec(v): + v = np.array(v, dtype=np.float32) + norm = np.linalg.norm(v) + if norm == 0: + return np.array([0.0, 0.0], dtype=np.float32) + return v / norm + + +def cos_between(a, b): + an = normalize_vec(a) + bn = normalize_vec(b) + return float(np.dot(an, bn)) + + +def draw_arrow(img, start, vec, color=(0,255,0), thickness=2, tip_length=0.3): + """绘制箭头(保证最小像素长度以提高可见性)""" + sx, sy = int(round(start[0])), int(round(start[1])) + vx, vy = float(vec[0]), float(vec[1]) + norm = math.hypot(vx, vy) + if norm < 1e-3: + return + scale = 1.0 + if norm < ARROW_LEN_MIN: + scale = ARROW_LEN_MIN / (norm + 1e-6) + ex = int(round(sx + vx * scale)) + ey = int(round(sy + vy * scale)) + cv2.arrowedLine(img, (sx, sy), (ex, ey), color, thickness, tipLength=tip_length) + +# -------------------------- +# 主处理逻辑(含中文注释) +# -------------------------- + +def process_video_with_graffiti(video_path, model_path, out_path=None, show=True, device="0"): + """主流程:读取视频流 -> YOLO 跟踪 -> 处理每帧检测 -> 可视化并保存""" + model = YOLO(model_path) + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + raise RuntimeError(f"无法打开视频:{video_path}") + fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 + w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + cap.release() + + graffiti = GraffitiLane(w, h) + + # 存储每个 track 的历史中心点与累计距离(用于抑制抖动与判断驶过) + track_hist = defaultdict(lambda: deque(maxlen=HISTORY_LEN)) + track_cum_dist = defaultdict(float) + track_last_frame = {} + frame_idx = 0 + + writer = None + if out_path: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h)) + + # 使用 stream=True 逐帧获取结果 + results_stream = model.track(source=video_path, tracker="botsort.yaml", device=device, stream=True) + + for res in results_stream: + frame_idx += 1 + frame = getattr(res, 'orig_img', None) + if frame is None: + frame = getattr(res, 'img', None) + if frame is None: + continue + + # 周期性对整图做完整衰减(清理 pending decay,避免值长时间不更新) + if frame_idx % FULL_DECAY_EVERY == 0: + graffiti.full_decay(frame_idx) + + boxes = getattr(res, "boxes", None) + vis = frame.copy() + + if boxes is not None: + for box in boxes: + # 解析 box,兼容不同 ultralytics 版本的返回格式 + try: + xyxy = getattr(box, "xyxy", None) + if xyxy is None: + xyxy = box.data if hasattr(box, "data") else None + if xyxy is None: + continue + x1, y1, x2, y2 = map(float, xyxy[0].tolist()) if hasattr(xyxy, "__len__") and hasattr(xyxy[0], "tolist") else xyxy + cls = int(getattr(box, "cls", -1)) + track_id = int(getattr(box, "id", -1)) + except Exception: + try: + x1, y1, x2, y2 = box + cls = -1 + track_id = -1 + except: + continue + + # 仅处理车辆类 + if cls not in VEHICLE_CLASSES and len(VEHICLE_CLASSES) > 0: + continue + + # 绘制车辆 bbox(黄色边框) + cv2.rectangle(vis, (int(x1), int(y1)), (int(x2), int(y2)), (0,255,255), 2) + cx, cy = center_from_bbox(x1, y1, x2, y2) + + # 更新并抑制抖动 + prev_centers = track_hist[track_id] + if len(prev_centers) > 0: + last = prev_centers[-1] + move_dist = math.hypot(cx - last[0], cy - last[1]) + else: + move_dist = 0.0 + + if len(prev_centers) > 0: + track_cum_dist[track_id] += move_dist + else: + track_cum_dist[track_id] = 0.0 + + if move_dist >= MIN_MOVE_DIST: + prev_centers.append((cx, cy)) + else: + if len(prev_centers) == 0: + prev_centers.append((cx, cy)) + + # 当累计距离达到阈值时,构造一段涂鸦并更新 graffiti + if track_cum_dist[track_id] >= MIN_CUM_DIST and len(prev_centers) >= 2: + cp_prev = prev_centers[-2] + cp_cur = prev_centers[-1] + veh_width_px = max(8.0, x2 - x1) + poly = polygon_from_segment(cp_prev, cp_cur, veh_width_px) + if poly is None: + continue + + # 在多边形最小包围矩形生成局部掩码并返回 ROI + bbox_roi, mask_roi = polygon_mask_roi_from_pts(poly, frame.shape) + if mask_roi.size == 0: + continue + + #当前车辆方向(单位向量) + dir_vec = normalize_vec((cp_cur[0] - cp_prev[0], cp_cur[1] - cp_prev[1])) + dir_vec_scaled = dir_vec.astype(np.float32) * 1.0 + + # 在 ROI 上查询区域信息(方法内部不做整图衰减,使用 lazy_decay) + region_heat = graffiti.region_avg_heat_roi(bbox_roi, mask_roi) + region_dir = graffiti.region_avg_direction_roi(bbox_roi, mask_roi) + + # 计算在 ROI 内与现有热区的重叠比例 + x, y, w_roi, h_roi = bbox_roi + overlap_pixels = np.logical_and(mask_roi > 0, graffiti.heat[y:y+h_roi, x:x+w_roi] > 0) + overlap_ratio = float(overlap_pixels.sum()) / max(1.0, mask_roi.sum()) + + # 根据规则判断是否为逆向 / 可疑 / 正常并更新热力 + if region_heat >= 1.0 and region_dir is not None: + c = cos_between(region_dir, dir_vec_scaled) + if c < OPPOSITE_ANGLE_COS_THRESH: + if region_heat >= TRUST_THRESHOLD: + if overlap_ratio >= MAJORITY_OVERLAP_THRESHOLD: + event = "REVERSE" + elif overlap_ratio >= PARTIAL_OVERLAP_THRESHOLD: + event = "CROSS_LINE" + else: + event = "SUSPECT" + graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * overlap_ratio, current_frame=frame_idx) + else: + event = "SUSPECT" + graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * 0.5, current_frame=frame_idx) + else: + event = None + graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx) + else: + event = None + graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx) + + # 重置累计距离以避免重复涂鸦 + track_cum_dist[track_id] = 0.0 + track_last_frame[track_id] = (frame_idx, event) + + # 立即绘制当前段的多边形边界与方向箭头(局部) + try: + cv2.polylines(vis, [poly], True, (0,255,0), 2) + poly_center = np.mean(poly, axis=0) + draw_arrow(vis, (poly_center[0], poly_center[1]), (dir_vec_scaled[0]*20, dir_vec_scaled[1]*20), color=(0,255,0), thickness=2) + except Exception: + pass + + # 始终绘制车辆当前方向向量(红色) + if len(prev_centers) >= 2: + p0 = prev_centers[-2] + p1 = prev_centers[-1] + v = (p1[0]-p0[0], p1[1]-p0[1]) + draw_arrow(vis, (cx, cy), (v[0], v[1]), color=(0,0,255), thickness=2) + + # 绘制车辆 id 文本 + cv2.putText(vis, f"id:{track_id}", (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1) + + # 可视化热力图:红色通道叠加(注意:热力在 ROI 更新时已应用懒衰减) + heat_norm = np.clip(graffiti.heat / MAX_HEAT, 0.0, 1.0) + heat_u8 = (heat_norm * 255).astype(np.uint8) + red_overlay = np.zeros_like(frame) + red_overlay[:, :, 2] = heat_u8 + alpha = 0.45 + vis = cv2.addWeighted(vis, 1.0, red_overlay, alpha, 0) + + # 在网格上绘制方向箭头以展示持久热区方向(降低频率以提升性能) + if frame_idx % GRID_DRAW_EVERY == 0: + gh, gw = graffiti.h, graffiti.w + for gy in range(0, gh, GRID_STEP): + for gx in range(0, gw, GRID_STEP): + x0 = gx + y0 = gy + x1 = min(gx + GRID_STEP, gw) + y1 = min(gy + GRID_STEP, gh) + win = graffiti.heat[y0:y1, x0:x1] + if win.size == 0: + continue + avg_heat = float(win.mean()) + if avg_heat / MAX_HEAT >= GRID_HEAT_THRESHOLD: + avg_dx = float(graffiti.dir_x[y0:y1, x0:x1].sum()) + avg_dy = float(graffiti.dir_y[y0:y1, x0:x1].sum()) + if avg_dx == 0 and avg_dy == 0: + continue + cell_center = (x0 + (x1-x0)/2.0, y0 + (y1-y0)/2.0) + draw_arrow(vis, cell_center, (avg_dx * 5.0, avg_dy * 5.0), color=(0,128,255), thickness=2) + + # 绘制轨迹与事件标签 + for tid, hist in track_hist.items(): + if len(hist) == 0: + continue + pts = np.array(hist, dtype=np.int32) + for i in range(1, len(pts)): + cv2.line(vis, tuple(pts[i-1]), tuple(pts[i]), (255, 255, 255), 1) + cx, cy = hist[-1] + cv2.circle(vis, (int(cx), int(cy)), 3, (255, 255, 255), -1) + if tid in track_last_frame and track_last_frame[tid] is not None: + last_frame_idx, event = track_last_frame[tid] + if event is not None and frame_idx - last_frame_idx <= 5: + txt = f"id:{tid} {event}" + cv2.putText(vis, txt, (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 2) + + # 显示帧序号与调试信息 + cv2.putText(vis, f"Frame:{frame_idx}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200,200,200), 2) + cv2.putText(vis, f"HeatMax:{MAX_HEAT} TrustThr:{TRUST_THRESHOLD}", (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200,200,200), 1) + + if show: + cv2.imshow("graffiti_lane_vectors", vis) + key = cv2.waitKey(1) + if key == 27: + break + if writer is not None: + writer.write(vis) + + if writer is not None: + writer.release() + cv2.destroyAllWindows() + +# -------------------------- +# 运行入口(示例) +# -------------------------- +if __name__ == "__main__": + video_path = "./123.mp4" + model_path = "yolo11s.pt" + out_path = "out_graffiti_vectors_opt2.mp4" + process_video_with_graffiti(video_path, model_path, out_path=out_path, show=True, device="0")