5f2e856ad7
- 实现基于YOLO跟踪的车辆轨迹分析系统 - 添加热力图和方向向量的地图涂鸦管理功能 - 集成车辆逆向、越线、可疑行为检测算法 - 实现ROI局部操作和懒衰减机制提升性能 - 添加网格采样方向箭头绘制优化显示效果 - 支持视频流处理和实时可视化输出
487 lines
21 KiB
Python
487 lines
21 KiB
Python
"""
|
||
yolo_graffiti_lane_with_vectors.py
|
||
|
||
思路与架构(中文说明 + 简易架构图)
|
||
|
||
整体思路:
|
||
- 使用 YOLO 跟踪(Ultralytics 的 model.track(..., stream=True)),获取每帧检测与 track id。
|
||
- 每辆车的有效移动段(去抖后)会在画面上用矩形带(vehicle_width)在热力图上留下涂鸦(heat 0..MAX_HEAT),同时累积方向向量(dir_x/dir_y)。
|
||
- 判断逆向:当车辆经过已存在热区,计算区域平均方向与当前车辆方向的余弦相似度,结合热度与重叠比例判定 SUSPECT / CROSS_LINE / REVERSE。
|
||
- 为了性能,采用 ROI 局部操作和延迟(lazy)衰减:对涂鸦只在其最小包围矩形(ROI)内进行掩码计算与热力/方向更新,且热力的衰减通过“按帧懒处理”来减少整图操作。
|
||
|
||
架构(简化 ASCII 图):
|
||
|
||
+----------------------------+
|
||
| Video frames (stream) | <-- ultralytics model.track
|
||
+------------+---------------+
|
||
|
|
||
v
|
||
+------------+---------------+ +----------------+
|
||
| Detection & Tracking (YOLO)| -----> | per-track hist |
|
||
+------------+---------------+ +----------------+
|
||
| |
|
||
v v
|
||
+--------------------------------------+ update when
|
||
| Graffiti Manager (heat, dir maps) |<-- vehicle passes
|
||
| - lazy ROI decay |
|
||
| - apply/remove paint in ROI |
|
||
+--------------------------------------+
|
||
|
|
||
v
|
||
+--------------------------------------+
|
||
| Visualizer: |
|
||
| - draw bbox & track vectors |
|
||
| - draw polygon & local arrow |
|
||
| - draw grid-sampled persistent arrows (performance friendly)
|
||
+--------------------------------------+
|
||
|
||
性能关键点改进:
|
||
1. 避免每帧对整张 heat/dir 做逐像素衰减;改为:只有在需要更新的 ROI 上按累计帧数做懒衰减(lazy_decay),并且每 N 帧做一次整图清理(full decay)。
|
||
2. 对多边形掩码只在 ROI(boundingRect)内执行,减少 fillPoly 的像素数。
|
||
3. 将热区方向的持久热箭头以网格采样绘制,且可设置每 N 帧绘制一次,减少绘制开销。
|
||
|
||
下面为优化后的代码,已全部改为中文注释并包含性能优化。
|
||
"""
|
||
|
||
import cv2
|
||
import numpy as np
|
||
from collections import deque, defaultdict
|
||
from ultralytics import YOLO
|
||
import math
|
||
|
||
# --------------------------
|
||
# 可调参数(按需调整)
|
||
# --------------------------
|
||
MAX_HEAT = 10.0 # 热力最大值
|
||
INC_PER_PASS = 1.0 # 每次经过热力增加量
|
||
DEC_PER_OPPOSITE = 1.0 # 逆向经过时减小热力的基准量
|
||
TRUST_THRESHOLD = 3.0 # 热力 >= 3 视为可信区域
|
||
PARTIAL_OVERLAP_THRESHOLD = 0.1
|
||
MAJORITY_OVERLAP_THRESHOLD = 0.5
|
||
MIN_MOVE_DIST = 6.0 # 抑制抖动:一帧移动小于此值不计入轨迹
|
||
MIN_CUM_DIST = 20.0 # 累计移动距离阈值,超过才认为驶过
|
||
HISTORY_LEN = 16 # 每个 track 保留历史点数
|
||
HEAT_DECAY_PER_FRAME = 0.002 # 每帧的线性衰减比例(小)
|
||
OPPOSITE_ANGLE_COS_THRESH = -0.9397 # 余弦 < -0.5(120°~180°) 认为明显相反方向
|
||
VEHICLE_CLASSES = {2, 3, 5, 7} # COCO 的车辆类别(car,motorcycle,bus,truck)
|
||
ARROW_THICK = 2
|
||
ARROW_LEN_MIN = 10.0
|
||
|
||
# 懒衰减与整图清理参数
|
||
FULL_DECAY_EVERY = 30 # 每 N 帧对整图做一次完整衰减(把 pending decay 应用到全图)
|
||
# 网格采样用于在热力图上绘制持久方向箭头(性能友好)
|
||
GRID_STEP = 40
|
||
GRID_HEAT_THRESHOLD = 0.6 # 网格平均热度占最大热度的阈值
|
||
GRID_DRAW_EVERY = 2 # 每 N 帧绘制一次网格箭头
|
||
|
||
# --------------------------
|
||
# Graffiti 数据结构与方法(含懒衰减)
|
||
# --------------------------
|
||
class GraffitiLane:
|
||
def __init__(self, frame_w, frame_h):
|
||
self.w = frame_w
|
||
self.h = frame_h
|
||
# heat、dir_x、dir_y 为整图数组(float32)
|
||
self.heat = np.zeros((self.h, self.w), dtype=np.float32)
|
||
self.dir_x = np.zeros((self.h, self.w), dtype=np.float32)
|
||
self.dir_y = np.zeros((self.h, self.w), dtype=np.float32)
|
||
# 记录上次全局衰减帧编号(用于懒衰减计算)
|
||
self.last_decay_frame = 0
|
||
|
||
def _decay_factor(self, frames):
|
||
"""计算在 frames 帧内累计衰减因子"""
|
||
if frames <= 0:
|
||
return 1.0
|
||
return (1.0 - HEAT_DECAY_PER_FRAME) ** frames
|
||
|
||
def lazy_decay_roi(self, bbox, current_frame):
|
||
"""
|
||
对给定 ROI(bbox)应用从 self.last_decay_frame 到 current_frame 的累计衰减。
|
||
这样我们不需要每帧对整图做衰减,只在需要写入/读取 ROI 时才把 pending 衰减应用到 ROI 上。
|
||
bbox: (x, y, w, h)
|
||
"""
|
||
x, y, w, h = bbox
|
||
if w <= 0 or h <= 0:
|
||
return
|
||
frames = current_frame - self.last_decay_frame
|
||
if frames <= 0:
|
||
return
|
||
factor = self._decay_factor(frames)
|
||
# 只在 ROI 上乘以因子(就地修改)
|
||
self.heat[y:y+h, x:x+w] *= factor
|
||
self.dir_x[y:y+h, x:x+w] *= factor
|
||
self.dir_y[y:y+h, x:x+w] *= factor
|
||
|
||
def full_decay(self, current_frame):
|
||
"""
|
||
在整图上强制应用 pending 衰减(用于周期性清理,避免长期堆积误差)。
|
||
"""
|
||
frames = current_frame - self.last_decay_frame
|
||
if frames <= 0:
|
||
return
|
||
factor = self._decay_factor(frames)
|
||
if factor == 1.0:
|
||
self.last_decay_frame = current_frame
|
||
return
|
||
# 对整图应用因子
|
||
self.heat *= factor
|
||
self.dir_x *= factor
|
||
self.dir_y *= factor
|
||
self.last_decay_frame = current_frame
|
||
|
||
# 以下方法均采用 ROI 操作以提高性能
|
||
def apply_paint_roi(self, bbox, mask_roi, direction_vector, inc=INC_PER_PASS, current_frame=0):
|
||
# 先对 ROI 应用懒衰减(把 pending decay 应用在将要修改的区域上)
|
||
self.lazy_decay_roi(bbox, current_frame)
|
||
x, y, w, h = bbox
|
||
add = inc * mask_roi.astype(np.float32)
|
||
# 对 ROI 做写入(加热并累加方向)
|
||
self.heat[y:y+h, x:x+w] = np.minimum(MAX_HEAT, self.heat[y:y+h, x:x+w] + add)
|
||
dx, dy = direction_vector
|
||
self.dir_x[y:y+h, x:x+w] += dx * add
|
||
self.dir_y[y:y+h, x:x+w] += dy * add
|
||
|
||
def remove_paint_roi(self, bbox, mask_roi, amount=DEC_PER_OPPOSITE, current_frame=0):
|
||
# 先对 ROI 应用懒衰减
|
||
self.lazy_decay_roi(bbox, current_frame)
|
||
x, y, w, h = bbox
|
||
dec = amount * mask_roi.astype(np.float32)
|
||
roi_heat_before = self.heat[y:y+h, x:x+w].copy()
|
||
roi_heat_after = np.maximum(0.0, roi_heat_before - dec)
|
||
# 计算缩放比并更新方向累积
|
||
eps = 1e-6
|
||
ratio = np.ones_like(roi_heat_before)
|
||
nonzero_mask = roi_heat_before > eps
|
||
ratio[nonzero_mask] = roi_heat_after[nonzero_mask] / (roi_heat_before[nonzero_mask] + eps)
|
||
self.dir_x[y:y+h, x:x+w] *= ratio
|
||
self.dir_y[y:y+h, x:x+w] *= ratio
|
||
self.heat[y:y+h, x:x+w] = roi_heat_after
|
||
|
||
def region_avg_direction_roi(self, bbox, mask_roi):
|
||
x, y, w, h = bbox
|
||
mask = mask_roi.astype(bool)
|
||
if mask.sum() == 0:
|
||
return None
|
||
sx = self.dir_x[y:y+h, x:x+w][mask].sum()
|
||
sy = self.dir_y[y:y+h, x:x+w][mask].sum()
|
||
vec = np.array([sx, sy], dtype=np.float32)
|
||
norm = np.linalg.norm(vec)
|
||
if norm == 0:
|
||
return None
|
||
return vec / norm
|
||
|
||
def region_avg_heat_roi(self, bbox, mask_roi):
|
||
x, y, w, h = bbox
|
||
mask = mask_roi.astype(bool)
|
||
if mask.sum() == 0:
|
||
return 0.0
|
||
return float(self.heat[y:y+h, x:x+w][mask].mean())
|
||
|
||
# --------------------------
|
||
# 辅助函数(中文注释)
|
||
# --------------------------
|
||
|
||
def polygon_from_segment(center_prev, center_cur, veh_width_px):
|
||
"""从两点构造矩形带(作为车辆经过的涂鸦段)"""
|
||
x0, y0 = center_prev
|
||
x1, y1 = center_cur
|
||
vx = x1 - x0
|
||
vy = y1 - y0
|
||
dist = math.hypot(vx, vy)
|
||
if dist == 0:
|
||
return None
|
||
ux, uy = vx / dist, vy / dist
|
||
px, py = -uy, ux
|
||
half_w = veh_width_px / 2.0
|
||
p0 = (int(round(x0 + px * half_w)), int(round(y0 + py * half_w)))
|
||
p1 = (int(round(x0 - px * half_w)), int(round(y0 - py * half_w)))
|
||
p2 = (int(round(x1 - px * half_w)), int(round(y1 - py * half_w)))
|
||
p3 = (int(round(x1 + px * half_w)), int(round(y1 + py * half_w)))
|
||
return np.array([p0, p1, p2, p3], dtype=np.int32)
|
||
|
||
|
||
def polygon_mask_roi_from_pts(pts, img_shape):
|
||
"""在多边形的最小包围矩形内生成局部掩码并返回 bbox, mask_roi"""
|
||
if pts is None or len(pts) == 0:
|
||
return (0,0,0,0), np.zeros((0,0), dtype=np.uint8)
|
||
x, y, w, h = cv2.boundingRect(pts)
|
||
x2 = min(x+w, img_shape[1])
|
||
y2 = min(y+h, img_shape[0])
|
||
w = x2 - x
|
||
h = y2 - y
|
||
if w <= 0 or h <= 0:
|
||
return (0,0,0,0), np.zeros((0,0), dtype=np.uint8)
|
||
pts_roi = pts.copy()
|
||
pts_roi[:,0] -= x
|
||
pts_roi[:,1] -= y
|
||
mask = np.zeros((h, w), dtype=np.uint8)
|
||
cv2.fillPoly(mask, [pts_roi], 1)
|
||
return (x, y, w, h), mask
|
||
|
||
|
||
def center_from_bbox(x1, y1, x2, y2):
|
||
return ((x1 + x2) / 2.0, (y1 + y2) / 2.0)
|
||
|
||
|
||
def normalize_vec(v):
|
||
v = np.array(v, dtype=np.float32)
|
||
norm = np.linalg.norm(v)
|
||
if norm == 0:
|
||
return np.array([0.0, 0.0], dtype=np.float32)
|
||
return v / norm
|
||
|
||
|
||
def cos_between(a, b):
|
||
an = normalize_vec(a)
|
||
bn = normalize_vec(b)
|
||
return float(np.dot(an, bn))
|
||
|
||
|
||
def draw_arrow(img, start, vec, color=(0,255,0), thickness=2, tip_length=0.3):
|
||
"""绘制箭头(保证最小像素长度以提高可见性)"""
|
||
sx, sy = int(round(start[0])), int(round(start[1]))
|
||
vx, vy = float(vec[0]), float(vec[1])
|
||
norm = math.hypot(vx, vy)
|
||
if norm < 1e-3:
|
||
return
|
||
scale = 1.0
|
||
if norm < ARROW_LEN_MIN:
|
||
scale = ARROW_LEN_MIN / (norm + 1e-6)
|
||
ex = int(round(sx + vx * scale))
|
||
ey = int(round(sy + vy * scale))
|
||
cv2.arrowedLine(img, (sx, sy), (ex, ey), color, thickness, tipLength=tip_length)
|
||
|
||
# --------------------------
|
||
# 主处理逻辑(含中文注释)
|
||
# --------------------------
|
||
|
||
def process_video_with_graffiti(video_path, model_path, out_path=None, show=True, device="0"):
|
||
"""主流程:读取视频流 -> YOLO 跟踪 -> 处理每帧检测 -> 可视化并保存"""
|
||
model = YOLO(model_path)
|
||
|
||
cap = cv2.VideoCapture(video_path)
|
||
if not cap.isOpened():
|
||
raise RuntimeError(f"无法打开视频:{video_path}")
|
||
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
|
||
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||
cap.release()
|
||
|
||
graffiti = GraffitiLane(w, h)
|
||
|
||
# 存储每个 track 的历史中心点与累计距离(用于抑制抖动与判断驶过)
|
||
track_hist = defaultdict(lambda: deque(maxlen=HISTORY_LEN))
|
||
track_cum_dist = defaultdict(float)
|
||
track_last_frame = {}
|
||
frame_idx = 0
|
||
|
||
writer = None
|
||
if out_path:
|
||
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
||
writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
|
||
|
||
# 使用 stream=True 逐帧获取结果
|
||
results_stream = model.track(source=video_path, tracker="botsort.yaml", device=device, stream=True)
|
||
|
||
for res in results_stream:
|
||
frame_idx += 1
|
||
frame = getattr(res, 'orig_img', None)
|
||
if frame is None:
|
||
frame = getattr(res, 'img', None)
|
||
if frame is None:
|
||
continue
|
||
|
||
# 周期性对整图做完整衰减(清理 pending decay,避免值长时间不更新)
|
||
if frame_idx % FULL_DECAY_EVERY == 0:
|
||
graffiti.full_decay(frame_idx)
|
||
|
||
boxes = getattr(res, "boxes", None)
|
||
vis = frame.copy()
|
||
|
||
if boxes is not None:
|
||
for box in boxes:
|
||
# 解析 box,兼容不同 ultralytics 版本的返回格式
|
||
try:
|
||
xyxy = getattr(box, "xyxy", None)
|
||
if xyxy is None:
|
||
xyxy = box.data if hasattr(box, "data") else None
|
||
if xyxy is None:
|
||
continue
|
||
x1, y1, x2, y2 = map(float, xyxy[0].tolist()) if hasattr(xyxy, "__len__") and hasattr(xyxy[0], "tolist") else xyxy
|
||
cls = int(getattr(box, "cls", -1))
|
||
track_id = int(getattr(box, "id", -1))
|
||
except Exception:
|
||
try:
|
||
x1, y1, x2, y2 = box
|
||
cls = -1
|
||
track_id = -1
|
||
except:
|
||
continue
|
||
|
||
# 仅处理车辆类
|
||
if cls not in VEHICLE_CLASSES and len(VEHICLE_CLASSES) > 0:
|
||
continue
|
||
|
||
# 绘制车辆 bbox(黄色边框)
|
||
cv2.rectangle(vis, (int(x1), int(y1)), (int(x2), int(y2)), (0,255,255), 2)
|
||
cx, cy = center_from_bbox(x1, y1, x2, y2)
|
||
|
||
# 更新并抑制抖动
|
||
prev_centers = track_hist[track_id]
|
||
if len(prev_centers) > 0:
|
||
last = prev_centers[-1]
|
||
move_dist = math.hypot(cx - last[0], cy - last[1])
|
||
else:
|
||
move_dist = 0.0
|
||
|
||
if len(prev_centers) > 0:
|
||
track_cum_dist[track_id] += move_dist
|
||
else:
|
||
track_cum_dist[track_id] = 0.0
|
||
|
||
if move_dist >= MIN_MOVE_DIST:
|
||
prev_centers.append((cx, cy))
|
||
else:
|
||
if len(prev_centers) == 0:
|
||
prev_centers.append((cx, cy))
|
||
|
||
# 当累计距离达到阈值时,构造一段涂鸦并更新 graffiti
|
||
if track_cum_dist[track_id] >= MIN_CUM_DIST and len(prev_centers) >= 2:
|
||
cp_prev = prev_centers[-2]
|
||
cp_cur = prev_centers[-1]
|
||
veh_width_px = max(8.0, x2 - x1)
|
||
poly = polygon_from_segment(cp_prev, cp_cur, veh_width_px)
|
||
if poly is None:
|
||
continue
|
||
|
||
# 在多边形最小包围矩形生成局部掩码并返回 ROI
|
||
bbox_roi, mask_roi = polygon_mask_roi_from_pts(poly, frame.shape)
|
||
if mask_roi.size == 0:
|
||
continue
|
||
|
||
#当前车辆方向(单位向量)
|
||
dir_vec = normalize_vec((cp_cur[0] - cp_prev[0], cp_cur[1] - cp_prev[1]))
|
||
dir_vec_scaled = dir_vec.astype(np.float32) * 1.0
|
||
|
||
# 在 ROI 上查询区域信息(方法内部不做整图衰减,使用 lazy_decay)
|
||
region_heat = graffiti.region_avg_heat_roi(bbox_roi, mask_roi)
|
||
region_dir = graffiti.region_avg_direction_roi(bbox_roi, mask_roi)
|
||
|
||
# 计算在 ROI 内与现有热区的重叠比例
|
||
x, y, w_roi, h_roi = bbox_roi
|
||
overlap_pixels = np.logical_and(mask_roi > 0, graffiti.heat[y:y+h_roi, x:x+w_roi] > 0)
|
||
overlap_ratio = float(overlap_pixels.sum()) / max(1.0, mask_roi.sum())
|
||
|
||
# 根据规则判断是否为逆向 / 可疑 / 正常并更新热力
|
||
if region_heat >= 1.0 and region_dir is not None:
|
||
c = cos_between(region_dir, dir_vec_scaled)
|
||
if c < OPPOSITE_ANGLE_COS_THRESH:
|
||
if region_heat >= TRUST_THRESHOLD:
|
||
if overlap_ratio >= MAJORITY_OVERLAP_THRESHOLD:
|
||
event = "REVERSE"
|
||
elif overlap_ratio >= PARTIAL_OVERLAP_THRESHOLD:
|
||
event = "CROSS_LINE"
|
||
else:
|
||
event = "SUSPECT"
|
||
graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * overlap_ratio, current_frame=frame_idx)
|
||
else:
|
||
event = "SUSPECT"
|
||
graffiti.remove_paint_roi(bbox_roi, mask_roi, amount=DEC_PER_OPPOSITE * 0.5, current_frame=frame_idx)
|
||
else:
|
||
event = None
|
||
graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx)
|
||
else:
|
||
event = None
|
||
graffiti.apply_paint_roi(bbox_roi, mask_roi, dir_vec_scaled, inc=INC_PER_PASS, current_frame=frame_idx)
|
||
|
||
# 重置累计距离以避免重复涂鸦
|
||
track_cum_dist[track_id] = 0.0
|
||
track_last_frame[track_id] = (frame_idx, event)
|
||
|
||
# 立即绘制当前段的多边形边界与方向箭头(局部)
|
||
try:
|
||
cv2.polylines(vis, [poly], True, (0,255,0), 2)
|
||
poly_center = np.mean(poly, axis=0)
|
||
draw_arrow(vis, (poly_center[0], poly_center[1]), (dir_vec_scaled[0]*20, dir_vec_scaled[1]*20), color=(0,255,0), thickness=2)
|
||
except Exception:
|
||
pass
|
||
|
||
# 始终绘制车辆当前方向向量(红色)
|
||
if len(prev_centers) >= 2:
|
||
p0 = prev_centers[-2]
|
||
p1 = prev_centers[-1]
|
||
v = (p1[0]-p0[0], p1[1]-p0[1])
|
||
draw_arrow(vis, (cx, cy), (v[0], v[1]), color=(0,0,255), thickness=2)
|
||
|
||
# 绘制车辆 id 文本
|
||
cv2.putText(vis, f"id:{track_id}", (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
|
||
|
||
# 可视化热力图:红色通道叠加(注意:热力在 ROI 更新时已应用懒衰减)
|
||
heat_norm = np.clip(graffiti.heat / MAX_HEAT, 0.0, 1.0)
|
||
heat_u8 = (heat_norm * 255).astype(np.uint8)
|
||
red_overlay = np.zeros_like(frame)
|
||
red_overlay[:, :, 2] = heat_u8
|
||
alpha = 0.45
|
||
vis = cv2.addWeighted(vis, 1.0, red_overlay, alpha, 0)
|
||
|
||
# 在网格上绘制方向箭头以展示持久热区方向(降低频率以提升性能)
|
||
if frame_idx % GRID_DRAW_EVERY == 0:
|
||
gh, gw = graffiti.h, graffiti.w
|
||
for gy in range(0, gh, GRID_STEP):
|
||
for gx in range(0, gw, GRID_STEP):
|
||
x0 = gx
|
||
y0 = gy
|
||
x1 = min(gx + GRID_STEP, gw)
|
||
y1 = min(gy + GRID_STEP, gh)
|
||
win = graffiti.heat[y0:y1, x0:x1]
|
||
if win.size == 0:
|
||
continue
|
||
avg_heat = float(win.mean())
|
||
if avg_heat / MAX_HEAT >= GRID_HEAT_THRESHOLD:
|
||
avg_dx = float(graffiti.dir_x[y0:y1, x0:x1].sum())
|
||
avg_dy = float(graffiti.dir_y[y0:y1, x0:x1].sum())
|
||
if avg_dx == 0 and avg_dy == 0:
|
||
continue
|
||
cell_center = (x0 + (x1-x0)/2.0, y0 + (y1-y0)/2.0)
|
||
draw_arrow(vis, cell_center, (avg_dx * 5.0, avg_dy * 5.0), color=(0,128,255), thickness=2)
|
||
|
||
# 绘制轨迹与事件标签
|
||
for tid, hist in track_hist.items():
|
||
if len(hist) == 0:
|
||
continue
|
||
pts = np.array(hist, dtype=np.int32)
|
||
for i in range(1, len(pts)):
|
||
cv2.line(vis, tuple(pts[i-1]), tuple(pts[i]), (255, 255, 255), 1)
|
||
cx, cy = hist[-1]
|
||
cv2.circle(vis, (int(cx), int(cy)), 3, (255, 255, 255), -1)
|
||
if tid in track_last_frame and track_last_frame[tid] is not None:
|
||
last_frame_idx, event = track_last_frame[tid]
|
||
if event is not None and frame_idx - last_frame_idx <= 5:
|
||
txt = f"id:{tid} {event}"
|
||
cv2.putText(vis, txt, (int(cx)+6, int(cy)-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 2)
|
||
|
||
# 显示帧序号与调试信息
|
||
cv2.putText(vis, f"Frame:{frame_idx}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200,200,200), 2)
|
||
cv2.putText(vis, f"HeatMax:{MAX_HEAT} TrustThr:{TRUST_THRESHOLD}", (10, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200,200,200), 1)
|
||
|
||
if show:
|
||
cv2.imshow("graffiti_lane_vectors", vis)
|
||
key = cv2.waitKey(1)
|
||
if key == 27:
|
||
break
|
||
if writer is not None:
|
||
writer.write(vis)
|
||
|
||
if writer is not None:
|
||
writer.release()
|
||
cv2.destroyAllWindows()
|
||
|
||
# --------------------------
|
||
# 运行入口(示例)
|
||
# --------------------------
|
||
if __name__ == "__main__":
|
||
video_path = "./123.mp4"
|
||
model_path = "yolo11s.pt"
|
||
out_path = "out_graffiti_vectors_opt2.mp4"
|
||
process_video_with_graffiti(video_path, model_path, out_path=out_path, show=True, device="0")
|