Files
zimoyin cdf228fe56 feat(app): 集成loguru日志系统并优化错误处理
- 在app.py中引入loguru并配置日志轮转、异步输出等功能
- 添加全局日志初始化函数和程序启动/退出日志记录
- 将所有print语句替换为logger.info/error/debug/warning方法
- 在data_source.py中添加模型加载和视频打开的日志记录
- 在各个处理器中集成日志记录器实例并记录处理状态
- 修改处理器模块导入路径以符合相对导入规范
- 在requirements.txt中添加loguru依赖包
- 统一异常处理的日志记录方式,便于调试和监控
2026-01-10 18:03:18 +08:00

263 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import cv2
import math
from collections import deque, defaultdict
from ..base_processor import BaseProcessor
from ..pipeline_data import PipelineData
class GraffitiProcessor(BaseProcessor):
"""涂鸦计算处理器:计算车辆轨迹并更新车道涂鸦"""
def __init__(self, name: str = "涂鸦计算处理器"):
super().__init__(name)
# 可调参数
self.MAX_HEAT = 10.0
self.INC_PER_PASS = 1.0
self.DEC_PER_OPPOSITE = 1.0
self.MIN_MOVE_DIST = 6.0
self.MIN_CUM_DIST = 20.0
self.HISTORY_LEN = 16
self.HEAT_DECAY_PER_FRAME = 0.002
self.VEHICLE_CLASSES = {2, 3, 5, 7}
self.FULL_DECAY_EVERY = 30 # 每30帧进行一次全图衰减
# 初始化数据结构(在process中根据帧尺寸初始化)
self.graffiti = None
self.frame_shape = None
self.track_hist = defaultdict(lambda: deque(maxlen=self.HISTORY_LEN))
self.track_cum_dist = defaultdict(float)
self.frame_idx = 0
def init_graffiti(self, h, w):
"""初始化涂鸦数据结构"""
if self.graffiti is None or self.frame_shape != (h, w):
self.graffiti = GraffitiLane(w, h)
self.frame_shape = (h, w)
def process(self, data: PipelineData) -> PipelineData:
"""处理每一帧,更新涂鸦数据"""
if data.current_result is None or data.frame is None:
return data
frame = data.frame
h, w = frame.shape[:2]
self.init_graffiti(h, w)
self.frame_idx = data.frame_idx
# 周期性对整图做完整衰减
if self.frame_idx % self.FULL_DECAY_EVERY == 0:
self.graffiti.full_decay(self.frame_idx)
# 获取检测框
boxes = getattr(data.current_result, "boxes", None)
if boxes is None:
return data
# 处理每个检测框
for box in boxes:
try:
# 解析检测框
xyxy = getattr(box, "xyxy", None)
if xyxy is None:
continue
x1, y1, x2, y2 = map(float, xyxy[0].tolist())
cls = int(getattr(box, "cls", -1))
track_id = int(getattr(box, "id", -1))
# 仅处理车辆类
if cls not in self.VEHICLE_CLASSES:
continue
# 更新轨迹和历史
self._update_track(track_id, x1, y1, x2, y2)
# 如果累计距离达到阈值,更新涂鸦
if (self.track_cum_dist[track_id] >= self.MIN_CUM_DIST and
len(self.track_hist[track_id]) >= 2):
self._update_graffiti(track_id, x2 - x1)
self.track_cum_dist[track_id] = 0.0
except Exception as e:
continue
# 清理长时间不活跃的track
self._cleanup_old_tracks()
# 存储涂鸦数据到PipelineDatadeque转换为list以便序列化)
data.put_data("graffiti", "heat_map", self.graffiti.heat)
data.put_data("graffiti", "dir_x_map", self.graffiti.dir_x)
data.put_data("graffiti", "dir_y_map", self.graffiti.dir_y)
data.put_data("graffiti", "track_hist", {
track_id: list(hist) for track_id, hist in self.track_hist.items()
})
data.put_data("graffiti", "track_cum_dist", dict(self.track_cum_dist))
return data
def _update_track(self, track_id, x1, y1, x2, y2):
"""更新车辆轨迹信息"""
cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0
prev_centers = self.track_hist[track_id]
if len(prev_centers) > 0:
last_cx, last_cy = prev_centers[-1]
move_dist = math.hypot(cx - last_cx, cy - last_cy)
else:
move_dist = 0.0
if len(prev_centers) > 0:
self.track_cum_dist[track_id] += move_dist
else:
self.track_cum_dist[track_id] = 0.0
if move_dist >= self.MIN_MOVE_DIST:
prev_centers.append((cx, cy))
elif len(prev_centers) == 0:
prev_centers.append((cx, cy))
def _update_graffiti(self, track_id, vehicle_width):
"""根据车辆轨迹更新涂鸦"""
hist = self.track_hist[track_id]
if len(hist) < 2:
return
cp_prev = hist[-2]
cp_cur = hist[-1]
# 创建多边形掩码
poly = self._polygon_from_segment(cp_prev, cp_cur, max(8.0, vehicle_width))
if poly is None:
return
# 计算ROI掩码
bbox_roi, mask_roi = self._polygon_mask_roi_from_pts(poly, self.frame_shape)
if mask_roi.size == 0:
return
# 计算方向向量
dir_vec = self._normalize_vec((cp_cur[0] - cp_prev[0], cp_cur[1] - cp_prev[1]))
dir_vec_scaled = dir_vec.astype(np.float32) * 1.0
# 更新涂鸦
self.graffiti.apply_paint_roi(
bbox_roi, mask_roi, dir_vec_scaled,
inc=self.INC_PER_PASS,
current_frame=self.frame_idx
)
def _cleanup_old_tracks(self):
"""清理长时间不活跃的track,防止内存泄漏"""
# 可以根据需要实现,比如超过一定帧数没有更新就清理
# 这里先不实现,根据实际情况调整
pass
def _polygon_from_segment(self, center_prev, center_cur, veh_width_px):
"""从两点构造矩形带"""
x0, y0 = center_prev
x1, y1 = center_cur
vx = x1 - x0
vy = y1 - y0
dist = math.hypot(vx, vy)
if dist == 0:
return None
ux, uy = vx / dist, vy / dist
px, py = -uy, ux
half_w = veh_width_px / 2.0
p0 = (int(round(x0 + px * half_w)), int(round(y0 + py * half_w)))
p1 = (int(round(x0 - px * half_w)), int(round(y0 - py * half_w)))
p2 = (int(round(x1 - px * half_w)), int(round(y1 - py * half_w)))
p3 = (int(round(x1 + px * half_w)), int(round(y1 + py * half_w)))
return np.array([p0, p1, p2, p3], dtype=np.int32)
def _polygon_mask_roi_from_pts(self, pts, img_shape):
"""在多边形最小包围矩形内生成局部掩码"""
if pts is None or len(pts) == 0:
return (0, 0, 0, 0), np.zeros((0, 0), dtype=np.uint8)
x, y, w, h = cv2.boundingRect(pts)
x2 = min(x + w, img_shape[1])
y2 = min(y + h, img_shape[0])
w = x2 - x
h = y2 - y
if w <= 0 or h <= 0:
return (0, 0, 0, 0), np.zeros((0, 0), dtype=np.uint8)
pts_roi = pts.copy()
pts_roi[:, 0] -= x
pts_roi[:, 1] -= y
mask = np.zeros((h, w), dtype=np.uint8)
cv2.fillPoly(mask, [pts_roi], 1)
return (x, y, w, h), mask
def _normalize_vec(self, v):
"""归一化向量"""
v = np.array(v, dtype=np.float32)
norm = np.linalg.norm(v)
if norm == 0:
return np.array([0.0, 0.0], dtype=np.float32)
return v / norm
class GraffitiLane:
"""涂鸦车道数据结构"""
def __init__(self, frame_w, frame_h):
self.w = frame_w
self.h = frame_h
self.heat = np.zeros((self.h, self.w), dtype=np.float32)
self.dir_x = np.zeros((self.h, self.w), dtype=np.float32)
self.dir_y = np.zeros((self.h, self.w), dtype=np.float32)
self.last_decay_frame = 0
def _decay_factor(self, frames):
if frames <= 0:
return 1.0
return (1.0 - 0.002) ** frames # HEAT_DECAY_PER_FRAME = 0.002
def lazy_decay_roi(self, bbox, current_frame):
x, y, w, h = bbox
if w <= 0 or h <= 0:
return
frames = current_frame - self.last_decay_frame
if frames <= 0:
return
factor = self._decay_factor(frames)
self.heat[y:y + h, x:x + w] *= factor
self.dir_x[y:y + h, x:x + w] *= factor
self.dir_y[y:y + h, x:x + w] *= factor
def full_decay(self, current_frame):
"""在整图上强制应用pending衰减(用于周期性清理)"""
frames = current_frame - self.last_decay_frame
if frames <= 0:
return
factor = self._decay_factor(frames)
if factor == 1.0:
self.last_decay_frame = current_frame
return
# 对整图应用因子
self.heat *= factor
self.dir_x *= factor
self.dir_y *= factor
self.last_decay_frame = current_frame
def apply_paint_roi(self, bbox, mask_roi, direction_vector, inc=1.0, current_frame=0):
self.lazy_decay_roi(bbox, current_frame)
x, y, w, h = bbox
add = inc * mask_roi.astype(np.float32)
self.heat[y:y + h, x:x + w] = np.minimum(10.0, self.heat[y:y + h, x:x + w] + add) # MAX_HEAT = 10.0
dx, dy = direction_vector
self.dir_x[y:y + h, x:x + w] += dx * add
self.dir_y[y:y + h, x:x + w] += dy * add