This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

import glob
import json
import logging
import os
import sys
import time
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler

import magic
import requests
from atproto import Client, models
from atproto.exceptions import BadRequestError
from crontab import CronTab
from dotenv import load_dotenv

# Ensure the script is run inside a virtual environment.
# Either attribute check succeeding is treated as "a virtual environment is active".
_in_virtualenv = hasattr(sys, "real_prefix") or (
    hasattr(sys, "base_prefix") and sys.base_prefix != sys.prefix
)
if not _in_virtualenv:
    print("Error: This script must be run inside a virtual environment.")
    sys.exit(1)
# Nothing is logged on success: logging is not configured yet at this point, so
# an INFO record would be discarded by the root logger's default WARNING level.

# Define the paths
BASE_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
ASSETS_DIR = os.path.join(BASE_DIR, "assets")
ENV_PATH = os.path.join(ASSETS_DIR, ".env")
JSON_PATH = os.path.join(ASSETS_DIR, "cids.json")
SCRIPT_PATH = os.path.abspath(__file__)

# Define the log file directory and log file path
log_dir = os.path.join(BASE_DIR, "logs")
# exist_ok avoids the check-then-create race when two runs start together.
os.makedirs(log_dir, exist_ok=True)


def cleanup_old_logs(log_directory, days=30, pattern="update.log*"):
    """Delete log files in *log_directory* last modified more than *days* ago.

    Args:
        log_directory: Directory that holds the log files.
        days: Age threshold in days, compared against each file's mtime.
        pattern: Glob pattern, relative to *log_directory*, selecting the
            candidate files. The default matches ``update.log`` itself and any
            file whose name merely extends it (such as suffixed backups that a
            rotating handler writes next to the live log).

    Removal is best-effort: a file that disappears or cannot be deleted is
    reported on stdout and skipped instead of aborting start-up.
    """
    cutoff = time.time() - (days * 86400)  # Convert days to seconds
    # Escape the directory part so glob metacharacters in the path are literal.
    search = os.path.join(glob.escape(log_directory), pattern)
    for log_file in glob.glob(search):
        try:
            if os.path.isfile(log_file) and os.path.getmtime(log_file) < cutoff:
                os.remove(log_file)
                print(f"Deleted old log: {log_file}")
        except OSError as e:
            print(f"Could not remove old log {log_file}: {e}")


# Cleanup logs older than 30 days before setting up new logging
cleanup_old_logs(log_dir, days=30)

# Use a fixed log file name for the current log; TimedRotatingFileHandler will manage rotations.
log_file_path = os.path.join(log_dir, "update.log")

# Configure logging to both console and file with bi-weekly rotation
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)  # Show only INFO and higher levels on console

# Create a timed rotating file handler (rotate every 14 days, keep up to 5 backups)
# NOTE(review): the handler appears to derive its first rollover time from the
# existing file's modification time; with short hourly runs that keep touching
# the file, the 14-day rollover may never be reached — confirm against the
# logging.handlers documentation.
file_handler = TimedRotatingFileHandler(
    log_file_path,
    when="D",  # 'D' stands for days
    interval=14,  # Rotate every 14 days
    backupCount=5,
)
file_handler.setLevel(logging.INFO)  # Save INFO and higher levels to file

formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# Root logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # Set the root logger level to INFO

# Remove default handlers (to prevent duplication)
for handler in logger.handlers[:]:
    logger.removeHandler(handler)

# Add the custom handlers
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# Suppress httpx logging (this stops httpx internal logs)
logging.getLogger("httpx").setLevel(logging.WARNING)  # Suppress INFO and DEBUG logs from httpx


def ensure_https(url):
    """Return *url* with an ``https://`` scheme.

    Surrounding whitespace is stripped and the scheme is matched
    case-insensitively, so ``"HTTP://host"`` and ``" host "`` both become
    ``"https://host"``. An ``http://`` scheme is upgraded; a missing scheme is
    added. The part after the scheme is returned unchanged.
    """
    url = url.strip()
    lowered = url.lower()
    for scheme in ("https://", "http://"):
        if lowered.startswith(scheme):
            return "https://" + url[len(scheme):]
    return "https://" + url


def is_endpoint_alive(url):
    """Check if the provided endpoint is alive by making a health check request.

    Sends a GET to ``<url>/xrpc/_health`` with a 5 second timeout.

    Returns:
        True when the response status is 200; False on any other status or on
        a ``requests.RequestException``. The outcome is logged in every case.
    """
    health_url = f"{url.rstrip('/')}/xrpc/_health"
    try:
        response = requests.get(health_url, timeout=5)
    except requests.RequestException as e:
        logger.error(f"Health check failed for {health_url}: {e}")
        return False

    if response.status_code == 200:
        logger.info(f"Endpoint {url} is alive and healthy.")
        return True
    logger.warning(f"Endpoint {url} is not responding correctly: {response.status_code}")
    return False


def fetch_blob(did, cid, endpoint):
    """Fetch the blob from the endpoint.

    Args:
        did: DID sent as the ``did`` query parameter.
        cid: CID sent as the ``cid`` query parameter.
        endpoint: Base URL of the server; a trailing slash is tolerated.

    Returns:
        The response body as bytes, or None when the request fails or the
        server answers with an error status (the failure is logged).
    """
    # Strip the trailing slash like is_endpoint_alive does, and let requests
    # build the query string so the values are URL-encoded.
    url = f"{endpoint.rstrip('/')}/xrpc/com.atproto.sync.getBlob"
    try:
        response = requests.get(url, params={"did": did, "cid": cid}, timeout=5)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Failed to fetch blob {cid} for DID {did}: {e}")
        return None
    logger.info(f"Fetched blob {cid} successfully.")
    return response.content


def get_blob_metadata(cid, did, endpoint):
    """Get the metadata for the blob.

    Downloads the blob via ``fetch_blob``, detects its MIME type from the
    bytes and measures its length.

    Returns:
        A dict with the keys ``$type``, ``ref``, ``mimeType`` and ``size``, or
        None when the blob could not be fetched or inspected.
    """
    try:
        logger.info(f"Retrieving metadata for blob {cid}.")
        blob_data = fetch_blob(did, cid, endpoint)
        if blob_data is None:
            return None

        mime = magic.Magic(mime=True)
        mime_type = mime.from_buffer(blob_data)
        size = len(blob_data)

        logger.debug(f"Blob metadata: MIME Type - {mime_type}, Size - {size}")
        return {
            "$type": "blob",
            "ref": {"$link": cid},
            "mimeType": mime_type,
            "size": size,
        }
    except Exception as e:
        # Kept broad on purpose: any failure here means "no metadata".
        logger.error(f"Error retrieving metadata for blob {cid}: {e}")
        return None


def validate_environment_variables():
    """Validate environment variables and return a dictionary of values.

    Reads ENDPOINT, HANDLE, PASSWORD and DID (all required) plus the optional
    UPDATE_BANNER flag, which is true only when its value equals "true"
    case-insensitively.

    Returns:
        A dict with the keys ``endpoint``, ``handle``, ``password``, ``did``
        and ``update_banner``, or None when a required variable is missing or
        empty.
    """
    endpoint = os.getenv("ENDPOINT")
    handle = os.getenv("HANDLE")
    password = os.getenv("PASSWORD")
    did = os.getenv("DID")
    update_banner = os.getenv("UPDATE_BANNER", "false").lower() == "true"

    if not all([endpoint, handle, password, did]):
        logger.error("Missing environment variables. Ensure ENDPOINT, HANDLE, PASSWORD, and DID are set in .env file.")
        return None
    return {
        "endpoint": endpoint,
        "handle": handle,
        "password": password,
        "did": did,
        "update_banner": update_banner,
    }


def setup_cron_job():
    """Set up the cron job to run every hour.

    Adds an entry to the current user's crontab that runs this script at
    minute 0 of every hour, unless an entry whose command already contains
    the script path exists.
    """
    # Prefer the project's .venv interpreter; when it is not there, fall back
    # to the interpreter running this script so the cron entry still points at
    # an existing Python.
    venv_python = os.path.join(BASE_DIR, ".venv", "bin", "python3")
    if not os.path.isfile(venv_python):
        venv_python = sys.executable

    # Check if the cron job already exists
    cron = CronTab(user=True)
    if any(SCRIPT_PATH in job.command for job in cron):
        logger.info("Cron job already exists.")
        return

    # Set up the cron job to run every hour (top of the hour)
    cron_command = f"{venv_python} {SCRIPT_PATH}"
    job = cron.new(command=cron_command, comment="Avatar update script")
    job.minute.on(0)  # Run at the start of every hour
    cron.write()
    logger.info("Cron job has been set up to run every hour within the virtual environment.")


def main():
    """Main function to run the avatar and banner update process.

    Steps: ensure the cron entry exists, load the .env file, validate the
    settings, health-check the endpoint, pick the avatar (and optionally
    banner) CID for the current hour from cids.json, log in, read the current
    profile record and write it back with the new image references. Every
    failure is logged and ends the run early without raising.
    """
    # Set up the cron job (only once)
    try:
        setup_cron_job()
    except Exception as e:
        # Scheduling is best-effort; the update below still runs.
        logger.error(f"Error setting cron job: {e}")

    logger.info("Script started.")
    logger.info("Starting update process...")

    # Load environment variables from the .env file
    if not os.path.exists(ENV_PATH):
        logger.error(f"Missing .env file at {ENV_PATH}")
        return
    load_dotenv(ENV_PATH)
    logger.info(f"Loaded environment from {ENV_PATH}")

    env_vars = validate_environment_variables()
    if not env_vars:
        return

    # Ensure endpoint URL is correct and alive
    endpoint = ensure_https(env_vars["endpoint"])
    if not is_endpoint_alive(endpoint):
        logger.error(f"Endpoint {endpoint} is not responding.")
        return

    # Load the CID mapping from the JSON file
    try:
        with open(JSON_PATH, "r", encoding="utf-8") as f:
            blob_dict = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        logger.error(f"Error loading cids.json from {JSON_PATH}: {e}")
        return
    logger.info(f"Loaded blob CIDs from {JSON_PATH}.")

    # Determine the blob CIDs for the current hour from the modified structure
    current_hour = datetime.now().strftime("%H")
    logger.info(f"Current hour: {current_hour}")

    # A top-level value that is not an object has no per-hour entries at all.
    current_entry = blob_dict.get(current_hour) if isinstance(blob_dict, dict) else None
    if not current_entry:
        logger.warning(f"No entry found for hour {current_hour} in cids.json")
        return
    if not isinstance(current_entry, dict):
        logger.error(f"Entry for hour {current_hour} in cids.json is not an object with avatar/banner keys")
        return

    new_avatar_cid = current_entry.get("avatar")
    new_banner_cid = current_entry.get("banner") if env_vars["update_banner"] else None

    if not new_avatar_cid:
        logger.warning(f"No avatar CID found for hour {current_hour}")
        return

    logger.info(f"Selected avatar CID: {new_avatar_cid}")
    if env_vars["update_banner"]:
        if new_banner_cid:
            logger.info(f"Selected banner CID: {new_banner_cid}")
        else:
            logger.warning(f"UPDATE_BANNER is enabled, but no banner CID found for hour {current_hour}")

    # Authenticate with the endpoint
    client = Client(endpoint)
    try:
        client.login(env_vars["handle"], env_vars["password"])
        logger.info("Authentication successful.")
    except Exception as e:
        logger.error(f"Authentication failed: {e}")
        return

    # Fetch the current profile and update it with the new avatar (and optionally banner)
    try:
        current_profile_record = client.app.bsky.actor.profile.get(
            client.me.did, "self"
        )
        current_profile = current_profile_record.value
        swap_record_cid = current_profile_record.cid
        logger.info("Current profile record fetched successfully.")
    except BadRequestError:
        # Continue without an existing record; the write below then carries no
        # previous description, display name or banner.
        current_profile = swap_record_cid = None
        logger.warning("Failed to fetch current profile record.")
    except Exception as e:
        # Any other failure leaves the profile state unknown, so stop rather
        # than overwrite it with empty values.
        logger.error(f"Unexpected error fetching current profile record: {e}")
        return

    old_description = current_profile.description if current_profile else None
    old_display_name = current_profile.display_name if current_profile else None
    old_banner = current_profile.banner if current_profile else None

    avatar_metadata = get_blob_metadata(new_avatar_cid, env_vars["did"], endpoint)
    if avatar_metadata is None:
        logger.error(f"Could not retrieve metadata for avatar blob CID: {new_avatar_cid}")
        return

    # Fall back to the existing banner whenever a new one is not requested or
    # its metadata cannot be retrieved.
    banner_metadata = old_banner
    if env_vars["update_banner"] and new_banner_cid:
        banner_metadata = get_blob_metadata(new_banner_cid, env_vars["did"], endpoint)
        if banner_metadata is None:
            logger.warning(f"Could not retrieve metadata for banner blob CID: {new_banner_cid}")
            banner_metadata = old_banner

    # Update the profile with the new avatar and optionally the new banner
    # NOTE(review): only avatar, banner, description and display_name are passed
    # to the new record; any other fields on the existing profile record are not
    # carried over — confirm that is intended.
    try:
        client.com.atproto.repo.put_record(
            models.ComAtprotoRepoPutRecord.Data(
                collection=models.ids.AppBskyActorProfile,
                repo=client.me.did,
                rkey="self",
                swap_record=swap_record_cid,
                record=models.AppBskyActorProfile.Record(
                    avatar=avatar_metadata,
                    banner=banner_metadata,
                    description=old_description,
                    display_name=old_display_name,
                ),
            )
        )
        if env_vars["update_banner"] and new_banner_cid:
            logger.info(f"Profile updated successfully with avatar CID {new_avatar_cid} and banner CID {new_banner_cid}")
        else:
            logger.info(f"Profile updated successfully with avatar CID: {new_avatar_cid}")
    except Exception as e:
        logger.error(f"Failed to update profile record: {e}")


if __name__ == "__main__":
    main()