Files
HighwayEventDet/test_vehicle_trajectory_analyzer.py
zimoyin 5f2e856ad7 feat(test): 添加车辆轨迹分析器用于逆向检测
- 实现基于YOLO跟踪的车辆轨迹分析系统
- 添加热力图和方向向量的地图涂鸦管理功能
- 集成车辆逆向、越线、可疑行为检测算法
- 实现ROI局部操作和懒衰减机制提升性能
- 添加网格采样方向箭头绘制优化显示效果
- 支持视频流处理和实时可视化输出
2026-01-10 09:50:44 +08:00

487 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
yolo_graffiti_lane_with_vectors.py
思路与架构(中文说明 + 简易架构图)
整体思路:
- 使用 YOLO 跟踪(Ultralytics 的 model.track(..., stream=True)),获取每帧检测与 track id。
- 每辆车的有效移动段(去抖后)会在画面上用矩形带(vehicle_width)在热力图上留下涂鸦(heat 0..MAX_HEAT),同时累积方向向量(dir_x/dir_y)。
- 判断逆向:当车辆经过已存在热区,计算区域平均方向与当前车辆方向的余弦相似度,结合热度与重叠比例判定 SUSPECT / CROSS_LINE / REVERSE。
- 为了性能,采用 ROI 局部操作和延迟(lazy)衰减:对涂鸦只在其最小包围矩形(ROI)内进行掩码计算与热力/方向更新,且热力的衰减通过“按帧懒处理”来减少整图操作。
架构(简化 ASCII 图):
+----------------------------+
| Video frames (stream) | <-- ultralytics model.track
+------------+---------------+
|
v
+------------+---------------+ +----------------+
| Detection & Tracking (YOLO)| -----> | per-track hist |
+------------+---------------+ +----------------+
| |
v v
+--------------------------------------+ update when
| Graffiti Manager (heat, dir maps) |<-- vehicle passes
| - lazy ROI decay |
| - apply/remove paint in ROI |
+--------------------------------------+
|
v
+--------------------------------------+
| Visualizer: |
| - draw bbox & track vectors |
| - draw polygon & local arrow |
| - draw grid-sampled persistent arrows (performance friendly)
+--------------------------------------+
性能关键点改进:
1. 避免每帧对整张 heat/dir 做逐像素衰减;改为:只有在需要更新的 ROI 上按累计帧数做懒衰减(lazy_decay),并且每 N 帧做一次整图清理(full decay)。
2. 对多边形掩码只在 ROIboundingRect)内执行,减少 fillPoly 的像素数。
3. 将热区方向的持久热箭头以网格采样绘制,且可设置每 N 帧绘制一次,减少绘制开销。
下面为优化后的代码,已全部改为中文注释并包含性能优化。
"""
import cv2
import numpy as np
from collections import deque, defaultdict
from ultralytics import YOLO
import math
# --------------------------
# 可调参数(按需调整)
# --------------------------
MAX_HEAT = 10.0 # 热力最大值
INC_PER_PASS = 1.0 # 每次经过热力增加量
DEC_PER_OPPOSITE = 1.0 # 逆向经过时减小热力的基准量
TRUST_THRESHOLD = 3.0 # 热力 >= 3 视为可信区域
PARTIAL_OVERLAP_THRESHOLD = 0.1
MAJORITY_OVERLAP_THRESHOLD = 0.5
MIN_MOVE_DIST = 6.0 # 抑制抖动:一帧移动小于此值不计入轨迹
MIN_CUM_DIST = 20.0 # 累计移动距离阈值,超过才认为驶过
HISTORY_LEN = 16 # 每个 track 保留历史点数
HEAT_DECAY_PER_FRAME = 0.002 # 每帧的线性衰减比例(小)
OPPOSITE_ANGLE_COS_THRESH = -0.9397 # 余弦 < -0.5(120°~180°) 认为明显相反方向
VEHICLE_CLASSES = {2, 3, 5, 7} # COCO 的车辆类别(car,motorcycle,bus,truck
ARROW_THICK = 2
ARROW_LEN_MIN = 10.0
# 懒衰减与整图清理参数
FULL_DECAY_EVERY = 30 # 每 N 帧对整图做一次完整衰减(把 pending decay 应用到全图)
# 网格采样用于在热力图上绘制持久方向箭头(性能友好)
GRID_STEP = 40
GRID_HEAT_THRESHOLD = 0.6 # 网格平均热度占最大热度的阈值
GRID_DRAW_EVERY = 2 # 每 N 帧绘制一次网格箭头
# --------------------------
# Graffiti 数据结构与方法(含懒衰减)
# --------------------------
class GraffitiLane:
def __init__(self, frame_w, frame_h):
self.w = frame_w
self.h = frame_h
# heat、dir_x、dir_y 为整图数组(float32
self.heat = np.zeros((self.h, self.w), dtype=np.float32)
self.dir_x = np.zeros((self.h, self.w), dtype=np.float32)
self.dir_y = np.zeros((self.h, self.w), dtype=np.float32)
# 记录上次全局衰减帧编号(用于懒衰减计算)
self.last_decay_frame = 0
def _decay_factor(self, frames):
"""计算在 frames 帧内累计衰减因子"""
if frames <= 0:
return 1.0
return (1.0 - HEAT_DECAY_PER_FRAME) ** frames
def lazy_decay_roi(self, bbox, current_frame):
"""
对给定 ROIbbox)应用从 self.last_decay_frame 到 current_frame 的累计衰减。
这样我们不需要每帧对整图做衰减,只在需要写入/读取 ROI 时才把 pending 衰减应用到 ROI 上。
bbox: (x, y, w, h)
"""
x, y, w, h = bbox
if w <= 0 or h <= 0:
return
frames = current_frame - self.last_decay_frame
if frames <= 0:
return
factor = self._decay_factor(frames)
# 只在 ROI 上乘以因子(就地修改)
self.heat[y:y+h, x:x+w] *= factor
self.dir_x[y:y+h, x:x+w] *= factor
self.dir_y[y:y+h, x:x+w] *= factor
def full_decay(self, current_frame):
"""
在整图上强制应用 pending 衰减(用于周期性清理,避免长期堆积误差)。
"""
frames = current_frame - self.last_decay_frame
if frames <= 0:
return
factor = self._decay_factor(frames)
if factor == 1.0:
self.last_decay_frame = current_frame
return
# 对整图应用因子
self.heat *= factor
self.dir_x *= factor
self.dir_y *= factor
self.last_decay_frame = current_frame
# 以下方法均采用 ROI 操作以提高性能
def apply_paint_roi(self, bbox, mask_roi, direction_vector, inc=INC_PER_PASS, current_frame=0):
# 先对 ROI 应用懒衰减(把 pending decay 应用在将要修改的区域上)
self.lazy_decay_roi(bbox, current_frame)
x, y, w, h = bbox
add = inc * mask_roi.astype(np.float32)
# 对 ROI 做写入(加热并累加方向)
self.heat[y:y+h, x:x+w] = np.minimum(MAX_HEAT, self.heat[y:y+h, x:x+w] + add)
dx, dy = direction_vector
self.dir_x[y:y+h, x:x+w] += dx * add
self.dir_y[y:y+h, x:x+w] += dy * add
def remove_paint_roi(self, bbox, mask_roi, amount=DEC_PER_OPPOSITE, current_frame=0):
# 先对 ROI 应用懒衰减
self.lazy_decay_roi(bbox, current_frame)
x, y, w, h = bbox
dec = amount * mask_roi.astype(np.float32)
roi_heat_before = self.heat[y:y+h, x:x+w].copy()
roi_heat_after = np.maximum(0.0, roi_heat_before - dec)
# 计算缩放比并更新方向累积
eps = 1e-6
ratio = np.ones_like(roi_heat_before)
nonzero_mask = roi_heat_before > eps
ratio[nonzero_mask] = roi_heat_after[nonzero_mask] / (roi_heat_before[nonzero_mask] + eps)
self.dir_x[y:y+h, x:x+w] *= ratio
self.dir_y[y:y+h, x:x+w] *= ratio
self.heat[y:y+h, x:x+w] = roi_heat_after
def region_avg_direction_roi(self, bbox, mask_roi):
x, y, w, h = bbox
mask = mask_roi.astype(bool)
if mask.sum() == 0:
return None
sx = self.dir_x[y:y+h, x:x+w][mask].sum()
sy = self.dir_y[y:y+h, x:x+w][mask].sum()
vec = np.array([sx, sy], dtype=np.float32)
norm = np.linalg.norm(vec)
if norm == 0:
return None
return vec / norm
def region_avg_heat_roi(self, bbox, mask_roi):
x, y, w, h = bbox
mask = mask_roi.astype(bool)
if mask.sum() == 0:
return 0.0
return float(self.heat[y:y+h, x:x+w][mask].mean())
# --------------------------
# 辅助函数(中文注释)
# --------------------------
def polygon_from_segment(center_prev, center_cur, veh_width_px):
"""从两点构造矩形带(作为车辆经过的涂鸦段)"""
x0, y0 = center_prev
x1, y1 = center_cur
vx = x1 - x0
vy = y1 - y0
dist = math.hypot(vx, vy)
if dist == 0:
return None
ux, uy = vx / dist, vy / dist
px, py = -uy, ux
half_w = veh_width_px / 2.0
p0 = (int(round(x0 + px * half_w)), int(round(y0 + py * half_w)))
p1 = (int(round(x0 - px * half_w)), int(round(y0 - py * half_w)))
p2 = (int(round(x1 - px * half_w)), int(round(y1 - py * half_w)))
p3 = (int(round(x1 + px * half_w)), int(round(y1 + py * half_w)))
return np.array([p0, p1, p2, p3], dtype=np.int32)
def polygon_mask_roi_from_pts(pts, img_shape):
"""在多边形的最小包围矩形内生成局部掩码并返回 bbox, mask_roi"""
if pts is None or len(pts) == 0:
return (0,0,0,0), np.zeros((0,0), dtype=np.uint8)
x, y, w, h = cv2.boundingRect(pts)
x2 = min(x+w, img_shape[1])
y2 = min(y+h, img_shape[0])
w = x2 - x
h = y2 - y
if w <= 0 or h <= 0:
return (0,0,0,0), np.zeros((0,0), dtype=np.uint8)
pts_roi = pts.copy()
pts_roi[:,0] -= x
pts_roi[:,1] -= y
mask = np.zeros((h, w), dtype=np.uint8)
cv2.fillPoly(mask, [pts_roi], 1)
return (x, y, w, h), mask
def center_from_bbox(x1, y1, x2, y2):
return ((x1 + x2) / 2.0, (y1 + y2) / 2.0)
def normalize_vec(v):
v = np.array(v, dtype=np.float32)
norm = np.linalg.norm(v)
if norm == 0:
return np.array([0.0, 0.0], dtype=np.float32)
return v / norm
def cos_between(a, b):
an = normalize_vec(a)
bn = normalize_vec(b)
return float(np.dot(an, bn))
def draw_arrow(img, start, vec, color=(0,255,0), thickness=2, tip_length=0.3):
"""绘制箭头(保证最小像素长度以提高可见性)"""
sx, sy = int(round(start[0])), int(round(start[1]))
vx, vy = float(vec[0]), float(vec[1])
norm = math.hypot(vx, vy)
if norm < 1e-3:
return
scale = 1.0
if norm < ARROW_LEN_MIN:
scale = ARROW_LEN_MIN / (norm + 1e-6)
ex = int(round(sx + vx * scale))
ey = int(round(sy + vy * scale))
cv2.arrowedLine(img, (sx, sy), (ex, ey), color, thickness, tipLength=tip_length)
# --------------------------
# 主处理逻辑(含中文注释)
# --------------------------
def process_video_with_graffiti(video_path, model_path, out_path=None, show=True, device="0"):
"""主流程:读取视频流 -> YOLO 跟踪 -> 处理每帧检测 -> 可视化并保存"""
model = YOLO(model_path)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"无法打开视频:{video_path}")
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
graffiti = GraffitiLane(w, h)
# 存储每个 track 的历史中心点与累计距离(用于抑制抖动与判断驶过)
track_hist = defaultdict(lambda: deque(maxlen=HISTORY_LEN))
track_cum_dist = defaultdict(float)
track_last_frame = {}
frame_idx = 0
writer = None
if out_path:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
# 使用 stream=True 逐帧获取结果
results_stream = model.track(source=video_path, tracker="botsort.yaml", device=device, stream=True)
for res in results_stream:
frame_idx += 1
frame = getattr(res, 'orig_img', None)
if frame is None:
frame = getattr(res, 'img', None)
if frame is None:
continue
# 周期性对整图做完整衰减(清理 pending decay,避免值长时间不更新)
if frame_idx % FULL_DECAY_EVERY == 0:
graffiti.full_decay(frame_idx)
boxes = getattr(res, "boxes", None)
vis = frame.copy()
if boxes is not None:
for box in boxes:
# 解析 box,兼容不同 ultralytics 版本的返回格式
try:
xyxy = getattr(box, "xyxy", None)
if xyxy is None:
xyxy = box.data if hasattr(box, "data") else None
if xyxy is None:
continue
x1, y1, x2, y2 = map(float, xyxy[0].tolist()) if hasattr(xyxy, "__len__") and hasattr(xyxy[0], "tolist") else xyxy
cls = int(getattr(box, "cls", -1))
track_id = int(getattr(box, "id", -1))
except Exception:
try:
x1, y1, x2, y2 = box
cls = -1
track_id = -1
except:
continue
# 仅处理车辆类
if cls not in VEHICLE_CLASSES and len(VEHICLE_CLASSES) > 0:
continue
# 绘制车辆 bbox(黄色边框)
cv2.rectangle(vis, (int(x1), int(y1)), (int(x2), int(y2)), (0,255,255), 2)
cx, cy = center_from_bbox(x1, y1, x2, y2)
# 更新并抑制抖动
prev_centers = track_hist[track_id]
if len(prev_centers) > 0:
last = prev_centers[-1]
move_dist = math.hypot(cx - last[0], cy - last[1])
else:
move_dist = 0.0
if len(prev_centers) > 0:
track_cum_dist[track_id] += move_dist
else:
track_cum_dist[track_id] = 0.0
if move_dist >= MIN_MOVE_DIST:
prev_centers.append((cx, cy))
else:
if len(prev_centers) == 0:
prev_centers.append((cx, cy))
# 当累计距离达到阈值时,构造一段涂鸦并更新 graffiti
if track_cum_dist[track_id] >= MIN_CUM_DIST and len(prev_centers) >= 2:
cp_prev = prev_centers[-2]
cp_cur = prev_centers[-1]
veh_width_px = max(8.0, x2 - x1)
poly = polygon_from_segment(cp_prev, cp_cur, veh_width_px)
if poly is None:
continue
# 在多边形最小包围矩形生成局部掩码并返回 ROI
bbox_roi, mask_roi = polygon_mask_roi_from_pts(poly, frame.shape)
if mask_roi.size == 0:
continue
#当前车辆方向(单位向量)
dir_vec = normalize_vec((cp_cur[0] - cp_prev[0], cp_cur[1] - cp_prev[1]))
dir_vec_scaled = dir_vec.astype(np.float32) * 1.0
# 在 ROI 上查询区域信息(方法内部不做整图衰减,使用 lazy_decay)
region_heat = graffiti.region_avg_heat_roi(bbox_roi, mask_roi)
region_dir = graffiti.region_avg_direction_roi(bbox_roi, mask_roi)
# 计算在 ROI 内与现有热区的重叠比例
x, y, w_roi, h_roi = bbox_roi
overlap_pixels = np.logical_and(mask_roi > 0, graffiti.heat[y:y+h_roi, x:x+w_roi] > 0)
overlap_ratio = float(overlap_pixels.sum()) / max(1.0, mask_roi.sum())
# 根据规则判断是否为逆向 / 可疑 / 正常并更新热力
if region_heat >= 1.0 and region_dir is not None:
c = cos_between(region_dir, dir_vec_scaled)
if c < OPPOSITE_ANGLE_COS_THRESH:
if region_heat >= TRUST_THRESHOLD:
if overlap_ratio >= MAJORITY_OVERLAP_THRESHOLD:
event = "REVERSE"
elif overlap_ratio >= PARTIAL_OVERLAP_THRESHOLD:
event = "CROSS_LINE"
else:
event = "SUSPECT"
graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * overlap_ratio, current_frame=frame_idx)
else:
event = "SUSPECT"
graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * 0.5, current_frame=frame_idx)
else:
event = None
graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx)
else:
event = None
graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx)
# 重置累计距离以避免重复涂鸦
track_cum_dist[track_id] = 0.0
track_last_frame[track_id] = (frame_idx, event)
# 立即绘制当前段的多边形边界与方向箭头(局部)
try:
cv2.polylines(vis, [poly], True, (0,255,0), 2)
poly_center = np.mean(poly, axis=0)
draw_arrow(vis, (poly_center[0], poly_center[1]), (dir_vec_scaled[0]*20, dir_vec_scaled[1]*20), color=(0,255,0), thickness=2)
except Exception:
pass
# 始终绘制车辆当前方向向量(红色)
if len(prev_centers) >= 2:
p0 = prev_centers[-2]
p1 = prev_centers[-1]
v = (p1[0]-p0[0], p1[1]-p0[1])
draw_arrow(vis, (cx, cy), (v[0], v[1]), color=(0,0,255), thickness=2)
# 绘制车辆 id 文本
cv2.putText(vis, f"id:{track_id}", (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
# 可视化热力图:红色通道叠加(注意:热力在 ROI 更新时已应用懒衰减)
heat_norm = np.clip(graffiti.heat / MAX_HEAT, 0.0, 1.0)
heat_u8 = (heat_norm * 255).astype(np.uint8)
red_overlay = np.zeros_like(frame)
red_overlay[:, :, 2] = heat_u8
alpha = 0.45
vis = cv2.addWeighted(vis, 1.0, red_overlay, alpha, 0)
# 在网格上绘制方向箭头以展示持久热区方向(降低频率以提升性能)
if frame_idx % GRID_DRAW_EVERY == 0:
gh, gw = graffiti.h, graffiti.w
for gy in range(0, gh, GRID_STEP):
for gx in range(0, gw, GRID_STEP):
x0 = gx
y0 = gy
x1 = min(gx + GRID_STEP, gw)
y1 = min(gy + GRID_STEP, gh)
win = graffiti.heat[y0:y1, x0:x1]
if win.size == 0:
continue
avg_heat = float(win.mean())
if avg_heat / MAX_HEAT >= GRID_HEAT_THRESHOLD:
avg_dx = float(graffiti.dir_x[y0:y1, x0:x1].sum())
avg_dy = float(graffiti.dir_y[y0:y1, x0:x1].sum())
if avg_dx == 0 and avg_dy == 0:
continue
cell_center = (x0 + (x1-x0)/2.0, y0 + (y1-y0)/2.0)
draw_arrow(vis, cell_center, (avg_dx * 5.0, avg_dy * 5.0), color=(0,128,255), thickness=2)
# 绘制轨迹与事件标签
for tid, hist in track_hist.items():
if len(hist) == 0:
continue
pts = np.array(hist, dtype=np.int32)
for i in range(1, len(pts)):
cv2.line(vis, tuple(pts[i-1]), tuple(pts[i]), (255, 255, 255), 1)
cx, cy = hist[-1]
cv2.circle(vis, (int(cx), int(cy)), 3, (255, 255, 255), -1)
if tid in track_last_frame and track_last_frame[tid] is not None:
last_frame_idx, event = track_last_frame[tid]
if event is not None and frame_idx - last_frame_idx <= 5:
txt = f"id:{tid} {event}"
cv2.putText(vis, txt, (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 2)
# 显示帧序号与调试信息
cv2.putText(vis, f"Frame:{frame_idx}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200,200,200), 2)
cv2.putText(vis, f"HeatMax:{MAX_HEAT} TrustThr:{TRUST_THRESHOLD}", (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200,200,200), 1)
if show:
cv2.imshow("graffiti_lane_vectors", vis)
key = cv2.waitKey(1)
if key == 27:
break
if writer is not None:
writer.write(vis)
if writer is not None:
writer.release()
cv2.destroyAllWindows()
# --------------------------
# 运行入口(示例)
# --------------------------
if __name__ == "__main__":
video_path = "./123.mp4"
model_path = "yolo11s.pt"
out_path = "out_graffiti_vectors_opt2.mp4"
process_video_with_graffiti(video_path, model_path, out_path=out_path, show=True, device="0")