This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

"""Rotate the account's profile avatar based on the current local hour.

The script reads ENDPOINT, HANDLE, PASSWORD and DID from ``../assets/.env``,
looks up the blob CID for the current hour in ``../assets/cids.json``
(keys are two-digit hour strings as produced by ``strftime("%H")``),
downloads that blob to derive its MIME type and size, and rewrites the
``app.bsky.actor.profile`` record with the blob as the new avatar.
"""

import os
import json
import logging
import requests
import magic
from datetime import datetime
from dotenv import load_dotenv
from atproto import Client, models
from atproto.exceptions import BadRequestError

# Define the paths
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
ASSETS_DIR = os.path.join(BASE_DIR, "../assets")
ENV_PATH = os.path.join(ASSETS_DIR, ".env")
JSON_PATH = os.path.join(ASSETS_DIR, "cids.json")

# Configure logging: everything (DEBUG and up) goes to the log file,
# while the console only shows INFO and above.
logging.basicConfig(
    filename="avatar_update.log",
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
logging.getLogger().addHandler(console_handler)


def ensure_https(url):
    """Return *url* with an ``https://`` scheme.

    A missing scheme gets ``https://`` prepended and an ``http://`` scheme is
    upgraded. The scheme is matched case-insensitively and surrounding
    whitespace is dropped, so values such as ``" HTTP://host "`` coming from
    a hand-edited .env file are normalised instead of being double-prefixed.
    """
    url = url.strip()
    scheme, separator, remainder = url.partition("://")
    if separator and scheme.lower() in ("http", "https"):
        return "https://" + remainder
    return "https://" + url


def is_endpoint_alive(url):
    """Return True if ``<url>/xrpc/_health`` answers with HTTP 200.

    Any ``requests`` failure (connection error, timeout, ...) is logged and
    reported as False rather than raised.
    """
    health_url = f"{url.rstrip('/')}/xrpc/_health"
    try:
        response = requests.get(health_url, timeout=5)
        return response.status_code == 200
    except requests.RequestException as e:
        logging.error(f"Health check failed for {health_url}: {e}")
        return False


def fetch_blob(did, cid, endpoint):
    """Download the blob *cid* of repository *did* from *endpoint*.

    Returns the raw response body as bytes, or None if the request fails or
    the server answers with an error status (the failure is logged).
    """
    # Strip a trailing slash the same way is_endpoint_alive does, so an
    # endpoint configured as "https://host/" does not yield "//xrpc/...".
    url = f"{endpoint.rstrip('/')}/xrpc/com.atproto.sync.getBlob"
    try:
        # Let requests build the query string so did/cid are URL-encoded.
        response = requests.get(url, params={"did": did, "cid": cid}, timeout=5)
        response.raise_for_status()
        return response.content
    except requests.RequestException as e:
        logging.error(f"Failed to fetch blob {cid} for DID {did}: {e}")
        return None


def get_blob_metadata(cid, did, endpoint):
    """Build the blob reference dict used as the profile avatar.

    The blob is downloaded so that its MIME type (sniffed from the content
    with libmagic) and its size in bytes can be filled in. Returns None when
    the blob cannot be fetched.
    """
    blob_data = fetch_blob(did, cid, endpoint)
    if blob_data is None:
        return None

    mime = magic.Magic(mime=True)
    mime_type = mime.from_buffer(blob_data)
    size = len(blob_data)

    return {
        "$type": "blob",
        "ref": {"$link": cid},
        "mimeType": mime_type,
        "size": size,
    }


def main():
    """Pick the avatar blob for the current hour and write it to the profile.

    Every failure is logged and ends the run early; nothing is raised to the
    caller.
    """
    logging.info("Starting avatar update script...")

    if os.path.exists(ENV_PATH):
        load_dotenv(ENV_PATH)
    else:
        logging.error(f"Missing .env file at {ENV_PATH}")
        return

    endpoint = os.getenv("ENDPOINT")
    handle = os.getenv("HANDLE")
    password = os.getenv("PASSWORD")
    did = os.getenv("DID")

    if not (endpoint and handle and password and did):
        logging.error(
            "Missing environment variables. Ensure ENDPOINT, HANDLE, PASSWORD, and DID are set in .env file."
        )
        return

    endpoint = ensure_https(endpoint)
    if not is_endpoint_alive(endpoint):
        logging.error(f"Endpoint {endpoint} is not responding.")
        return

    try:
        with open(JSON_PATH, "r", encoding="utf-8") as f:
            blob_dict = json.load(f)
        logging.debug(f"Loaded blob CIDs from {JSON_PATH}: {blob_dict}")
    except Exception as e:
        logging.error(f"Error loading cids.json from {JSON_PATH}: {e}")
        return

    # The lookup below needs a mapping; a JSON list or scalar at the top
    # level would otherwise crash on .get().
    if not isinstance(blob_dict, dict):
        logging.error(
            f"Error loading cids.json from {JSON_PATH}: expected a JSON object"
        )
        return

    # Local time, zero-padded ("00".."23"); cids.json keys must match.
    current_hour = datetime.now().strftime("%H")
    logging.info(f"Current hour: {current_hour}")
    new_blob_cid = blob_dict.get(current_hour)
    if not new_blob_cid:
        logging.warning(f"No blob CID found for hour {current_hour}")
        return
    logging.info(f"Selected blob CID: {new_blob_cid}")

    client = Client(endpoint)

    try:
        client.login(handle, password)
        logging.info("Authentication successful.")
    except Exception as e:
        logging.error(f"Authentication failed: {e}")
        return

    try:
        current_profile_record = client.app.bsky.actor.profile.get(
            client.me.did, "self"
        )
        current_profile = current_profile_record.value
        swap_record_cid = current_profile_record.cid
    except BadRequestError:
        # Treated as "no profile record yet": write one from scratch.
        current_profile = swap_record_cid = None
    except Exception as e:
        # Any other failure (e.g. network) must not fall through to the
        # write below, and should be logged like every other step.
        logging.error(f"Failed to fetch current profile record: {e}")
        return

    old_description = old_display_name = None
    if current_profile:
        old_description = current_profile.description
        old_display_name = current_profile.display_name

    blob_metadata = get_blob_metadata(new_blob_cid, did, endpoint)

    if blob_metadata is None:
        logging.error(f"Could not retrieve metadata for blob CID: {new_blob_cid}")
        return

    try:
        # swap_record makes the write conditional on the record CID read
        # above (None when no record was found).
        # NOTE(review): only avatar, banner, description and display name are
        # carried over; other fields of the existing record are not copied
        # into the new one - confirm that is acceptable.
        client.com.atproto.repo.put_record(
            models.ComAtprotoRepoPutRecord.Data(
                collection=models.ids.AppBskyActorProfile,
                repo=client.me.did,
                rkey="self",
                swap_record=swap_record_cid,
                record=models.AppBskyActorProfile.Record(
                    avatar=blob_metadata,
                    banner=current_profile.banner if current_profile else None,
                    description=old_description,
                    display_name=old_display_name,
                ),
            )
        )
        logging.info("Avatar updated successfully!")
    except Exception as e:
        logging.error(f"Failed to update profile record: {e}")


if __name__ == "__main__":
    main()