About Multi-camera viewer optimized for RTSP streams
0

Configure Feed

Select the types of activity you want to include in your feed.

at master 16 kB View raw
1import os 2import threading 3import time 4from collections import deque 5from datetime import datetime 6from urllib.parse import urlparse 7 8import cv2 9import numpy as np 10import requests 11from PyQt6.QtCore import QThread, pyqtSignal 12 13from camera_utils import ( 14 _build_rtsp_url, 15 _normalize_rtsp_url, 16 _parse_rtsp_url, 17 _tcp_probe, 18 _udp_reolink_wake, 19) 20from i18n import tr 21 22 23class CameraThread(QThread): 24 """Thread für einzelne Kamera mit OpenCV - optimiert für parallele Streams""" 25 frame_ready = pyqtSignal(np.ndarray, int) 26 connection_status = pyqtSignal(bool, int, str) 27 28 def __init__(self, camera_id, rtsp_url, uid=""): 29 super().__init__() 30 self.camera_id = camera_id 31 # Normalize URL to ensure explicit port (prevents FFmpeg TCP fallback errors) 32 self.rtsp_url = _normalize_rtsp_url(rtsp_url) 33 self.uid = uid 34 self.running = False 35 self.recording = False 36 self.video_writer = None 37 self.event_writer = None 38 self._event_clip_filename = None 39 self._event_clip_until = 0.0 40 self._event_clip_started = 0.0 41 self._writer_lock = threading.Lock() 42 self._buffer_lock = threading.Lock() 43 self._frame_buffer = deque() 44 self._frame_buffer_seconds = 12.0 45 self.cap = None 46 self.reconnect_delay = 5 # Mehr Zeit für Akku-Kameras 47 self._host, self._port, self._user, self._password = _parse_rtsp_url(rtsp_url) 48 self._is_proxy_stream = self._host in ("localhost", "127.0.0.1") and int(self._port or 0) == 8554 49 if self._is_proxy_stream: 50 self.reconnect_delay = 2 51 52 # Alternative Pfade (Reolink Fallbacks) 53 self._alt_paths = [ 54 "h264Preview_01_main", 55 "h265Preview_01_main", 56 "Preview_01_main", 57 "h264Preview_01_sub", 58 "Preview_01_sub" 59 ] 60 61 def run(self): 62 """Hauptschleife mit automatischem Retry""" 63 self.running = True 64 65 while self.running: 66 try: 67 self._connect_and_stream() 68 except Exception as e: 69 self.connection_status.emit(False, self.camera_id, tr("error.prefix", error=str(e))) 70 finally: 71 self._release_capture() 72 73 if self.running: 74 self.connection_status.emit(False, self.camera_id, tr("camera.preview.retrying")) 75 for _ in range(int(self.reconnect_delay * 10)): 76 if not self.running: 77 break 78 self.msleep(100) 79 80 self._cleanup() 81 82 def _release_capture(self): 83 if self.cap: 84 try: 85 self.cap.release() 86 except Exception: 87 pass 88 self.cap = None 89 90 def _release_writer(self): 91 with self._writer_lock: 92 if self.video_writer: 93 try: 94 self.video_writer.release() 95 except Exception: 96 pass 97 self.video_writer = None 98 self.recording = False 99 100 def _wait_before_reconnect(self, seconds: float): 101 end_time = time.monotonic() + seconds 102 while self.running and time.monotonic() < end_time: 103 self.msleep(100) 104 105 def _open_capture(self, rtsp_url: str, open_timeout_ms: int, read_timeout_ms: int): 106 self._release_capture() 107 return cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG, [ 108 cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, open_timeout_ms, 109 cv2.CAP_PROP_READ_TIMEOUT_MSEC, read_timeout_ms 110 ]) 111 112 def _connect_and_stream(self): 113 """Verbindung herstellen und streamen""" 114 # Best-effort wake attempt for sleeping/battery cameras 115 if self._host and not self._is_proxy_stream: 116 # Intensiv-Weckphase (für Akku-Kameras wie Argus PT Ultra) 117 # Wir wiederholen das Wecken und prüfen die Erreichbarkeit über mind. 10 Sek. 118 self.connection_status.emit(False, self.camera_id, tr("camera.preview.waiting")) 119 120 wake_ok = False 121 for attempt in range(10): # 10 Versuche alle ~1s = ca. 10s total 122 if not self.running: 123 break 124 125 # 1. UDP Wake Burst 126 _udp_reolink_wake(self._host, self.uid) 127 128 # 2. Optionaler HTTP Ping 129 try: 130 requests.get(f"http://{self._host}:8000/api.cgi?cmd=GetDevInfo", timeout=0.2) 131 except Exception: 132 pass 133 134 # 3. RTSP Erreichbarkeit prüfen (Port 554) 135 for _ in range(3): 136 if not self.running: 137 break 138 ok, _ = _tcp_probe(self._host, int(self._port or 554), timeout=0.2) 139 if ok: 140 wake_ok = True 141 break 142 time.sleep(0.3) 143 144 if wake_ok: 145 break 146 147 if wake_ok: 148 self.connection_status.emit(True, self.camera_id, tr("camera.status.connected")) # Wach! 149 self._wait_before_reconnect(1.0) 150 else: 151 # Auch wenn TCP Probe fehlschlägt, versuchen wir es trotzdem 152 # (manchen Kameras antworten nicht auf Port-Checks, aber auf echte RTSP-Anfragen) 153 self.connection_status.emit(False, self.camera_id, tr("camera.status.connecting")) 154 155 open_timeout_ms = 20000 if self._is_proxy_stream else 3000 156 read_timeout_ms = 10000 if self._is_proxy_stream else 3000 157 158 # RTSP Stream öffnen (mit Fallback-Pfaden für native Reolink-RTSP-URLs) 159 # Use TCP transport to reduce RTP packet loss warnings 160 self.cap = self._open_capture(self.rtsp_url, open_timeout_ms, read_timeout_ms) 161 162 # Falls eine native Kamera nicht öffnet, probieren wir Reolink-typische Varianten. 163 # Bei ReolinkProxy-URLs ist der Pfad absichtlich fix (<Name>/mainStream). 164 if not self.cap.isOpened() and not self._is_proxy_stream: 165 # Parse URL properly to rebuild with alternative paths 166 try: 167 u = urlparse(self.rtsp_url) 168 port = u.port or 554 169 170 for path in self._alt_paths: 171 # Use _build_rtsp_url to properly encode credentials 172 test_url = _build_rtsp_url( 173 host=u.hostname, 174 port=port, 175 username=u.username or '', 176 password=u.password or '', 177 path=path, 178 scheme=u.scheme 179 ) 180 if test_url == self.rtsp_url: 181 continue 182 183 self.connection_status.emit(False, self.camera_id, f"Prüfe Pfad: {path}...") 184 self.cap = self._open_capture(test_url, open_timeout_ms, read_timeout_ms) 185 if self.cap.isOpened(): 186 self.rtsp_url = test_url 187 break 188 except Exception: 189 pass 190 191 if not self.cap.isOpened(): 192 # Diagnostik: Wenn RTSP zu ist, aber Port 8000 offen, ist RTSP wahrscheinlich in der Kamera deaktiviert 193 if self._host and not self._is_proxy_stream: 194 ok_api, _ = _tcp_probe(self._host, 8000, timeout=0.5) 195 if ok_api: 196 raise Exception("Kamera antwortet auf API (Port 8000), aber RTSP ist blockiert. Bitte 'RTSP' in den Kamera-Einstellungen (Netzwerk -> Fortgeschritten -> Servereinstellungen) aktivieren!") 197 raise Exception(tr("camera.error.stream_unreachable")) 198 199 # Optimierungen für geringe Latenz 200 self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) 201 if not self._is_proxy_stream: 202 self.cap.set(cv2.CAP_PROP_FPS, 25) 203 204 self.connection_status.emit(True, self.camera_id, tr("camera.status.connected")) 205 206 last_ui_emit = 0.0 207 ui_emit_interval = 1.0 / (15 if self._is_proxy_stream else 25) 208 failed_reads = 0 209 max_failed_reads = 4 if self._is_proxy_stream else 3 210 211 while self.running: 212 ret, frame = self.cap.read() 213 214 if not ret: 215 failed_reads += 1 216 if failed_reads >= max_failed_reads: 217 raise Exception(tr("camera.error.stream_interrupted")) 218 self.msleep(500 if self._is_proxy_stream else 200) 219 continue 220 221 failed_reads = 0 222 self._remember_frame(frame) 223 224 now = time.monotonic() 225 if now - last_ui_emit >= ui_emit_interval: 226 self.frame_ready.emit(frame.copy(), self.camera_id) 227 last_ui_emit = now 228 229 # Aufzeichnung (alle Frames) 230 with self._writer_lock: 231 if self.recording and self.video_writer is not None: 232 try: 233 self.video_writer.write(frame) 234 except Exception: 235 # Don't crash the streaming thread due to writer issues. 236 pass 237 if self.event_writer is not None: 238 try: 239 self.event_writer.write(frame) 240 except Exception: 241 pass 242 if time.monotonic() >= self._event_clip_until: 243 self._release_event_writer_locked() 244 245 self.msleep(5 if self._is_proxy_stream else 10) 246 247 def _remember_frame(self, frame): 248 now = time.monotonic() 249 with self._buffer_lock: 250 self._frame_buffer.append((now, frame.copy())) 251 cutoff = now - self._frame_buffer_seconds 252 while self._frame_buffer and self._frame_buffer[0][0] < cutoff: 253 self._frame_buffer.popleft() 254 255 def _cleanup(self): 256 """Ressourcen freigeben""" 257 self._release_capture() 258 self._release_writer() 259 with self._writer_lock: 260 self._release_event_writer_locked() 261 262 def _release_event_writer_locked(self): 263 if self.event_writer: 264 try: 265 self.event_writer.release() 266 except Exception: 267 pass 268 self.event_writer = None 269 self._event_clip_filename = None 270 self._event_clip_until = 0.0 271 self._event_clip_started = 0.0 272 273 def start_recording(self, output_path): 274 """Starte Aufzeichnung""" 275 if not (self.cap and self.cap.isOpened()): 276 return None 277 278 with self._writer_lock: 279 if self.recording and self.video_writer is not None: 280 return None 281 282 # Ensure any previous writer is closed before re-opening 283 if self.video_writer is not None: 284 try: 285 self.video_writer.release() 286 except Exception: 287 pass 288 self.video_writer = None 289 290 fps = float(self.cap.get(cv2.CAP_PROP_FPS)) 291 if not fps or fps <= 0 or fps > 120: 292 fps = 25.0 293 width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) 294 height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) 295 if width <= 0 or height <= 0: 296 width, height = 640, 480 297 298 os.makedirs(output_path, exist_ok=True) 299 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 300 301 # Some streams behave badly with MPEG4/XVID timestamping (invalid PTS). 302 # MJPG-in-AVI is usually more tolerant. 303 fourcc = cv2.VideoWriter_fourcc(*'MJPG') 304 filename = os.path.join(output_path, f"camera_{self.camera_id}_{timestamp}.avi") 305 306 vw = cv2.VideoWriter(filename, fourcc, fps, (width, height)) 307 if not vw.isOpened(): 308 return None 309 310 self.video_writer = vw 311 self.recording = True 312 return filename 313 return None 314 315 def start_event_clip( 316 self, 317 output_path, 318 label: str, 319 camera_name: str = "", 320 pre_seconds: float = 8.0, 321 post_seconds: float = 20.0, 322 max_seconds: float = 180.0, 323 ): 324 """Start a short event clip from the rolling frame buffer plus future frames.""" 325 if not (self.cap and self.cap.isOpened()): 326 return None 327 328 with self._writer_lock: 329 now = time.monotonic() 330 max_seconds = min(180.0, max(1.0, float(max_seconds))) 331 post_seconds = max(1.0, min(float(post_seconds), max_seconds)) 332 if self.event_writer is not None and now < self._event_clip_until: 333 max_until = (self._event_clip_started or now) + max_seconds 334 self._event_clip_until = min(max_until, max(self._event_clip_until, now + post_seconds)) 335 return self._event_clip_filename 336 337 with self._buffer_lock: 338 buffered_items = [ 339 (frame_time, frame) 340 for frame_time, frame in self._frame_buffer 341 if frame_time >= now - max(0.0, min(float(pre_seconds), max_seconds)) 342 ] 343 buffered = [frame for _frame_time, frame in buffered_items] 344 actual_pre_seconds = now - buffered_items[0][0] if buffered_items else 0.0 345 346 fps = float(self.cap.get(cv2.CAP_PROP_FPS)) 347 if not fps or fps <= 0 or fps > 120: 348 fps = 25.0 349 width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) 350 height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) 351 if (width <= 0 or height <= 0) and buffered: 352 height, width = buffered[-1].shape[:2] 353 if width <= 0 or height <= 0: 354 width, height = 640, 480 355 356 os.makedirs(output_path, exist_ok=True) 357 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 358 safe_label = "".join(c if (c.isalnum() or c in "-_") else "_" for c in str(label)) 359 safe_camera = "".join(c if (c.isalnum() or c in "-_") else "_" for c in str(camera_name)) 360 prefix = f"event_{safe_camera}_{self.camera_id}" if safe_camera else f"event_{self.camera_id}" 361 filename = os.path.join(output_path, f"{prefix}_{safe_label}_{timestamp}.avi") 362 363 fourcc = cv2.VideoWriter_fourcc(*'MJPG') 364 vw = cv2.VideoWriter(filename, fourcc, fps, (width, height)) 365 if not vw.isOpened(): 366 return None 367 368 for frame in buffered: 369 if frame.shape[1] != width or frame.shape[0] != height: 370 frame = cv2.resize(frame, (width, height)) 371 vw.write(frame) 372 373 self.event_writer = vw 374 self._event_clip_filename = filename 375 self._event_clip_started = now - max(0.0, actual_pre_seconds) 376 self._event_clip_until = min(self._event_clip_started + max_seconds, now + post_seconds) 377 return filename 378 379 def stop_recording(self): 380 """Stoppe Aufzeichnung""" 381 with self._writer_lock: 382 self.recording = False 383 if self.video_writer: 384 try: 385 self.video_writer.release() 386 except Exception: 387 pass 388 self.video_writer = None 389 390 def request_stop(self): 391 """Signal the stream loop to stop. 392 393 OpenCV/FFmpeg can abort if VideoCapture is released from a different 394 thread while open/read is active. The stream thread owns cleanup. 395 """ 396 self.running = False 397 self.stop_recording() 398 399 def stop(self, timeout_ms=2000): 400 """Thread stoppen""" 401 self.request_stop() 402 if not self.wait(timeout_ms): 403 return False 404 return True