feat(test): 添加车辆轨迹分析器用于逆向检测

- 实现基于YOLO跟踪的车辆轨迹分析系统
- 添加热力图和方向向量的地图涂鸦管理功能
- 集成车辆逆向、越线、可疑行为检测算法
- 实现ROI局部操作和懒衰减机制提升性能
- 添加网格采样方向箭头绘制优化显示效果
- 支持视频流处理和实时可视化输出
This commit is contained in:
zimoyin
2026-01-10 09:50:44 +08:00
parent a4e12b8d6d
commit 5f2e856ad7
+486
View File
@@ -0,0 +1,486 @@
"""
yolo_graffiti_lane_with_vectors.py
思路与架构(中文说明 + 简易架构图)
整体思路:
- 使用 YOLO 跟踪(Ultralytics 的 model.track(..., stream=True)),获取每帧检测与 track id。
- 每辆车的有效移动段(去抖后)会在画面上用矩形带(vehicle_width)在热力图上留下涂鸦(heat 0..MAX_HEAT),同时累积方向向量(dir_x/dir_y)。
- 判断逆向:当车辆经过已存在热区,计算区域平均方向与当前车辆方向的余弦相似度,结合热度与重叠比例判定 SUSPECT / CROSS_LINE / REVERSE。
- 为了性能,采用 ROI 局部操作和延迟(lazy)衰减:对涂鸦只在其最小包围矩形(ROI)内进行掩码计算与热力/方向更新,且热力的衰减通过“按帧懒处理”来减少整图操作。
架构(简化 ASCII 图):
+----------------------------+
| Video frames (stream) | <-- ultralytics model.track
+------------+---------------+
|
v
+------------+---------------+ +----------------+
| Detection & Tracking (YOLO)| -----> | per-track hist |
+------------+---------------+ +----------------+
| |
v v
+--------------------------------------+ update when
| Graffiti Manager (heat, dir maps) |<-- vehicle passes
| - lazy ROI decay |
| - apply/remove paint in ROI |
+--------------------------------------+
|
v
+--------------------------------------+
| Visualizer: |
| - draw bbox & track vectors |
| - draw polygon & local arrow |
| - draw grid-sampled persistent arrows (performance friendly)
+--------------------------------------+
性能关键点改进:
1. 避免每帧对整张 heat/dir 做逐像素衰减;改为:只有在需要更新的 ROI 上按累计帧数做懒衰减(lazy_decay),并且每 N 帧做一次整图清理(full decay)。
2. 对多边形掩码只在 ROIboundingRect)内执行,减少 fillPoly 的像素数。
3. 将热区方向的持久热箭头以网格采样绘制,且可设置每 N 帧绘制一次,减少绘制开销。
下面为优化后的代码,已全部改为中文注释并包含性能优化。
"""
import cv2
import numpy as np
from collections import deque, defaultdict
from ultralytics import YOLO
import math
# --------------------------
# 可调参数(按需调整)
# --------------------------
MAX_HEAT = 10.0 # 热力最大值
INC_PER_PASS = 1.0 # 每次经过热力增加量
DEC_PER_OPPOSITE = 1.0 # 逆向经过时减小热力的基准量
TRUST_THRESHOLD = 3.0 # 热力 >= 3 视为可信区域
PARTIAL_OVERLAP_THRESHOLD = 0.1
MAJORITY_OVERLAP_THRESHOLD = 0.5
MIN_MOVE_DIST = 6.0 # 抑制抖动:一帧移动小于此值不计入轨迹
MIN_CUM_DIST = 20.0 # 累计移动距离阈值,超过才认为驶过
HISTORY_LEN = 16 # 每个 track 保留历史点数
HEAT_DECAY_PER_FRAME = 0.002 # 每帧的线性衰减比例(小)
OPPOSITE_ANGLE_COS_THRESH = -0.9397 # 余弦 < -0.5(120°~180°) 认为明显相反方向
VEHICLE_CLASSES = {2, 3, 5, 7} # COCO 的车辆类别(car,motorcycle,bus,truck
ARROW_THICK = 2
ARROW_LEN_MIN = 10.0
# 懒衰减与整图清理参数
FULL_DECAY_EVERY = 30 # 每 N 帧对整图做一次完整衰减(把 pending decay 应用到全图)
# 网格采样用于在热力图上绘制持久方向箭头(性能友好)
GRID_STEP = 40
GRID_HEAT_THRESHOLD = 0.6 # 网格平均热度占最大热度的阈值
GRID_DRAW_EVERY = 2 # 每 N 帧绘制一次网格箭头
# --------------------------
# Graffiti 数据结构与方法(含懒衰减)
# --------------------------
class GraffitiLane:
def __init__(self, frame_w, frame_h):
self.w = frame_w
self.h = frame_h
# heat、dir_x、dir_y 为整图数组(float32
self.heat = np.zeros((self.h, self.w), dtype=np.float32)
self.dir_x = np.zeros((self.h, self.w), dtype=np.float32)
self.dir_y = np.zeros((self.h, self.w), dtype=np.float32)
# 记录上次全局衰减帧编号(用于懒衰减计算)
self.last_decay_frame = 0
def _decay_factor(self, frames):
"""计算在 frames 帧内累计衰减因子"""
if frames <= 0:
return 1.0
return (1.0 - HEAT_DECAY_PER_FRAME) ** frames
def lazy_decay_roi(self, bbox, current_frame):
"""
对给定 ROIbbox)应用从 self.last_decay_frame 到 current_frame 的累计衰减。
这样我们不需要每帧对整图做衰减,只在需要写入/读取 ROI 时才把 pending 衰减应用到 ROI 上。
bbox: (x, y, w, h)
"""
x, y, w, h = bbox
if w <= 0 or h <= 0:
return
frames = current_frame - self.last_decay_frame
if frames <= 0:
return
factor = self._decay_factor(frames)
# 只在 ROI 上乘以因子(就地修改)
self.heat[y:y+h, x:x+w] *= factor
self.dir_x[y:y+h, x:x+w] *= factor
self.dir_y[y:y+h, x:x+w] *= factor
def full_decay(self, current_frame):
"""
在整图上强制应用 pending 衰减(用于周期性清理,避免长期堆积误差)。
"""
frames = current_frame - self.last_decay_frame
if frames <= 0:
return
factor = self._decay_factor(frames)
if factor == 1.0:
self.last_decay_frame = current_frame
return
# 对整图应用因子
self.heat *= factor
self.dir_x *= factor
self.dir_y *= factor
self.last_decay_frame = current_frame
# 以下方法均采用 ROI 操作以提高性能
def apply_paint_roi(self, bbox, mask_roi, direction_vector, inc=INC_PER_PASS, current_frame=0):
# 先对 ROI 应用懒衰减(把 pending decay 应用在将要修改的区域上)
self.lazy_decay_roi(bbox, current_frame)
x, y, w, h = bbox
add = inc * mask_roi.astype(np.float32)
# 对 ROI 做写入(加热并累加方向)
self.heat[y:y+h, x:x+w] = np.minimum(MAX_HEAT, self.heat[y:y+h, x:x+w] + add)
dx, dy = direction_vector
self.dir_x[y:y+h, x:x+w] += dx * add
self.dir_y[y:y+h, x:x+w] += dy * add
def remove_paint_roi(self, bbox, mask_roi, amount=DEC_PER_OPPOSITE, current_frame=0):
# 先对 ROI 应用懒衰减
self.lazy_decay_roi(bbox, current_frame)
x, y, w, h = bbox
dec = amount * mask_roi.astype(np.float32)
roi_heat_before = self.heat[y:y+h, x:x+w].copy()
roi_heat_after = np.maximum(0.0, roi_heat_before - dec)
# 计算缩放比并更新方向累积
eps = 1e-6
ratio = np.ones_like(roi_heat_before)
nonzero_mask = roi_heat_before > eps
ratio[nonzero_mask] = roi_heat_after[nonzero_mask] / (roi_heat_before[nonzero_mask] + eps)
self.dir_x[y:y+h, x:x+w] *= ratio
self.dir_y[y:y+h, x:x+w] *= ratio
self.heat[y:y+h, x:x+w] = roi_heat_after
def region_avg_direction_roi(self, bbox, mask_roi):
x, y, w, h = bbox
mask = mask_roi.astype(bool)
if mask.sum() == 0:
return None
sx = self.dir_x[y:y+h, x:x+w][mask].sum()
sy = self.dir_y[y:y+h, x:x+w][mask].sum()
vec = np.array([sx, sy], dtype=np.float32)
norm = np.linalg.norm(vec)
if norm == 0:
return None
return vec / norm
def region_avg_heat_roi(self, bbox, mask_roi):
x, y, w, h = bbox
mask = mask_roi.astype(bool)
if mask.sum() == 0:
return 0.0
return float(self.heat[y:y+h, x:x+w][mask].mean())
# --------------------------
# 辅助函数(中文注释)
# --------------------------
def polygon_from_segment(center_prev, center_cur, veh_width_px):
"""从两点构造矩形带(作为车辆经过的涂鸦段)"""
x0, y0 = center_prev
x1, y1 = center_cur
vx = x1 - x0
vy = y1 - y0
dist = math.hypot(vx, vy)
if dist == 0:
return None
ux, uy = vx / dist, vy / dist
px, py = -uy, ux
half_w = veh_width_px / 2.0
p0 = (int(round(x0 + px * half_w)), int(round(y0 + py * half_w)))
p1 = (int(round(x0 - px * half_w)), int(round(y0 - py * half_w)))
p2 = (int(round(x1 - px * half_w)), int(round(y1 - py * half_w)))
p3 = (int(round(x1 + px * half_w)), int(round(y1 + py * half_w)))
return np.array([p0, p1, p2, p3], dtype=np.int32)
def polygon_mask_roi_from_pts(pts, img_shape):
"""在多边形的最小包围矩形内生成局部掩码并返回 bbox, mask_roi"""
if pts is None or len(pts) == 0:
return (0,0,0,0), np.zeros((0,0), dtype=np.uint8)
x, y, w, h = cv2.boundingRect(pts)
x2 = min(x+w, img_shape[1])
y2 = min(y+h, img_shape[0])
w = x2 - x
h = y2 - y
if w <= 0 or h <= 0:
return (0,0,0,0), np.zeros((0,0), dtype=np.uint8)
pts_roi = pts.copy()
pts_roi[:,0] -= x
pts_roi[:,1] -= y
mask = np.zeros((h, w), dtype=np.uint8)
cv2.fillPoly(mask, [pts_roi], 1)
return (x, y, w, h), mask
def center_from_bbox(x1, y1, x2, y2):
return ((x1 + x2) / 2.0, (y1 + y2) / 2.0)
def normalize_vec(v):
v = np.array(v, dtype=np.float32)
norm = np.linalg.norm(v)
if norm == 0:
return np.array([0.0, 0.0], dtype=np.float32)
return v / norm
def cos_between(a, b):
an = normalize_vec(a)
bn = normalize_vec(b)
return float(np.dot(an, bn))
def draw_arrow(img, start, vec, color=(0,255,0), thickness=2, tip_length=0.3):
"""绘制箭头(保证最小像素长度以提高可见性)"""
sx, sy = int(round(start[0])), int(round(start[1]))
vx, vy = float(vec[0]), float(vec[1])
norm = math.hypot(vx, vy)
if norm < 1e-3:
return
scale = 1.0
if norm < ARROW_LEN_MIN:
scale = ARROW_LEN_MIN / (norm + 1e-6)
ex = int(round(sx + vx * scale))
ey = int(round(sy + vy * scale))
cv2.arrowedLine(img, (sx, sy), (ex, ey), color, thickness, tipLength=tip_length)
# --------------------------
# 主处理逻辑(含中文注释)
# --------------------------
def process_video_with_graffiti(video_path, model_path, out_path=None, show=True, device="0"):
"""主流程:读取视频流 -> YOLO 跟踪 -> 处理每帧检测 -> 可视化并保存"""
model = YOLO(model_path)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"无法打开视频:{video_path}")
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
graffiti = GraffitiLane(w, h)
# 存储每个 track 的历史中心点与累计距离(用于抑制抖动与判断驶过)
track_hist = defaultdict(lambda: deque(maxlen=HISTORY_LEN))
track_cum_dist = defaultdict(float)
track_last_frame = {}
frame_idx = 0
writer = None
if out_path:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
# 使用 stream=True 逐帧获取结果
results_stream = model.track(source=video_path, tracker="botsort.yaml", device=device, stream=True)
for res in results_stream:
frame_idx += 1
frame = getattr(res, 'orig_img', None)
if frame is None:
frame = getattr(res, 'img', None)
if frame is None:
continue
# 周期性对整图做完整衰减(清理 pending decay,避免值长时间不更新)
if frame_idx % FULL_DECAY_EVERY == 0:
graffiti.full_decay(frame_idx)
boxes = getattr(res, "boxes", None)
vis = frame.copy()
if boxes is not None:
for box in boxes:
# 解析 box,兼容不同 ultralytics 版本的返回格式
try:
xyxy = getattr(box, "xyxy", None)
if xyxy is None:
xyxy = box.data if hasattr(box, "data") else None
if xyxy is None:
continue
x1, y1, x2, y2 = map(float, xyxy[0].tolist()) if hasattr(xyxy, "__len__") and hasattr(xyxy[0], "tolist") else xyxy
cls = int(getattr(box, "cls", -1))
track_id = int(getattr(box, "id", -1))
except Exception:
try:
x1, y1, x2, y2 = box
cls = -1
track_id = -1
except:
continue
# 仅处理车辆类
if cls not in VEHICLE_CLASSES and len(VEHICLE_CLASSES) > 0:
continue
# 绘制车辆 bbox(黄色边框)
cv2.rectangle(vis, (int(x1), int(y1)), (int(x2), int(y2)), (0,255,255), 2)
cx, cy = center_from_bbox(x1, y1, x2, y2)
# 更新并抑制抖动
prev_centers = track_hist[track_id]
if len(prev_centers) > 0:
last = prev_centers[-1]
move_dist = math.hypot(cx - last[0], cy - last[1])
else:
move_dist = 0.0
if len(prev_centers) > 0:
track_cum_dist[track_id] += move_dist
else:
track_cum_dist[track_id] = 0.0
if move_dist >= MIN_MOVE_DIST:
prev_centers.append((cx, cy))
else:
if len(prev_centers) == 0:
prev_centers.append((cx, cy))
# 当累计距离达到阈值时,构造一段涂鸦并更新 graffiti
if track_cum_dist[track_id] >= MIN_CUM_DIST and len(prev_centers) >= 2:
cp_prev = prev_centers[-2]
cp_cur = prev_centers[-1]
veh_width_px = max(8.0, x2 - x1)
poly = polygon_from_segment(cp_prev, cp_cur, veh_width_px)
if poly is None:
continue
# 在多边形最小包围矩形生成局部掩码并返回 ROI
bbox_roi, mask_roi = polygon_mask_roi_from_pts(poly, frame.shape)
if mask_roi.size == 0:
continue
#当前车辆方向(单位向量)
dir_vec = normalize_vec((cp_cur[0] - cp_prev[0], cp_cur[1] - cp_prev[1]))
dir_vec_scaled = dir_vec.astype(np.float32) * 1.0
# 在 ROI 上查询区域信息(方法内部不做整图衰减,使用 lazy_decay)
region_heat = graffiti.region_avg_heat_roi(bbox_roi, mask_roi)
region_dir = graffiti.region_avg_direction_roi(bbox_roi, mask_roi)
# 计算在 ROI 内与现有热区的重叠比例
x, y, w_roi, h_roi = bbox_roi
overlap_pixels = np.logical_and(mask_roi > 0, graffiti.heat[y:y+h_roi, x:x+w_roi] > 0)
overlap_ratio = float(overlap_pixels.sum()) / max(1.0, mask_roi.sum())
# 根据规则判断是否为逆向 / 可疑 / 正常并更新热力
if region_heat >= 1.0 and region_dir is not None:
c = cos_between(region_dir, dir_vec_scaled)
if c < OPPOSITE_ANGLE_COS_THRESH:
if region_heat >= TRUST_THRESHOLD:
if overlap_ratio >= MAJORITY_OVERLAP_THRESHOLD:
event = "REVERSE"
elif overlap_ratio >= PARTIAL_OVERLAP_THRESHOLD:
event = "CROSS_LINE"
else:
event = "SUSPECT"
graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * overlap_ratio, current_frame=frame_idx)
else:
event = "SUSPECT"
graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * 0.5, current_frame=frame_idx)
else:
event = None
graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx)
else:
event = None
graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx)
# 重置累计距离以避免重复涂鸦
track_cum_dist[track_id] = 0.0
track_last_frame[track_id] = (frame_idx, event)
# 立即绘制当前段的多边形边界与方向箭头(局部)
try:
cv2.polylines(vis, [poly], True, (0,255,0), 2)
poly_center = np.mean(poly, axis=0)
draw_arrow(vis, (poly_center[0], poly_center[1]), (dir_vec_scaled[0]*20, dir_vec_scaled[1]*20), color=(0,255,0), thickness=2)
except Exception:
pass
# 始终绘制车辆当前方向向量(红色)
if len(prev_centers) >= 2:
p0 = prev_centers[-2]
p1 = prev_centers[-1]
v = (p1[0]-p0[0], p1[1]-p0[1])
draw_arrow(vis, (cx, cy), (v[0], v[1]), color=(0,0,255), thickness=2)
# 绘制车辆 id 文本
cv2.putText(vis, f"id:{track_id}", (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
# 可视化热力图:红色通道叠加(注意:热力在 ROI 更新时已应用懒衰减)
heat_norm = np.clip(graffiti.heat / MAX_HEAT, 0.0, 1.0)
heat_u8 = (heat_norm * 255).astype(np.uint8)
red_overlay = np.zeros_like(frame)
red_overlay[:, :, 2] = heat_u8
alpha = 0.45
vis = cv2.addWeighted(vis, 1.0, red_overlay, alpha, 0)
# 在网格上绘制方向箭头以展示持久热区方向(降低频率以提升性能)
if frame_idx % GRID_DRAW_EVERY == 0:
gh, gw = graffiti.h, graffiti.w
for gy in range(0, gh, GRID_STEP):
for gx in range(0, gw, GRID_STEP):
x0 = gx
y0 = gy
x1 = min(gx + GRID_STEP, gw)
y1 = min(gy + GRID_STEP, gh)
win = graffiti.heat[y0:y1, x0:x1]
if win.size == 0:
continue
avg_heat = float(win.mean())
if avg_heat / MAX_HEAT >= GRID_HEAT_THRESHOLD:
avg_dx = float(graffiti.dir_x[y0:y1, x0:x1].sum())
avg_dy = float(graffiti.dir_y[y0:y1, x0:x1].sum())
if avg_dx == 0 and avg_dy == 0:
continue
cell_center = (x0 + (x1-x0)/2.0, y0 + (y1-y0)/2.0)
draw_arrow(vis, cell_center, (avg_dx * 5.0, avg_dy * 5.0), color=(0,128,255), thickness=2)
# 绘制轨迹与事件标签
for tid, hist in track_hist.items():
if len(hist) == 0:
continue
pts = np.array(hist, dtype=np.int32)
for i in range(1, len(pts)):
cv2.line(vis, tuple(pts[i-1]), tuple(pts[i]), (255, 255, 255), 1)
cx, cy = hist[-1]
cv2.circle(vis, (int(cx), int(cy)), 3, (255, 255, 255), -1)
if tid in track_last_frame and track_last_frame[tid] is not None:
last_frame_idx, event = track_last_frame[tid]
if event is not None and frame_idx - last_frame_idx <= 5:
txt = f"id:{tid} {event}"
cv2.putText(vis, txt, (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 2)
# 显示帧序号与调试信息
cv2.putText(vis, f"Frame:{frame_idx}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200,200,200), 2)
cv2.putText(vis, f"HeatMax:{MAX_HEAT} TrustThr:{TRUST_THRESHOLD}", (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200,200,200), 1)
if show:
cv2.imshow("graffiti_lane_vectors", vis)
key = cv2.waitKey(1)
if key == 27:
break
if writer is not None:
writer.write(vis)
if writer is not None:
writer.release()
cv2.destroyAllWindows()
# --------------------------
# 运行入口(示例)
# --------------------------
if __name__ == "__main__":
video_path = "./123.mp4"
model_path = "yolo11s.pt"
out_path = "out_graffiti_vectors_opt2.mp4"
process_video_with_graffiti(video_path, model_path, out_path=out_path, show=True, device="0")