This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

"""Button-triggered Raspberry Pi camera with a live-reloading web gallery.

A push button wired to a GPIO pin triggers a still capture.  Photos are
written to ``Config.PHOTO_DIR`` and served by a small HTTP gallery; open
gallery pages are told to reload over a WebSocket after each capture.
"""
import asyncio
import html
import http.server
import logging
import os
import socketserver
import threading
import time
import urllib.parse
from datetime import datetime

import RPi.GPIO as GPIO
import websockets
from picamera2 import Picamera2

# Setup logging: every record goes both to a log file and to the console.
logger = logging.getLogger('camera_server')
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler('/home/kierank/camera_server.log')
file_handler.setFormatter(formatter)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)


class Config:
    """Static configuration for the camera, button and servers."""

    BUTTON_PIN = 17  # BCM numbering (see GPIO.setmode below)
    PHOTO_DIR = "/home/kierank/photos"  # must be absolute and writable
    WEB_PORT = 80  # HTTP gallery port
    WS_PORT = 8765  # WebSocket port; the page script in HTML_TEMPLATE uses the same number
    PHOTO_RESOLUTION = (2592, 1944)  # passed as the still configuration "size"
    CAMERA_SETTLE_TIME = 1  # seconds slept between camera start and capture
    DEBOUNCE_DELAY = 0.2  # seconds slept after a handled button press
    POLL_INTERVAL = 0.01  # seconds between button polls


def validate_photo_dir():
    """Check that ``Config.PHOTO_DIR`` is an absolute, writable path.

    Raises:
        ValueError: If ``Config.PHOTO_DIR`` is not an absolute path.
        PermissionError: If the process cannot write to the directory
            (this includes the directory not existing).
    """
    if not os.path.isabs(Config.PHOTO_DIR):
        raise ValueError("PHOTO_DIR must be an absolute path")
    if not os.access(Config.PHOTO_DIR, os.W_OK):
        raise PermissionError(f"No write access to {Config.PHOTO_DIR}")


# Ensure photo directory exists and is valid.  The directory has to be
# created *before* validation: os.access() reports a missing directory as
# "not writable", which made a first run on a fresh system fail.
if os.path.isabs(Config.PHOTO_DIR):
    os.makedirs(Config.PHOTO_DIR, exist_ok=True)
validate_photo_dir()

# Set up GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setup(Config.BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)

# WebSocket clients set
connected_clients = set()

# Event loop that owns the WebSocket server; set by main().  Notifications
# sent from other threads must be scheduled onto this loop.
_ws_loop = None

# File extensions shown in the gallery (compared case-insensitively).
_PHOTO_EXTENSIONS = ('.jpg', '.jpeg', '.png')

# Create a simple HTML gallery template - using triple quotes properly.
# Literal braces are doubled because the template is filled with str.format().
HTML_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
    <title>Inkpress: Gallery</title>
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <style>
        body {{ font-family: Arial; max-width: 800px; margin: 0 auto; padding: 20px; }}
        h1 {{ text-align: center; }}
        .gallery {{ display: grid; grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); gap: 10px; }}
        .photo {{ border: 1px solid #ddd; padding: 5px; }}
        .photo img {{ width: 100%; height: auto; }}
        .photo a {{ display: block; text-align: center; margin-top: 5px; }}
    </style>
    <script>
        const ws = new WebSocket('ws://' + window.location.hostname + ':8765');
        ws.onmessage = function(event) {{
            if(event.data === 'reload') {{
                window.location.reload();
            }}
        }};
    </script>
</head>
<body>
    <h1>Inkpress: Gallery</h1>
    <div class="gallery">
        {photo_items}
    </div>
</body>
</html>
"""


def _render_photo_items():
    """Return the HTML fragment listing every photo, newest name first.

    File names are percent-encoded for URLs and HTML-escaped for attribute
    values so that unusual names cannot break or inject markup.
    """
    try:
        filenames = sorted(os.listdir(Config.PHOTO_DIR), reverse=True)
    except OSError as e:
        logger.error(f"Error generating gallery: {str(e)}")
        return f"<p>Error loading photos: {html.escape(str(e))}</p>"

    items = []
    for filename in filenames:
        if not filename.lower().endswith(_PHOTO_EXTENSIONS):
            continue
        # Label = file name without extension and without the "photo_" prefix.
        label = os.path.splitext(filename)[0]
        if label.startswith('photo_'):
            label = label[len('photo_'):]
        src = html.escape("/" + urllib.parse.quote(filename), quote=True)
        alt = html.escape(label, quote=True)
        items.append(
            f"""
        <div class="photo">
            <img src="{src}" alt="{alt}">
            <a href="{src}" download>Download</a>
        </div>
        """
        )

    if not items:
        return "<p style='grid-column: 1/-1; text-align: center;'>No photos yet. Press the button to take a photo!</p>"
    return "".join(items)


class PhotoHandler(http.server.SimpleHTTPRequestHandler):
    """Serve the gallery page at ``/`` and files from ``Config.PHOTO_DIR``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, directory=Config.PHOTO_DIR, **kwargs)

    def do_GET(self):
        """Answer ``/`` with the generated gallery; delegate everything else."""
        if self.path != '/':
            super().do_GET()
            return

        # Build the body first so the headers can carry its exact length.
        body = HTML_TEMPLATE.format(photo_items=_render_photo_items()).encode("utf-8")

        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('X-Content-Type-Options', 'nosniff')
        self.send_header('X-Frame-Options', 'DENY')
        self.send_header('X-XSS-Protection', '1; mode=block')
        self.end_headers()
        self.wfile.write(body)


class _ReusableTCPServer(socketserver.TCPServer):
    """TCPServer that can rebind its port right after a restart."""

    allow_reuse_address = True


async def websocket_handler(websocket, path=None):
    """Track one client connection until it closes.

    ``path`` is optional so the handler works both with websockets versions
    that pass ``(websocket, path)`` and with those that pass only
    ``websocket``.
    """
    connected_clients.add(websocket)
    try:
        await websocket.wait_closed()
    finally:
        # discard(): notify_clients() may already have dropped this client.
        connected_clients.discard(websocket)


async def notify_clients():
    """Send ``'reload'`` to every connected client.

    Must run on the event loop that owns the connections.  A failure on one
    client is logged and that client is dropped; the others still receive
    the message.
    """
    # Snapshot: the set can change while the sends are awaited.
    clients = list(connected_clients)
    if not clients:
        return
    results = await asyncio.gather(
        *(client.send('reload') for client in clients),
        return_exceptions=True,
    )
    for client, result in zip(clients, results):
        if isinstance(result, Exception):
            logger.warning(f"Dropping websocket client after send failure: {result}")
            connected_clients.discard(client)


def _log_notify_failure(future):
    """Done-callback: log an exception raised by a scheduled notification."""
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        logger.error(f"Failed to notify websocket clients: {error}")


def _notify_clients_threadsafe():
    """Schedule notify_clients() on the WebSocket loop from any thread.

    The client connections are bound to the loop running in the background
    thread, so the coroutine has to be handed to that loop rather than run
    on a fresh one.  Does nothing if the WebSocket server is not running
    (there can be no clients in that case).
    """
    loop = _ws_loop
    if loop is None or not loop.is_running():
        return
    future = asyncio.run_coroutine_threadsafe(notify_clients(), loop)
    future.add_done_callback(_log_notify_failure)


def take_photo():
    """
    Captures a photo using the Raspberry Pi camera.

    The photo is saved with a timestamp in the configured photo directory.
    The camera is configured for still capture at the specified resolution.
    After a successful capture, connected gallery pages are told to reload.

    Errors are logged and never propagated, so a failed capture does not
    stop the button loop.
    """
    try:
        with Picamera2() as picam2:
            config = picam2.create_still_configuration(main={"size": Config.PHOTO_RESOLUTION})
            picam2.configure(config)
            picam2.start()
            time.sleep(Config.CAMERA_SETTLE_TIME)

            timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            filename = f"{Config.PHOTO_DIR}/photo_{timestamp}.jpg"
            logger.info(f"Taking photo: {filename}")

            picam2.capture_file(filename)
            logger.info("Photo taken successfully")

        # Notify websocket clients to reload
        _notify_clients_threadsafe()
    except IOError as e:
        logger.error(f"IO Error while taking photo: {str(e)}")
    except Exception as e:
        logger.exception(f"Unexpected error while taking photo: {str(e)}")


def run_server():
    """Run the HTTP gallery server in the calling thread until it stops.

    Any error (for example a failure to bind the port) is logged rather
    than raised.
    """
    try:
        handler = PhotoHandler
        with _ReusableTCPServer(("", Config.WEB_PORT), handler) as httpd:
            logger.info(f"Web server started at port {Config.WEB_PORT}")
            httpd.serve_forever()
    except Exception as e:
        logger.error(f"Server error: {str(e)}")


def _start_websocket_server():
    """Bind the WebSocket server and run its event loop in a daemon thread.

    Binding happens synchronously in the calling thread so that errors such
    as "port in use" surface to the caller.

    Returns:
        A ``(loop, server)`` tuple: the event loop now running in the
        background thread and the server object returned by
        ``websockets.serve``.
    """
    loop = asyncio.new_event_loop()

    async def _bind():
        # Awaited inside a running loop, as websockets.serve() expects.
        return await websockets.serve(websocket_handler, "0.0.0.0", Config.WS_PORT)

    try:
        ws_server = loop.run_until_complete(_bind())
    except BaseException:
        loop.close()
        raise
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop, ws_server


def main():
    """Start both servers, then poll the button and take photos until stopped."""
    global _ws_loop

    logger.info("Camera and web server starting")
    server = None
    ws_server = None  # kept referenced for the lifetime of the program

    try:
        # Start HTTP server
        server = _ReusableTCPServer(("", Config.WEB_PORT), PhotoHandler)
        server_thread = threading.Thread(target=server.serve_forever, daemon=True)
        server_thread.start()

        # Start WebSocket server
        _ws_loop, ws_server = _start_websocket_server()

        previous_state = GPIO.input(Config.BUTTON_PIN)
        while True:
            current_state = GPIO.input(Config.BUTTON_PIN)

            # The pin is pulled up, so a press is a high -> low transition.
            if not current_state and previous_state:
                logger.info("Button press detected")
                take_photo()
                time.sleep(Config.DEBOUNCE_DELAY)

            previous_state = current_state
            time.sleep(Config.POLL_INTERVAL)

    except KeyboardInterrupt:
        logger.info("Program stopped by user")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
    finally:
        if _ws_loop is not None and _ws_loop.is_running():
            _ws_loop.call_soon_threadsafe(_ws_loop.stop)
        if server:
            server.shutdown()
            server.server_close()
        GPIO.cleanup()
        logger.info("GPIO cleaned up")


if __name__ == "__main__":
    main()