This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

"""Hourly avatar rotation for an AT Protocol profile.

On each run the script looks up the blob CID mapped to the current hour
(``"00"``-``"23"``) in ``assets/cids.json``, fetches that blob from the
configured endpoint to determine its MIME type and size, and writes it as the
avatar of the logged-in account's profile record. It also registers itself in
the user's crontab so that it runs at the top of every hour.
"""
import json
import logging
import os
import shlex
import subprocess
import sys
from datetime import datetime

# Ensure the script is run inside a virtual environment
_in_virtualenv = hasattr(sys, "real_prefix") or (
    hasattr(sys, "base_prefix") and sys.base_prefix != sys.prefix
)
if not _in_virtualenv:
    print("Error: This script must be run inside a virtual environment.")
    sys.exit(1)
else:
    print("Virtual environment detected.")

# Define paths
BASE_DIR = os.path.abspath(os.path.dirname(__file__))  # /src/
REQ_PATH = os.path.abspath(os.path.join(BASE_DIR, "../requirements.txt"))  # /requirements.txt
ASSETS_DIR = os.path.join(BASE_DIR, "../assets")
ENV_PATH = os.path.join(ASSETS_DIR, ".env")
JSON_PATH = os.path.join(ASSETS_DIR, "cids.json")
LOG_PATH = os.path.join(BASE_DIR, "avatar_update.log")

# Environment flag set right before re-executing after a pip install, so a
# second import failure aborts instead of restarting forever.
_REINSTALL_GUARD = "AVATAR_UPDATE_DEPS_INSTALLED"

# Configure logging: everything (DEBUG+) goes to the log file, INFO+ to the console.
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
logging.getLogger().addHandler(console_handler)


def install_and_rerun():
    """Install missing packages from requirements.txt and re-run the script.

    Does not return: the process is either replaced by a fresh interpreter
    running the same command line, or terminated with exit status 1 when the
    requirements file is missing, pip fails, or a previous install attempt
    already happened in this process chain.
    """
    if os.environ.get(_REINSTALL_GUARD):
        logging.error(
            "Packages are still missing after installing requirements; "
            "aborting to avoid a restart loop."
        )
        sys.exit(1)

    if not os.path.exists(REQ_PATH):
        logging.error(f"requirements.txt not found at {REQ_PATH}, cannot install missing packages.")
        sys.exit(1)

    logging.info(f"Installing missing packages from {REQ_PATH}...")
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", REQ_PATH])
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to install packages: {e}")
        sys.exit(1)

    logging.info("Packages installed successfully. Restarting script...")
    # The replacement process inherits the environment, so it sees the guard.
    os.environ[_REINSTALL_GUARD] = "1"
    os.execv(sys.executable, [sys.executable] + sys.argv)


# Try to import external packages; if any are missing, install them.
# These must NOT also be imported unguarded at the top of the file, otherwise
# the ImportError is raised before this fallback can handle it.
try:
    import requests
    import magic
    from atproto import Client, models
    from atproto.exceptions import BadRequestError
    from dotenv import load_dotenv
except ImportError as e:
    logging.error(f"Missing package(s): {e}")
    install_and_rerun()


def ensure_https(url):
    """Return *url* with an ``https://`` scheme.

    A missing scheme is added and a leading ``http://`` is upgraded; a URL
    that already starts with ``https://`` is returned unchanged.
    """
    if url.startswith("https://"):
        return url
    if url.startswith("http://"):
        return "https://" + url[len("http://"):]
    return "https://" + url


def is_endpoint_alive(url):
    """Check if the endpoint is alive by making a health check request.

    Returns True only when ``GET <url>/xrpc/_health`` answers with HTTP 200
    within 5 seconds; any other status or request error yields False.
    """
    health_url = f"{url.rstrip('/')}/xrpc/_health"
    logging.info(f"Checking endpoint health: {health_url}")
    try:
        response = requests.get(health_url, timeout=5)
    except requests.RequestException as e:
        logging.error(f"Health check failed for {health_url}: {e}")
        return False

    if response.status_code == 200:
        logging.info("Endpoint is alive.")
        return True

    logging.warning(f"Endpoint returned status code {response.status_code}")
    return False


def fetch_blob(did, cid, endpoint):
    """Fetch blob data from the given endpoint.

    Returns the raw response body as bytes, or None when the request fails
    or the server answers with an error status.
    """
    # Strip a trailing slash (as is_endpoint_alive does) to avoid "//xrpc/...".
    url = f"{endpoint.rstrip('/')}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}"
    logging.info(f"Fetching blob: {url}")
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch blob {cid} for DID {did}: {e}")
        return None

    logging.info(f"Successfully fetched blob {cid} for DID {did}.")
    return response.content


def get_blob_metadata(cid, did, endpoint):
    """Retrieve metadata for a given blob CID.

    Downloads the blob, sniffs its MIME type with libmagic and measures its
    size. Returns a blob-reference dict (``$type``, ``ref``, ``mimeType``,
    ``size``), or None when the blob could not be fetched or is empty.
    """
    blob_data = fetch_blob(did, cid, endpoint)
    # Covers both a failed fetch (None) and a zero-byte body (b"").
    if not blob_data:
        logging.error("Blob data is empty.")
        return None

    mime = magic.Magic(mime=True)
    mime_type = mime.from_buffer(blob_data)
    size = len(blob_data)

    logging.info(f"Retrieved metadata - MIME Type: {mime_type}, Size: {size} bytes")

    return {
        "$type": "blob",
        "ref": {"$link": cid},
        "mimeType": mime_type,
        "size": size,
    }


def setup_cron_job():
    """Set up a cron job to run the script hourly.

    Appends an entry to the current user's crontab unless an identical entry
    is already present. Failures are logged and never raised.
    """
    # shlex.quote leaves "safe" paths untouched, so entries written by earlier
    # versions are still recognised; paths with spaces now work as well.
    cron_job_command = (
        f"0 * * * * {shlex.quote(sys.executable)} {shlex.quote(os.path.abspath(__file__))}"
        f" >> {shlex.quote(LOG_PATH)} 2>&1"
    )
    logging.info("Setting up cron job...")

    try:
        result = subprocess.run(["crontab", "-l"], capture_output=True, text=True)
        # A non-zero status typically means the user has no crontab yet.
        cron_jobs = result.stdout.strip() if result.returncode == 0 else ""

        if cron_job_command in cron_jobs:
            logging.info("Cron job is already set.")
            return

        new_cron_jobs = cron_jobs + "\n" + cron_job_command if cron_jobs else cron_job_command
        # crontab refuses input whose last line lacks a terminating newline.
        subprocess.run(["crontab"], input=new_cron_jobs + "\n", text=True, check=True)
        logging.info("Cron job added successfully.")
    except Exception as e:
        logging.error(f"Failed to set up cron job: {e}")


def _put_avatar(client, blob_metadata):
    """Store *blob_metadata* as the avatar, keeping the other profile fields.

    put_record replaces the whole record, so the current banner, description
    and display name are read first and written back alongside the new avatar.
    Only these three fields are carried over.
    """
    try:
        current_record = client.app.bsky.actor.profile.get(client.me.did, "self")
        current_profile = current_record.value
        swap_record_cid = current_record.cid
    except BadRequestError:
        # No profile record exists yet; create one from scratch.
        current_profile = swap_record_cid = None

    client.com.atproto.repo.put_record(
        models.ComAtprotoRepoPutRecord.Data(
            collection=models.ids.AppBskyActorProfile,
            repo=client.me.did,
            rkey="self",
            # Only overwrite the record version that was just read.
            swap_record=swap_record_cid,
            record=models.AppBskyActorProfile.Record(
                avatar=blob_metadata,
                banner=current_profile.banner if current_profile else None,
                description=current_profile.description if current_profile else None,
                display_name=current_profile.display_name if current_profile else None,
            ),
        )
    )


def main():
    """Update the profile avatar to the blob configured for the current hour.

    Reads ENDPOINT, HANDLE, PASSWORD and DID from ``assets/.env`` and the
    hour-to-CID mapping from ``assets/cids.json``. Every failure is logged and
    ends the run early; nothing is raised to the caller.
    """
    logging.info("Starting avatar update script...")

    if not os.path.exists(ENV_PATH):
        logging.error(f"Missing .env file at {ENV_PATH}")
        return
    load_dotenv(ENV_PATH)
    logging.info("Loaded .env file successfully.")

    endpoint = os.getenv("ENDPOINT")
    handle = os.getenv("HANDLE")
    password = os.getenv("PASSWORD")
    did = os.getenv("DID")

    if not (endpoint and handle and password and did):
        logging.error("Missing environment variables. Check .env file.")
        return

    endpoint = ensure_https(endpoint)
    if not is_endpoint_alive(endpoint):
        logging.error(f"Endpoint {endpoint} is not responding.")
        return

    try:
        with open(JSON_PATH, "r", encoding="utf-8") as f:
            blob_dict = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        logging.error(f"Error loading cids.json from {JSON_PATH}: {e}")
        return
    if not isinstance(blob_dict, dict):
        logging.error(
            f"Error loading cids.json from {JSON_PATH}: expected a JSON object "
            "mapping hours to CIDs"
        )
        return
    logging.debug(f"Loaded blob CIDs from {JSON_PATH}: {blob_dict}")

    # Keys are zero-padded local hours, "00" through "23".
    current_hour = datetime.now().strftime("%H")
    logging.info(f"Current hour: {current_hour}")
    new_blob_cid = blob_dict.get(current_hour)
    if not new_blob_cid:
        logging.warning(f"No blob CID found for hour {current_hour}")
        return
    logging.info(f"Selected blob CID: {new_blob_cid}")

    client = Client(endpoint)

    try:
        client.login(handle, password)
        logging.info("Authentication successful.")
    except Exception as e:
        logging.error(f"Authentication failed: {e}")
        return

    blob_metadata = get_blob_metadata(new_blob_cid, did, endpoint)
    if blob_metadata is None:
        logging.error("Blob metadata retrieval failed.")
        return

    try:
        _put_avatar(client, blob_metadata)
        logging.info("Avatar updated successfully!")
    except Exception as e:
        logging.error(f"Failed to update profile record: {e}")


if __name__ == "__main__":
    setup_cron_job()
    main()