This repository has no description
1import RPi.GPIO as GPIO
2import time
3from picamera2 import Picamera2
4from datetime import datetime
5import os
6import logging
7import http.server
8import socketserver
9import threading
10import websockets
11import asyncio
12
13# Setup logging
14logger = logging.getLogger('camera_server')
15logger.setLevel(logging.INFO)
16formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
17file_handler = logging.FileHandler('/home/kierank/camera_server.log')
18file_handler.setFormatter(formatter)
19stream_handler = logging.StreamHandler()
20stream_handler.setFormatter(formatter)
21logger.addHandler(file_handler)
22logger.addHandler(stream_handler)
23
24class Config:
25 BUTTON_PIN = 17
26 PHOTO_DIR = "/home/kierank/photos"
27 WEB_PORT = 80
28 WS_PORT = 8765
29 PHOTO_RESOLUTION = (2592, 1944)
30 CAMERA_SETTLE_TIME = 1
31 DEBOUNCE_DELAY = 0.2
32 POLL_INTERVAL = 0.01
33
34def validate_photo_dir():
35 if not os.path.isabs(Config.PHOTO_DIR):
36 raise ValueError("PHOTO_DIR must be an absolute path")
37 if not os.access(Config.PHOTO_DIR, os.W_OK):
38 raise PermissionError(f"No write access to {Config.PHOTO_DIR}")
39
40# Ensure photo directory exists and is valid
41validate_photo_dir()
42os.makedirs(Config.PHOTO_DIR, exist_ok=True)
43
44# Set up GPIO
45GPIO.setmode(GPIO.BCM)
46GPIO.setup(Config.BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
47
48# WebSocket clients set
49connected_clients = set()
50
51# Create a simple HTML gallery template - using triple quotes properly
52HTML_TEMPLATE = """<!DOCTYPE html>
53<html>
54<head>
55 <title>Inkpress: Gallery</title>
56 <meta name="viewport" content="width=device-width, initial-scale=1">
57 <style>
58 body {{ font-family: Arial; max-width: 800px; margin: 0 auto; padding: 20px; }}
59 h1 {{ text-align: center; }}
60 .gallery {{ display: grid; grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); gap: 10px; }}
61 .photo {{ border: 1px solid #ddd; padding: 5px; }}
62 .photo img {{ width: 100%; height: auto; }}
63 .photo a {{ display: block; text-align: center; margin-top: 5px; }}
64 </style>
65 <script>
66 const ws = new WebSocket('ws://' + window.location.hostname + ':8765');
67 ws.onmessage = function(event) {{
68 if(event.data === 'reload') {{
69 window.location.reload();
70 }}
71 }};
72 </script>
73</head>
74<body>
75 <h1>Inkpress: Gallery</h1>
76 <div class="gallery">
77 {photo_items}
78 </div>
79</body>
80</html>
81"""
82
83class PhotoHandler(http.server.SimpleHTTPRequestHandler):
84 def __init__(self, *args, **kwargs):
85 super().__init__(*args, directory=Config.PHOTO_DIR, **kwargs)
86
87 def do_GET(self):
88 if self.path == '/':
89 self.send_response(200)
90 self.send_header('Content-type', 'text/html')
91 self.send_header('X-Content-Type-Options', 'nosniff')
92 self.send_header('X-Frame-Options', 'DENY')
93 self.send_header('X-XSS-Protection', '1; mode=block')
94 self.end_headers()
95
96 # Generate photo gallery HTML
97 photo_items = ""
98 try:
99 files = sorted(os.listdir(Config.PHOTO_DIR), reverse=True)
100 for filename in files:
101 if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
102 timestamp = filename.replace('photo_', '').replace('.jpg', '')
103 photo_items += f"""
104 <div class="photo">
105 <img src="/{filename}" alt="{timestamp}">
106 <a href="/{filename}" download>Download</a>
107 </div>
108 """
109
110 if not photo_items:
111 photo_items = "<p style='grid-column: 1/-1; text-align: center;'>No photos yet. Press the button to take a photo!</p>"
112 except Exception as e:
113 logger.error(f"Error generating gallery: {str(e)}")
114 photo_items = f"<p>Error loading photos: {str(e)}</p>"
115
116 html = HTML_TEMPLATE.format(photo_items=photo_items)
117 self.wfile.write(html.encode())
118 else:
119 super().do_GET()
120
121async def websocket_handler(websocket, path):
122 connected_clients.add(websocket)
123 try:
124 await websocket.wait_closed()
125 finally:
126 connected_clients.remove(websocket)
127
128async def notify_clients():
129 if connected_clients:
130 await asyncio.gather(
131 *[client.send('reload') for client in connected_clients]
132 )
133
134def take_photo():
135 """
136 Captures a photo using the Raspberry Pi camera.
137
138 The photo is saved with a timestamp in the configured photo directory.
139 The camera is configured for still capture at the specified resolution.
140
141 Raises:
142 IOError: If there's an error accessing the camera or saving the file
143 """
144 try:
145 with Picamera2() as picam2:
146 config = picam2.create_still_configuration(main={"size": Config.PHOTO_RESOLUTION})
147 picam2.configure(config)
148 picam2.start()
149 time.sleep(Config.CAMERA_SETTLE_TIME)
150
151 timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
152 filename = f"{Config.PHOTO_DIR}/photo_{timestamp}.jpg"
153 logger.info(f"Taking photo: {filename}")
154
155 picam2.capture_file(filename)
156 logger.info("Photo taken successfully")
157
158 # Notify websocket clients to reload
159 asyncio.run(notify_clients())
160 except IOError as e:
161 logger.error(f"IO Error while taking photo: {str(e)}")
162 except Exception as e:
163 logger.error(f"Unexpected error while taking photo: {str(e)}")
164
165def run_server():
166 try:
167 handler = PhotoHandler
168 with socketserver.TCPServer(("", Config.WEB_PORT), handler) as httpd:
169 logger.info(f"Web server started at port {Config.WEB_PORT}")
170 httpd.serve_forever()
171 except Exception as e:
172 logger.error(f"Server error: {str(e)}")
173
174def main():
175 logger.info("Camera and web server starting")
176 server = None
177
178 try:
179 # Start HTTP server
180 server = socketserver.TCPServer(("", Config.WEB_PORT), PhotoHandler)
181 server_thread = threading.Thread(target=server.serve_forever, daemon=True)
182 server_thread.start()
183
184 # Start WebSocket server
185 ws_server = websockets.serve(websocket_handler, "0.0.0.0", Config.WS_PORT)
186 asyncio.get_event_loop().run_until_complete(ws_server)
187 ws_thread = threading.Thread(
188 target=asyncio.get_event_loop().run_forever,
189 daemon=True
190 )
191 ws_thread.start()
192
193 previous_state = GPIO.input(Config.BUTTON_PIN)
194 while True:
195 current_state = GPIO.input(Config.BUTTON_PIN)
196
197 if current_state == False and previous_state == True:
198 logger.info("Button press detected")
199 take_photo()
200 time.sleep(Config.DEBOUNCE_DELAY)
201
202 previous_state = current_state
203 time.sleep(Config.POLL_INTERVAL)
204
205 except KeyboardInterrupt:
206 logger.info("Program stopped by user")
207 except Exception as e:
208 logger.error(f"Unexpected error: {str(e)}")
209 finally:
210 if server:
211 server.shutdown()
212 server.server_close()
213 GPIO.cleanup()
214 logger.info("GPIO cleaned up")
215
216if __name__ == "__main__":
217 main()