This repository has no description
1import os
2import json
3import logging
4import requests
5import magic
6from datetime import datetime
7from dotenv import load_dotenv
8from atproto import Client, models
9from atproto.exceptions import BadRequestError
10import sys
11from crontab import CronTab
12from logging.handlers import TimedRotatingFileHandler
13import glob
14import time
15
16# Ensure the script is run inside a virtual environment
17if not hasattr(sys, 'real_prefix') and not (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
18 print("Error: This script must be run inside a virtual environment.")
19 sys.exit(1)
20else:
21 logging.info("Virtual environment detected.")
22
23# Define the paths
24BASE_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
25ASSETS_DIR = os.path.join(BASE_DIR, "assets")
26ENV_PATH = os.path.join(ASSETS_DIR, ".env")
27JSON_PATH = os.path.join(ASSETS_DIR, "cids.json")
28SCRIPT_PATH = os.path.abspath(__file__)
29
30# Define the log file directory and log file path
31log_dir = os.path.join(BASE_DIR, "logs")
32if not os.path.exists(log_dir):
33 os.makedirs(log_dir)
34
35def cleanup_old_logs(log_directory, days=30):
36 """Deletes log files older than the specified number of days."""
37 cutoff = time.time() - (days * 86400) # Convert days to seconds
38 for log_file in glob.glob(os.path.join(log_directory, "update.log")):
39 if os.path.isfile(log_file) and os.path.getmtime(log_file) < cutoff:
40 os.remove(log_file)
41 print(f"Deleted old log: {log_file}")
42
43# Cleanup logs older than 30 days before setting up new logging
44cleanup_old_logs(log_dir, days=30)
45
46# Use a fixed log file name for the current log; TimedRotatingFileHandler will manage rotations.
47log_file_path = os.path.join(log_dir, "update.log")
48
49# Configure logging to both console and file with bi-weekly rotation
50console_handler = logging.StreamHandler()
51console_handler.setLevel(logging.INFO) # Show only INFO and higher levels on console
52
53# Create a timed rotating file handler (rotate every 14 days, keep up to 5 backups)
54file_handler = TimedRotatingFileHandler(
55 log_file_path,
56 when="D", # 'D' stands for days
57 interval=14, # Rotate every 14 days
58 backupCount=5
59)
60file_handler.setLevel(logging.INFO) # Save all logs to file
61
62formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
63console_handler.setFormatter(formatter)
64file_handler.setFormatter(formatter)
65
66# Root logger setup
67logger = logging.getLogger()
68logger.setLevel(logging.INFO) # Set the root logger level to INFO
69
70# Remove default handlers (to prevent duplication)
71for handler in logger.handlers[:]:
72 logger.removeHandler(handler)
73
74# Add the custom handlers
75logger.addHandler(console_handler)
76logger.addHandler(file_handler)
77
78# Suppress httpx logging (this stops httpx internal logs)
79logging.getLogger("httpx").setLevel(logging.WARNING) # Suppress INFO and DEBUG logs from httpx
80
81def ensure_https(url):
82 """Ensure the URL starts with https://."""
83 if not url.startswith("http://") and not url.startswith("https://"):
84 return "https://" + url
85 if url.startswith("http://"):
86 return "https://" + url[7:]
87 return url
88
89def is_endpoint_alive(url):
90 """Check if the provided endpoint is alive by making a health check request."""
91 health_url = f"{url.rstrip('/')}/xrpc/_health"
92 try:
93 response = requests.get(health_url, timeout=5)
94 if response.status_code == 200:
95 logger.info(f"Endpoint {url} is alive and healthy.")
96 return True
97 else:
98 logger.warning(f"Endpoint {url} is not responding correctly: {response.status_code}")
99 return False
100 except requests.RequestException as e:
101 logger.error(f"Health check failed for {health_url}: {e}")
102 return False
103
104def fetch_blob(did, cid, endpoint):
105 """Fetch the blob from the endpoint."""
106 url = f"{endpoint}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}"
107 try:
108 response = requests.get(url, timeout=5)
109 response.raise_for_status()
110 logger.info(f"Fetched blob {cid} successfully.")
111 return response.content
112 except requests.RequestException as e:
113 logger.error(f"Failed to fetch blob {cid} for DID {did}: {e}")
114 return None
115
116def get_blob_metadata(cid, did, endpoint):
117 """Get the metadata for the blob."""
118 try:
119 logger.info(f"Retrieving metadata for blob {cid}.")
120 blob_data = fetch_blob(did, cid, endpoint)
121 if blob_data is None:
122 return None
123
124 mime = magic.Magic(mime=True)
125 mime_type = mime.from_buffer(blob_data)
126 size = len(blob_data)
127
128 logger.debug(f"Blob metadata: MIME Type - {mime_type}, Size - {size}")
129 return {
130 "$type": "blob",
131 "ref": {"$link": cid},
132 "mimeType": mime_type,
133 "size": size,
134 }
135 except Exception as e:
136 logger.error(f"Error retrieving metadata for blob {cid}: {e}")
137 return None
138
139def validate_environment_variables():
140 """Validate environment variables and return a dictionary of values."""
141 endpoint = os.getenv("ENDPOINT")
142 handle = os.getenv("HANDLE")
143 password = os.getenv("PASSWORD")
144 did = os.getenv("DID")
145 update_banner = os.getenv("UPDATE_BANNER", "false").lower() == "true"
146
147 if not all([endpoint, handle, password, did]):
148 logger.error("Missing environment variables. Ensure ENDPOINT, HANDLE, PASSWORD, and DID are set in .env file.")
149 return None
150 return {
151 "endpoint": endpoint,
152 "handle": handle,
153 "password": password,
154 "did": did,
155 "update_banner": update_banner
156 }
157
158def setup_cron_job():
159 """Set up the cron job to run every hour."""
160 # Get the path to the virtual environment's Python interpreter
161 venv_python = os.path.join(BASE_DIR, ".venv", "bin", "python3")
162
163 # Check if the cron job already exists
164 cron = CronTab(user=True)
165 job_exists = False
166 for job in cron:
167 if SCRIPT_PATH in job.command:
168 job_exists = True
169 break
170
171 if not job_exists:
172 # Set up the cron job to run every hour (top of the hour)
173 cron_command = f"{venv_python} {SCRIPT_PATH}"
174 job = cron.new(command=cron_command, comment="Avatar update script")
175 job.minute.on(0) # Run at the start of every hour
176 cron.write()
177 logger.info("Cron job has been set up to run every hour within the virtual environment.")
178 else:
179 logger.info("Cron job already exists.")
180
181def main():
182 """Main function to run the avatar and banner update process."""
183 # Set up the cron job (only once)
184 try:
185 setup_cron_job()
186 except Exception as e:
187 logger.error(f"Error setting cron job: {e}")
188 pass
189
190 logger.info("Script started.")
191 logger.info("Starting update process...")
192
193 # Load environment variables from the .env file
194 if os.path.exists(ENV_PATH):
195 load_dotenv(ENV_PATH)
196 logger.info(f"Loaded environment from {ENV_PATH}")
197 else:
198 logger.error(f"Missing .env file at {ENV_PATH}")
199 return
200
201 env_vars = validate_environment_variables()
202 if not env_vars:
203 return
204
205 # Ensure endpoint URL is correct and alive
206 endpoint = ensure_https(env_vars["endpoint"])
207 if not is_endpoint_alive(endpoint):
208 logger.error(f"Endpoint {endpoint} is not responding.")
209 return
210
211 # Load the CID mapping from the JSON file
212 try:
213 with open(JSON_PATH, "r") as f:
214 blob_dict = json.load(f)
215 logger.info(f"Loaded blob CIDs from {JSON_PATH}.")
216 except Exception as e:
217 logger.error(f"Error loading cids.json from {JSON_PATH}: {e}")
218 return
219
220 # Determine the blob CIDs for the current hour from the modified structure
221 current_hour = datetime.now().strftime("%H")
222 logger.info(f"Current hour: {current_hour}")
223
224 current_entry = blob_dict.get(current_hour)
225 if not current_entry:
226 logger.warning(f"No entry found for hour {current_hour} in cids.json")
227 return
228
229 new_avatar_cid = current_entry.get("avatar")
230 new_banner_cid = current_entry.get("banner") if env_vars["update_banner"] else None
231
232 if not new_avatar_cid:
233 logger.warning(f"No avatar CID found for hour {current_hour}")
234 return
235
236 logger.info(f"Selected avatar CID: {new_avatar_cid}")
237 if env_vars["update_banner"]:
238 if new_banner_cid:
239 logger.info(f"Selected banner CID: {new_banner_cid}")
240 else:
241 logger.warning(f"UPDATE_BANNER is enabled, but no banner CID found for hour {current_hour}")
242
243 # Authenticate with the endpoint
244 client = Client(endpoint)
245 try:
246 client.login(env_vars["handle"], env_vars["password"])
247 logger.info("Authentication successful.")
248 except Exception as e:
249 logger.error(f"Authentication failed: {e}")
250 return
251
252 # Fetch the current profile and update it with the new avatar (and optionally banner)
253 try:
254 current_profile_record = client.app.bsky.actor.profile.get(
255 client.me.did, "self"
256 )
257 current_profile = current_profile_record.value
258 swap_record_cid = current_profile_record.cid
259 logger.info("Current profile record fetched successfully.")
260 except BadRequestError:
261 current_profile = swap_record_cid = None
262 logger.warning("Failed to fetch current profile record.")
263
264 old_description = current_profile.description if current_profile else None
265 old_display_name = current_profile.display_name if current_profile else None
266 old_banner = current_profile.banner if current_profile else None
267
268 avatar_metadata = get_blob_metadata(new_avatar_cid, env_vars["did"], endpoint)
269 if avatar_metadata is None:
270 logger.error(f"Could not retrieve metadata for avatar blob CID: {new_avatar_cid}")
271 return
272
273 banner_metadata = None
274 if env_vars["update_banner"] and new_banner_cid:
275 banner_metadata = get_blob_metadata(new_banner_cid, env_vars["did"], endpoint)
276 if banner_metadata is None:
277 logger.warning(f"Could not retrieve metadata for banner blob CID: {new_banner_cid}")
278 banner_metadata = old_banner
279 else:
280 banner_metadata = old_banner
281
282 # Update the profile with the new avatar and optionally the new banner
283 try:
284 client.com.atproto.repo.put_record(
285 models.ComAtprotoRepoPutRecord.Data(
286 collection=models.ids.AppBskyActorProfile,
287 repo=client.me.did,
288 rkey="self",
289 swap_record=swap_record_cid,
290 record=models.AppBskyActorProfile.Record(
291 avatar=avatar_metadata,
292 banner=banner_metadata,
293 description=old_description,
294 display_name=old_display_name,
295 ),
296 )
297 )
298 if env_vars["update_banner"] and new_banner_cid:
299 logger.info(f"Profile updated successfully with avatar CID {new_avatar_cid} and banner CID {new_banner_cid}")
300 else:
301 logger.info(f"Profile updated successfully with avatar CID: {new_avatar_cid}")
302 except Exception as e:
303 logger.error(f"Failed to update profile record: {e}")
304
305if __name__ == "__main__":
306 main()