About Multi-camera viewer optimized for RTSP streams
1import threading
2import time
3import math
4from dataclasses import dataclass
5import importlib
6import os
7from pathlib import Path
8import shutil
9import subprocess
10import sys
11from typing import Any
12
13import cv2
14from PyQt6.QtCore import QThread, pyqtSignal
15
16
17KNOWN_YOLO_WEIGHTS = {
18 "yolo11n.pt",
19 "yolo11s.pt",
20 "yolo11m.pt",
21 "yolo11l.pt",
22 "yolo11x.pt",
23 "yolov8n.pt",
24 "yolov8s.pt",
25 "yolov8m.pt",
26 "yolov8l.pt",
27 "yolov8x.pt",
28}
29
30
31DEFAULT_DETECTION_CONFIG = {
32 "model": "yolo11n.pt",
33 "imgsz": 640,
34 "confidence": 0.4,
35 "device": "auto",
36 "analysis_fps_per_camera": 3.0,
37 "stable_frames": 2,
38 "cooldown_seconds": 180,
39 "event_suppress_seconds": 30,
40 "event_clip_seconds": 30,
41 "pre_event_seconds": 8,
42 "post_event_seconds": 20,
43 "motion_required_classes": [
44 "bicycle",
45 "car",
46 "motorcycle",
47 "bus",
48 "truck",
49 ],
50 "motion_min_pixels": 12.0,
51 "classes": [
52 "person",
53 "bicycle",
54 "car",
55 "motorcycle",
56 "bus",
57 "truck",
58 "bird",
59 "cat",
60 "dog",
61 "horse",
62 "sheep",
63 "cow",
64 ],
65}
66
67
68class ModelLoadError(RuntimeError):
69 pass
70
71
72class DetectionDependencyError(RuntimeError):
73 pass
74
75
76_DEPENDENCY_LOCK = threading.Lock()
77_DEPENDENCY_PATH_ADDED = False
78
79
80ULTRALYTICS_PACKAGE = "ultralytics>=8.3.0"
81
82
83DETECTION_SUPPORT_PACKAGES = [
84 "pillow>=10.0.0",
85 "pyyaml>=6.0.0",
86 "scipy>=1.10.0",
87 "psutil>=5.9.0",
88 "matplotlib>=3.7.0",
89 "polars>=0.20.0",
90 "ultralytics-thop>=2.0.0",
91]
92
93
94def _app_data_dir() -> Path:
95 if sys.platform == "win32":
96 base = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA")
97 return Path(base).expanduser() / "WildCam" if base else Path.home() / "AppData" / "Local" / "WildCam"
98 if sys.platform == "darwin":
99 return Path.home() / "Library" / "Application Support" / "WildCam"
100 base = os.environ.get("XDG_DATA_HOME")
101 return Path(base).expanduser() / "wildcam" if base else Path.home() / ".local" / "share" / "wildcam"
102
103
104def detection_runtime_dir() -> Path:
105 version = f"py{sys.version_info.major}.{sys.version_info.minor}"
106 return _app_data_dir() / "detection-runtime" / version
107
108
109def _add_detection_runtime_to_path() -> None:
110 global _DEPENDENCY_PATH_ADDED
111 runtime_dir = detection_runtime_dir()
112 if _DEPENDENCY_PATH_ADDED and str(runtime_dir) in sys.path:
113 return
114 if runtime_dir.exists():
115 sys.path.insert(0, str(runtime_dir))
116 _DEPENDENCY_PATH_ADDED = True
117
118
119def _has_detection_dependencies() -> bool:
120 _add_detection_runtime_to_path()
121 return all(
122 importlib.util.find_spec(module_name) is not None
123 for module_name in ("ultralytics", "torch", "torchvision")
124 )
125
126
127def _python_version(command: list[str]) -> tuple[int, int] | None:
128 try:
129 result = subprocess.run(
130 [*command, "-c", "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')"],
131 check=True,
132 capture_output=True,
133 text=True,
134 timeout=10,
135 )
136 except Exception:
137 return None
138 try:
139 major, minor = result.stdout.strip().split(".", 1)
140 return int(major), int(minor)
141 except Exception:
142 return None
143
144
145def _find_install_python() -> list[str]:
146 if not getattr(sys, "frozen", False):
147 return [sys.executable]
148
149 wanted = (sys.version_info.major, sys.version_info.minor)
150 candidates = [
151 [f"python{wanted[0]}.{wanted[1]}"],
152 ["python3"],
153 ["python"],
154 ]
155 if sys.platform == "win32":
156 candidates = [["py", f"-{wanted[0]}.{wanted[1]}"], ["py", "-3"], *candidates]
157
158 for command in candidates:
159 version = _python_version(command)
160 if version == wanted:
161 return command
162
163 needed = f"{wanted[0]}.{wanted[1]}"
164 raise DetectionDependencyError(
165 "Objekterkennung benötigt Python "
166 f"{needed} mit pip, um die optionalen ML-Pakete beim ersten Start zu installieren."
167 )
168
169
170def _run_pip(command: list[str]) -> None:
171 try:
172 subprocess.run(command, check=True)
173 except subprocess.CalledProcessError as exc:
174 raise DetectionDependencyError(
175 "Installation der optionalen Objekterkennungs-Pakete fehlgeschlagen. "
176 "Bitte Internetverbindung und pip prüfen."
177 ) from exc
178
179
180def ensure_detection_dependencies() -> None:
181 if _has_detection_dependencies():
182 return
183
184 with _DEPENDENCY_LOCK:
185 if _has_detection_dependencies():
186 return
187
188 runtime_dir = detection_runtime_dir()
189 runtime_dir.mkdir(parents=True, exist_ok=True)
190 python = _find_install_python()
191 base_cmd = [*python, "-m", "pip", "install", "--upgrade", "--target", str(runtime_dir)]
192
193 if sys.platform.startswith("linux"):
194 _run_pip(
195 [
196 *base_cmd,
197 "--index-url",
198 "https://download.pytorch.org/whl/cpu",
199 "torch>=2.2.0",
200 "torchvision>=0.17.0",
201 ]
202 )
203 else:
204 _run_pip([*base_cmd, "torch>=2.2.0", "torchvision>=0.17.0"])
205
206 _run_pip([*base_cmd, *DETECTION_SUPPORT_PACKAGES])
207 _run_pip([*base_cmd, "--no-deps", ULTRALYTICS_PACKAGE])
208 importlib.invalidate_caches()
209 _add_detection_runtime_to_path()
210
211 if not _has_detection_dependencies():
212 raise DetectionDependencyError(
213 f"Optionale Objekterkennungs-Pakete wurden installiert, aber nicht vollständig gefunden: {runtime_dir}"
214 )
215
216
217def default_model_dir(recording_path: str) -> Path:
218 return Path(recording_path).expanduser() / "models"
219
220
221def is_known_yolo_weight(model_name: str) -> bool:
222 return Path(str(model_name)).name in KNOWN_YOLO_WEIGHTS
223
224
225def _find_downloaded_weight(file_name: str) -> Path | None:
226 search_roots = [
227 Path.cwd(),
228 Path.home() / ".cache",
229 Path.home() / ".config" / "Ultralytics",
230 ]
231 for root in search_roots:
232 if not root.exists():
233 continue
234 direct = root / file_name
235 if direct.exists():
236 return direct
237 try:
238 for candidate in root.rglob(file_name):
239 if candidate.is_file():
240 return candidate
241 except Exception:
242 continue
243 return None
244
245
246def prepare_model_path(model_name: str, model_dir: str | Path) -> Path:
247 ensure_detection_dependencies()
248
249 raw = str(model_name or "").strip() or DEFAULT_DETECTION_CONFIG["model"]
250 expanded = Path(raw).expanduser()
251 if expanded.exists():
252 return expanded
253
254 file_name = Path(raw).name
255 target_dir = Path(model_dir).expanduser()
256 local_target = target_dir / file_name
257 if local_target.exists():
258 return local_target
259
260 if not is_known_yolo_weight(raw):
261 raise ModelLoadError(f"Modell nicht gefunden: {raw}")
262
263 target_dir.mkdir(parents=True, exist_ok=True)
264 last_error = None
265
266 try:
267 from ultralytics.utils.downloads import attempt_download_asset
268
269 downloaded = Path(attempt_download_asset(file_name)).expanduser()
270 if downloaded.exists():
271 shutil.copy2(downloaded, local_target)
272 return local_target
273 except Exception as exc:
274 last_error = exc
275
276 try:
277 from ultralytics import YOLO
278
279 model = YOLO(file_name)
280 for attr in ("ckpt_path", "pt_path"):
281 downloaded_attr = getattr(model, attr, None)
282 if downloaded_attr and Path(downloaded_attr).exists():
283 shutil.copy2(Path(downloaded_attr), local_target)
284 return local_target
285 except Exception as exc:
286 last_error = exc
287
288 downloaded = _find_downloaded_weight(file_name)
289 if downloaded is not None:
290 shutil.copy2(downloaded, local_target)
291 return local_target
292
293 raise ModelLoadError(f"Modell-Download fehlgeschlagen: {file_name}: {last_error}")
294
295
296@dataclass
297class DetectionResult:
298 camera_id: int
299 camera_name: str
300 label: str
301 confidence: float
302 detections: list[dict[str, Any]]
303 annotated_frame: Any
304 timestamp: float
305
306
307class DetectionWorker(QThread):
308 detected = pyqtSignal(object)
309 status = pyqtSignal(str)
310
311 def __init__(self, config: dict | None = None, parent=None):
312 super().__init__(parent)
313 merged = dict(DEFAULT_DETECTION_CONFIG)
314 if config:
315 merged.update(config)
316 self.config = merged
317 self._running = False
318 self._lock = threading.Lock()
319 self._latest_frames: dict[int, tuple[str, Any, float]] = {}
320 self._last_analyzed: dict[int, float] = {}
321 self._stable_hits: dict[tuple[int, str], int] = {}
322 self._last_event: dict[tuple[int, str], float] = {}
323 self._last_camera_event: dict[int, float] = {}
324 self._last_motion_box: dict[tuple[int, str], tuple[float, float]] = {}
325 self._model = None
326 self._names: dict[int, str] = {}
327 self._device = "cpu"
328
329 def submit_frame(self, camera_id: int, camera_name: str, frame):
330 if not self._running:
331 return
332 with self._lock:
333 self._latest_frames[int(camera_id)] = (camera_name, frame.copy(), time.monotonic())
334
335 def stop(self, timeout_ms: int = 3000) -> bool:
336 self._running = False
337 if self.isRunning():
338 return bool(self.wait(timeout_ms))
339 return True
340
341 def run(self):
342 self._running = True
343 try:
344 self._load_model()
345 except Exception as exc:
346 self.status.emit(f"Modell-Laden fehlgeschlagen: {exc}")
347 self._running = False
348 return
349
350 self.status.emit(f"Objekterkennung aktiv ({self.config['model']}, {self._device})")
351 while self._running:
352 item = self._next_frame()
353 if item is None:
354 self.msleep(50)
355 continue
356
357 camera_id, camera_name, frame = item
358 try:
359 self._analyze_frame(camera_id, camera_name, frame)
360 except Exception as exc:
361 self.status.emit(f"Objekterkennung Fehler: {exc}")
362 self.msleep(500)
363
364 def _load_model(self):
365 self.status.emit("Objekterkennung-Runtime wird geprüft/installiert...")
366 ensure_detection_dependencies()
367
368 try:
369 from ultralytics import YOLO
370 except Exception as exc:
371 raise RuntimeError("Python-Paket 'ultralytics' ist nicht installiert") from exc
372
373 requested_device = str(self.config.get("device", "auto")).strip().lower()
374 if requested_device == "auto":
375 try:
376 import torch
377
378 self._device = "cuda:0" if torch.cuda.is_available() else "cpu"
379 except Exception:
380 self._device = "cpu"
381 else:
382 self._device = requested_device
383
384 model_dir = self.config.get("model_dir") or default_model_dir(self.config.get("recording_path", "."))
385 model_path = prepare_model_path(str(self.config.get("model", "yolo11n.pt")), model_dir)
386 self.config["model"] = str(model_path)
387 self._model = YOLO(str(model_path))
388 self._names = dict(getattr(self._model, "names", {}) or {})
389
390 def _next_frame(self):
391 now = time.monotonic()
392 min_interval = 1.0 / max(0.1, float(self.config.get("analysis_fps_per_camera", 3.0)))
393
394 with self._lock:
395 if not self._latest_frames:
396 return None
397 for camera_id, (_camera_name, _frame, frame_time) in list(self._latest_frames.items()):
398 if now - frame_time > 2.0:
399 self._latest_frames.pop(camera_id, None)
400 self._last_analyzed.pop(camera_id, None)
401 candidates = sorted(
402 self._latest_frames.items(),
403 key=lambda item: self._last_analyzed.get(item[0], 0.0),
404 )
405 for camera_id, (camera_name, frame, _frame_time) in candidates:
406 if now - self._last_analyzed.get(camera_id, 0.0) >= min_interval:
407 self._last_analyzed[camera_id] = now
408 return camera_id, camera_name, frame
409 return None
410
411 def _analyze_frame(self, camera_id: int, camera_name: str, frame):
412 if self._model is None:
413 return
414
415 wanted = set(self.config.get("classes") or [])
416 results = self._model.predict(
417 frame,
418 imgsz=int(self.config.get("imgsz", 640)),
419 conf=float(self.config.get("confidence", 0.4)),
420 device=self._device,
421 verbose=False,
422 )
423
424 detections = []
425 best_by_label: dict[str, dict[str, Any]] = {}
426 for result in results:
427 boxes = getattr(result, "boxes", None)
428 if boxes is None:
429 continue
430 for box in boxes:
431 cls_id = int(box.cls[0])
432 label = str(self._names.get(cls_id, cls_id))
433 if wanted and label not in wanted:
434 continue
435 confidence = float(box.conf[0])
436 xyxy = [int(v) for v in box.xyxy[0].tolist()]
437 item = {
438 "label": label,
439 "confidence": confidence,
440 "box": xyxy,
441 }
442 detections.append(item)
443 if label not in best_by_label or confidence > best_by_label[label]["confidence"]:
444 best_by_label[label] = item
445
446 if not detections:
447 self._decay_stable_hits(camera_id)
448 return
449
450 now = time.monotonic()
451 stable_frames = max(1, int(self.config.get("stable_frames", 2)))
452 cooldown = max(0.0, float(self.config.get("cooldown_seconds", 180)))
453 event_suppress = max(0.0, float(self.config.get("event_suppress_seconds", 30)))
454 motion_required = set(self.config.get("motion_required_classes") or [])
455 motion_min_pixels = max(0.0, float(self.config.get("motion_min_pixels", 12.0)))
456 eligible: list[tuple[str, dict[str, Any]]] = []
457
458 if now - self._last_camera_event.get(camera_id, 0.0) < event_suppress:
459 self._reset_stable_hits(camera_id)
460 return
461
462 for label, best in best_by_label.items():
463 key = (camera_id, label)
464 if motion_required and label in motion_required and not self._has_required_motion(key, best, motion_min_pixels):
465 self._stable_hits[key] = 0
466 continue
467 self._stable_hits[key] = self._stable_hits.get(key, 0) + 1
468 if self._stable_hits[key] < stable_frames:
469 continue
470 if now - self._last_event.get(key, 0.0) < cooldown:
471 continue
472 eligible.append((label, best))
473
474 if not eligible:
475 return
476
477 label, best = max(eligible, key=lambda item: float(item[1]["confidence"]))
478 self._last_event[(camera_id, label)] = now
479 self._last_camera_event[camera_id] = now
480 self._reset_other_stable_hits(camera_id, label)
481 annotated = self._annotate(frame, detections)
482 self.detected.emit(
483 DetectionResult(
484 camera_id=camera_id,
485 camera_name=camera_name,
486 label=label,
487 confidence=float(best["confidence"]),
488 detections=detections,
489 annotated_frame=annotated,
490 timestamp=time.time(),
491 )
492 )
493
494 def _decay_stable_hits(self, camera_id: int):
495 for key in list(self._stable_hits.keys()):
496 if key[0] == camera_id:
497 self._stable_hits[key] = max(0, self._stable_hits[key] - 1)
498
499 def _reset_stable_hits(self, camera_id: int):
500 for key in list(self._stable_hits.keys()):
501 if key[0] == camera_id:
502 self._stable_hits[key] = 0
503
504 def _reset_other_stable_hits(self, camera_id: int, active_label: str):
505 for key in list(self._stable_hits.keys()):
506 if key[0] == camera_id and key[1] != active_label:
507 self._stable_hits[key] = 0
508
509 def _has_required_motion(self, key: tuple[int, str], detection: dict[str, Any], min_pixels: float) -> bool:
510 x1, y1, x2, y2 = detection["box"]
511 center = ((x1 + x2) / 2.0, (y1 + y2) / 2.0)
512 previous = self._last_motion_box.get(key)
513 self._last_motion_box[key] = center
514 if previous is None:
515 return False
516 return math.hypot(center[0] - previous[0], center[1] - previous[1]) >= min_pixels
517
518 def _annotate(self, frame, detections: list[dict[str, Any]]):
519 annotated = frame.copy()
520 for detection in detections:
521 x1, y1, x2, y2 = detection["box"]
522 label = detection["label"]
523 confidence = detection["confidence"]
524 cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 200, 255), 2)
525 text = f"{label} {confidence:.2f}"
526 cv2.putText(
527 annotated,
528 text,
529 (x1, max(20, y1 - 6)),
530 cv2.FONT_HERSHEY_SIMPLEX,
531 0.6,
532 (0, 200, 255),
533 2,
534 cv2.LINE_AA,
535 )
536 return annotated