About Multi-camera viewer optimized for RTSP streams
1import os
2import threading
3import time
4from collections import deque
5from datetime import datetime
6from urllib.parse import urlparse
7
8import cv2
9import numpy as np
10import requests
11from PyQt6.QtCore import QThread, pyqtSignal
12
13from camera_utils import (
14 _build_rtsp_url,
15 _normalize_rtsp_url,
16 _parse_rtsp_url,
17 _tcp_probe,
18 _udp_reolink_wake,
19)
20from i18n import tr
21
22
23class CameraThread(QThread):
24 """Thread für einzelne Kamera mit OpenCV - optimiert für parallele Streams"""
25 frame_ready = pyqtSignal(np.ndarray, int)
26 connection_status = pyqtSignal(bool, int, str)
27
28 def __init__(self, camera_id, rtsp_url, uid=""):
29 super().__init__()
30 self.camera_id = camera_id
31 # Normalize URL to ensure explicit port (prevents FFmpeg TCP fallback errors)
32 self.rtsp_url = _normalize_rtsp_url(rtsp_url)
33 self.uid = uid
34 self.running = False
35 self.recording = False
36 self.video_writer = None
37 self.event_writer = None
38 self._event_clip_filename = None
39 self._event_clip_until = 0.0
40 self._event_clip_started = 0.0
41 self._writer_lock = threading.Lock()
42 self._buffer_lock = threading.Lock()
43 self._frame_buffer = deque()
44 self._frame_buffer_seconds = 12.0
45 self.cap = None
46 self.reconnect_delay = 5 # Mehr Zeit für Akku-Kameras
47 self._host, self._port, self._user, self._password = _parse_rtsp_url(rtsp_url)
48 self._is_proxy_stream = self._host in ("localhost", "127.0.0.1") and int(self._port or 0) == 8554
49 if self._is_proxy_stream:
50 self.reconnect_delay = 2
51
52 # Alternative Pfade (Reolink Fallbacks)
53 self._alt_paths = [
54 "h264Preview_01_main",
55 "h265Preview_01_main",
56 "Preview_01_main",
57 "h264Preview_01_sub",
58 "Preview_01_sub"
59 ]
60
61 def run(self):
62 """Hauptschleife mit automatischem Retry"""
63 self.running = True
64
65 while self.running:
66 try:
67 self._connect_and_stream()
68 except Exception as e:
69 self.connection_status.emit(False, self.camera_id, tr("error.prefix", error=str(e)))
70 finally:
71 self._release_capture()
72
73 if self.running:
74 self.connection_status.emit(False, self.camera_id, tr("camera.preview.retrying"))
75 for _ in range(int(self.reconnect_delay * 10)):
76 if not self.running:
77 break
78 self.msleep(100)
79
80 self._cleanup()
81
82 def _release_capture(self):
83 if self.cap:
84 try:
85 self.cap.release()
86 except Exception:
87 pass
88 self.cap = None
89
90 def _release_writer(self):
91 with self._writer_lock:
92 if self.video_writer:
93 try:
94 self.video_writer.release()
95 except Exception:
96 pass
97 self.video_writer = None
98 self.recording = False
99
100 def _wait_before_reconnect(self, seconds: float):
101 end_time = time.monotonic() + seconds
102 while self.running and time.monotonic() < end_time:
103 self.msleep(100)
104
105 def _open_capture(self, rtsp_url: str, open_timeout_ms: int, read_timeout_ms: int):
106 self._release_capture()
107 return cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG, [
108 cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, open_timeout_ms,
109 cv2.CAP_PROP_READ_TIMEOUT_MSEC, read_timeout_ms
110 ])
111
112 def _connect_and_stream(self):
113 """Verbindung herstellen und streamen"""
114 # Best-effort wake attempt for sleeping/battery cameras
115 if self._host and not self._is_proxy_stream:
116 # Intensiv-Weckphase (für Akku-Kameras wie Argus PT Ultra)
117 # Wir wiederholen das Wecken und prüfen die Erreichbarkeit über mind. 10 Sek.
118 self.connection_status.emit(False, self.camera_id, tr("camera.preview.waiting"))
119
120 wake_ok = False
121 for attempt in range(10): # 10 Versuche alle ~1s = ca. 10s total
122 if not self.running:
123 break
124
125 # 1. UDP Wake Burst
126 _udp_reolink_wake(self._host, self.uid)
127
128 # 2. Optionaler HTTP Ping
129 try:
130 requests.get(f"http://{self._host}:8000/api.cgi?cmd=GetDevInfo", timeout=0.2)
131 except Exception:
132 pass
133
134 # 3. RTSP Erreichbarkeit prüfen (Port 554)
135 for _ in range(3):
136 if not self.running:
137 break
138 ok, _ = _tcp_probe(self._host, int(self._port or 554), timeout=0.2)
139 if ok:
140 wake_ok = True
141 break
142 time.sleep(0.3)
143
144 if wake_ok:
145 break
146
147 if wake_ok:
148 self.connection_status.emit(True, self.camera_id, tr("camera.status.connected")) # Wach!
149 self._wait_before_reconnect(1.0)
150 else:
151 # Auch wenn TCP Probe fehlschlägt, versuchen wir es trotzdem
152 # (manchen Kameras antworten nicht auf Port-Checks, aber auf echte RTSP-Anfragen)
153 self.connection_status.emit(False, self.camera_id, tr("camera.status.connecting"))
154
155 open_timeout_ms = 20000 if self._is_proxy_stream else 3000
156 read_timeout_ms = 10000 if self._is_proxy_stream else 3000
157
158 # RTSP Stream öffnen (mit Fallback-Pfaden für native Reolink-RTSP-URLs)
159 # Use TCP transport to reduce RTP packet loss warnings
160 self.cap = self._open_capture(self.rtsp_url, open_timeout_ms, read_timeout_ms)
161
162 # Falls eine native Kamera nicht öffnet, probieren wir Reolink-typische Varianten.
163 # Bei ReolinkProxy-URLs ist der Pfad absichtlich fix (<Name>/mainStream).
164 if not self.cap.isOpened() and not self._is_proxy_stream:
165 # Parse URL properly to rebuild with alternative paths
166 try:
167 u = urlparse(self.rtsp_url)
168 port = u.port or 554
169
170 for path in self._alt_paths:
171 # Use _build_rtsp_url to properly encode credentials
172 test_url = _build_rtsp_url(
173 host=u.hostname,
174 port=port,
175 username=u.username or '',
176 password=u.password or '',
177 path=path,
178 scheme=u.scheme
179 )
180 if test_url == self.rtsp_url:
181 continue
182
183 self.connection_status.emit(False, self.camera_id, f"Prüfe Pfad: {path}...")
184 self.cap = self._open_capture(test_url, open_timeout_ms, read_timeout_ms)
185 if self.cap.isOpened():
186 self.rtsp_url = test_url
187 break
188 except Exception:
189 pass
190
191 if not self.cap.isOpened():
192 # Diagnostik: Wenn RTSP zu ist, aber Port 8000 offen, ist RTSP wahrscheinlich in der Kamera deaktiviert
193 if self._host and not self._is_proxy_stream:
194 ok_api, _ = _tcp_probe(self._host, 8000, timeout=0.5)
195 if ok_api:
196 raise Exception("Kamera antwortet auf API (Port 8000), aber RTSP ist blockiert. Bitte 'RTSP' in den Kamera-Einstellungen (Netzwerk -> Fortgeschritten -> Servereinstellungen) aktivieren!")
197 raise Exception(tr("camera.error.stream_unreachable"))
198
199 # Optimierungen für geringe Latenz
200 self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
201 if not self._is_proxy_stream:
202 self.cap.set(cv2.CAP_PROP_FPS, 25)
203
204 self.connection_status.emit(True, self.camera_id, tr("camera.status.connected"))
205
206 last_ui_emit = 0.0
207 ui_emit_interval = 1.0 / (15 if self._is_proxy_stream else 25)
208 failed_reads = 0
209 max_failed_reads = 4 if self._is_proxy_stream else 3
210
211 while self.running:
212 ret, frame = self.cap.read()
213
214 if not ret:
215 failed_reads += 1
216 if failed_reads >= max_failed_reads:
217 raise Exception(tr("camera.error.stream_interrupted"))
218 self.msleep(500 if self._is_proxy_stream else 200)
219 continue
220
221 failed_reads = 0
222 self._remember_frame(frame)
223
224 now = time.monotonic()
225 if now - last_ui_emit >= ui_emit_interval:
226 self.frame_ready.emit(frame.copy(), self.camera_id)
227 last_ui_emit = now
228
229 # Aufzeichnung (alle Frames)
230 with self._writer_lock:
231 if self.recording and self.video_writer is not None:
232 try:
233 self.video_writer.write(frame)
234 except Exception:
235 # Don't crash the streaming thread due to writer issues.
236 pass
237 if self.event_writer is not None:
238 try:
239 self.event_writer.write(frame)
240 except Exception:
241 pass
242 if time.monotonic() >= self._event_clip_until:
243 self._release_event_writer_locked()
244
245 self.msleep(5 if self._is_proxy_stream else 10)
246
247 def _remember_frame(self, frame):
248 now = time.monotonic()
249 with self._buffer_lock:
250 self._frame_buffer.append((now, frame.copy()))
251 cutoff = now - self._frame_buffer_seconds
252 while self._frame_buffer and self._frame_buffer[0][0] < cutoff:
253 self._frame_buffer.popleft()
254
255 def _cleanup(self):
256 """Ressourcen freigeben"""
257 self._release_capture()
258 self._release_writer()
259 with self._writer_lock:
260 self._release_event_writer_locked()
261
262 def _release_event_writer_locked(self):
263 if self.event_writer:
264 try:
265 self.event_writer.release()
266 except Exception:
267 pass
268 self.event_writer = None
269 self._event_clip_filename = None
270 self._event_clip_until = 0.0
271 self._event_clip_started = 0.0
272
273 def start_recording(self, output_path):
274 """Starte Aufzeichnung"""
275 if not (self.cap and self.cap.isOpened()):
276 return None
277
278 with self._writer_lock:
279 if self.recording and self.video_writer is not None:
280 return None
281
282 # Ensure any previous writer is closed before re-opening
283 if self.video_writer is not None:
284 try:
285 self.video_writer.release()
286 except Exception:
287 pass
288 self.video_writer = None
289
290 fps = float(self.cap.get(cv2.CAP_PROP_FPS))
291 if not fps or fps <= 0 or fps > 120:
292 fps = 25.0
293 width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
294 height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
295 if width <= 0 or height <= 0:
296 width, height = 640, 480
297
298 os.makedirs(output_path, exist_ok=True)
299 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
300
301 # Some streams behave badly with MPEG4/XVID timestamping (invalid PTS).
302 # MJPG-in-AVI is usually more tolerant.
303 fourcc = cv2.VideoWriter_fourcc(*'MJPG')
304 filename = os.path.join(output_path, f"camera_{self.camera_id}_{timestamp}.avi")
305
306 vw = cv2.VideoWriter(filename, fourcc, fps, (width, height))
307 if not vw.isOpened():
308 return None
309
310 self.video_writer = vw
311 self.recording = True
312 return filename
313 return None
314
315 def start_event_clip(
316 self,
317 output_path,
318 label: str,
319 camera_name: str = "",
320 pre_seconds: float = 8.0,
321 post_seconds: float = 20.0,
322 max_seconds: float = 180.0,
323 ):
324 """Start a short event clip from the rolling frame buffer plus future frames."""
325 if not (self.cap and self.cap.isOpened()):
326 return None
327
328 with self._writer_lock:
329 now = time.monotonic()
330 max_seconds = min(180.0, max(1.0, float(max_seconds)))
331 post_seconds = max(1.0, min(float(post_seconds), max_seconds))
332 if self.event_writer is not None and now < self._event_clip_until:
333 max_until = (self._event_clip_started or now) + max_seconds
334 self._event_clip_until = min(max_until, max(self._event_clip_until, now + post_seconds))
335 return self._event_clip_filename
336
337 with self._buffer_lock:
338 buffered_items = [
339 (frame_time, frame)
340 for frame_time, frame in self._frame_buffer
341 if frame_time >= now - max(0.0, min(float(pre_seconds), max_seconds))
342 ]
343 buffered = [frame for _frame_time, frame in buffered_items]
344 actual_pre_seconds = now - buffered_items[0][0] if buffered_items else 0.0
345
346 fps = float(self.cap.get(cv2.CAP_PROP_FPS))
347 if not fps or fps <= 0 or fps > 120:
348 fps = 25.0
349 width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
350 height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
351 if (width <= 0 or height <= 0) and buffered:
352 height, width = buffered[-1].shape[:2]
353 if width <= 0 or height <= 0:
354 width, height = 640, 480
355
356 os.makedirs(output_path, exist_ok=True)
357 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
358 safe_label = "".join(c if (c.isalnum() or c in "-_") else "_" for c in str(label))
359 safe_camera = "".join(c if (c.isalnum() or c in "-_") else "_" for c in str(camera_name))
360 prefix = f"event_{safe_camera}_{self.camera_id}" if safe_camera else f"event_{self.camera_id}"
361 filename = os.path.join(output_path, f"{prefix}_{safe_label}_{timestamp}.avi")
362
363 fourcc = cv2.VideoWriter_fourcc(*'MJPG')
364 vw = cv2.VideoWriter(filename, fourcc, fps, (width, height))
365 if not vw.isOpened():
366 return None
367
368 for frame in buffered:
369 if frame.shape[1] != width or frame.shape[0] != height:
370 frame = cv2.resize(frame, (width, height))
371 vw.write(frame)
372
373 self.event_writer = vw
374 self._event_clip_filename = filename
375 self._event_clip_started = now - max(0.0, actual_pre_seconds)
376 self._event_clip_until = min(self._event_clip_started + max_seconds, now + post_seconds)
377 return filename
378
379 def stop_recording(self):
380 """Stoppe Aufzeichnung"""
381 with self._writer_lock:
382 self.recording = False
383 if self.video_writer:
384 try:
385 self.video_writer.release()
386 except Exception:
387 pass
388 self.video_writer = None
389
390 def request_stop(self):
391 """Signal the stream loop to stop.
392
393 OpenCV/FFmpeg can abort if VideoCapture is released from a different
394 thread while open/read is active. The stream thread owns cleanup.
395 """
396 self.running = False
397 self.stop_recording()
398
399 def stop(self, timeout_ms=2000):
400 """Thread stoppen"""
401 self.request_stop()
402 if not self.wait(timeout_ms):
403 return False
404 return True