This repository has no description
1import os
2import sys
3import subprocess
4import logging
5import json
6import requests
7import magic
8from datetime import datetime
9from dotenv import load_dotenv
10from atproto import Client, models
11from atproto.exceptions import BadRequestError
12
# Refuse to run outside a virtual environment.
# An interpreter counts as "in a venv" when it exposes sys.real_prefix, or when
# sys.base_prefix exists and differs from sys.prefix.
if hasattr(sys, "real_prefix") or getattr(sys, "base_prefix", sys.prefix) != sys.prefix:
    print("Virtual environment detected.")
else:
    print("Error: This script must be run inside a virtual environment.")
    sys.exit(1)
19
# Filesystem layout, all derived from the directory that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # directory of this file
# requirements.txt lives one level above BASE_DIR; stored as a normalized absolute path.
REQ_PATH = os.path.abspath(os.path.join(BASE_DIR, "..", "requirements.txt"))
# The assets directory is kept un-normalized (it still contains the "../" segment).
ASSETS_DIR = os.path.join(BASE_DIR, "../assets")
ENV_PATH = os.path.join(ASSETS_DIR, ".env")  # credentials / endpoint settings
JSON_PATH = os.path.join(ASSETS_DIR, "cids.json")  # hour -> blob CID mapping
LOG_PATH = os.path.join(BASE_DIR, "avatar_update.log")
27
# Logging setup: DEBUG and above is written to the log file, INFO and above is
# additionally echoed through a stream handler.
# basicConfig must run before the extra handler is attached: it does nothing
# when the root logger already has handlers.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
    filename=LOG_PATH,
)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(console_handler)
39
40
def install_and_rerun():
    """Install packages from requirements.txt and re-execute the script.

    The current process is replaced via ``os.execv`` after a successful
    install, so this function never returns normally: it either re-executes
    the interpreter or terminates with exit status 1.

    A marker environment variable is set before re-executing. If the restarted
    script still cannot import its dependencies and ends up here again, the
    marker is detected and the script aborts instead of looping through
    install -> restart -> install forever.

    Raises:
        SystemExit: if requirements.txt is missing, pip fails, or a previous
            install attempt in this process chain did not fix the imports.
    """
    guard_var = "AVATAR_UPDATE_DEPS_INSTALLED"
    if os.environ.get(guard_var) == "1":
        logging.error(
            "Packages are still missing after installing requirements; "
            "aborting to avoid a restart loop."
        )
        sys.exit(1)

    if not os.path.exists(REQ_PATH):
        logging.error(f"requirements.txt not found at {REQ_PATH}, cannot install missing packages.")
        sys.exit(1)

    logging.info(f"Installing missing packages from {REQ_PATH}...")
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", REQ_PATH])
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to install packages: {e}")
        sys.exit(1)

    logging.info("Packages installed successfully. Restarting script...")
    # The environment is inherited by the re-executed interpreter, which lets
    # the next run recognise that an install was already attempted.
    os.environ[guard_var] = "1"
    os.execv(sys.executable, [sys.executable] + sys.argv)
55
56
# Try to import external packages; if any are missing, install them from
# requirements.txt and restart the script (see install_and_rerun).
# NOTE(review): the same packages are also imported unconditionally at the top
# of this file, so a missing package raises ImportError there before this
# fallback can run -- confirm whether those top-level imports should be dropped.
try:
    import requests
    import magic
    from atproto import Client, models
    from atproto.exceptions import BadRequestError
    from dotenv import load_dotenv
except ImportError as e:
    logging.error(f"Missing package(s): {e}")
    install_and_rerun()
67
68
def ensure_https(url):
    """Return *url* with an ``https://`` scheme.

    Surrounding whitespace is removed first (values read from a .env file
    often carry it). A leading ``http://`` or ``https://`` is recognised
    case-insensitively and replaced by ``https://``; anything else is
    treated as scheme-less and simply prefixed with ``https://``.

    Args:
        url: Host name or URL, with or without a scheme.

    Returns:
        The URL as a string starting with ``https://``.
    """
    url = url.strip()
    scheme, separator, remainder = url.partition("://")
    # URL schemes are case-insensitive, so "HTTP://host" must be handled like
    # "http://host" instead of being prefixed a second time.
    if separator and scheme.lower() in ("http", "https"):
        return "https://" + remainder
    return "https://" + url
76
77
def is_endpoint_alive(url):
    """Probe ``<url>/xrpc/_health`` and report whether it answered with HTTP 200.

    Args:
        url: Base URL of the endpoint; a trailing slash is tolerated.

    Returns:
        True when the health request returns status 200, False on any other
        status code or when the request itself fails.
    """
    health_url = f"{url.rstrip('/')}/xrpc/_health"
    logging.info(f"Checking endpoint health: {health_url}")

    try:
        response = requests.get(health_url, timeout=5)
    except requests.RequestException as e:
        logging.error(f"Health check failed for {health_url}: {e}")
        return False

    if response.status_code != 200:
        logging.warning(f"Endpoint returned status code {response.status_code}")
        return False

    logging.info("Endpoint is alive.")
    return True
93
94
def fetch_blob(did, cid, endpoint):
    """Download a blob through ``com.atproto.sync.getBlob``.

    Args:
        did: DID of the repository that owns the blob.
        cid: CID of the blob to download.
        endpoint: Base URL of the server; a trailing slash is tolerated.

    Returns:
        The raw response body as bytes, or None when the request fails or
        the server answers with an error status.
    """
    # Strip a trailing slash the same way the health check does, so an
    # endpoint configured as "https://host/" does not produce "//xrpc/...".
    base_url = endpoint.rstrip("/")
    url = f"{base_url}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}"
    logging.info(f"Fetching blob: {url}")
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch blob {cid} for DID {did}: {e}")
        return None

    logging.info(f"Successfully fetched blob {cid} for DID {did}.")
    return response.content
107
108
def get_blob_metadata(cid, did, endpoint):
    """Build the blob reference dict for *cid* by downloading and inspecting it.

    The blob is fetched with ``fetch_blob``; its MIME type is sniffed from the
    content with libmagic and its size is the length of the downloaded bytes.

    Args:
        cid: CID of the blob.
        did: DID of the repository that owns the blob.
        endpoint: Base URL of the server to download from.

    Returns:
        A dict with the keys ``$type``, ``ref``, ``mimeType`` and ``size``,
        or None when the blob could not be downloaded or is empty.
    """
    blob_data = fetch_blob(did, cid, endpoint)
    # Reject a zero-byte body as well as a failed download: an empty payload
    # would otherwise yield a size-0 blob reference.
    if not blob_data:
        logging.error("Blob data is empty.")
        return None

    mime = magic.Magic(mime=True)
    mime_type = mime.from_buffer(blob_data)
    size = len(blob_data)

    logging.info(f"Retrieved metadata - MIME Type: {mime_type}, Size: {size} bytes")

    return {
        "$type": "blob",
        "ref": {"$link": cid},
        "mimeType": mime_type,
        "size": size,
    }
128
129
def setup_cron_job():
    """Install an hourly crontab entry that runs this script.

    The entry runs at minute 0 of every hour with the current interpreter and
    appends stdout/stderr to LOG_PATH. The user's existing crontab is read
    with ``crontab -l``; if the entry is already present nothing is changed,
    otherwise it is appended and the result is installed from stdin.

    Failures (crontab missing, install rejected) are logged and swallowed so
    that the avatar update itself can still run.
    """
    import shlex

    # Quote every path: the cron line is executed by a shell, so a space in
    # the interpreter, script or log path would otherwise split the command.
    # For paths without special characters the quoted form is unchanged, so
    # entries written by earlier versions are still recognised below.
    interpreter = shlex.quote(sys.executable)
    script_path = shlex.quote(os.path.abspath(__file__))
    log_path = shlex.quote(LOG_PATH)
    cron_job_command = f"0 * * * * {interpreter} {script_path} >> {log_path} 2>&1"
    logging.info("Setting up cron job...")

    try:
        result = subprocess.run(["crontab", "-l"], capture_output=True, text=True)
        # A non-zero exit status is treated as "no crontab yet".
        cron_jobs = result.stdout.strip() if result.returncode == 0 else ""

        if cron_job_command in cron_jobs:
            logging.info("Cron job is already set.")
            return

        entries = [cron_jobs, cron_job_command] if cron_jobs else [cron_job_command]
        # Terminate the file with a newline; some cron implementations refuse
        # to install a crontab whose last line is not newline-terminated.
        new_cron_jobs = "\n".join(entries) + "\n"
        # "-" explicitly selects stdin; several crontab implementations
        # report a usage error when no file argument is given.
        subprocess.run(["crontab", "-"], input=new_cron_jobs, text=True, check=True)
        logging.info("Cron job added successfully.")
    except (OSError, subprocess.SubprocessError) as e:
        logging.error(f"Failed to set up cron job: {e}")
148
149
def _build_profile_record(client, blob_metadata):
    """Return ``(record, swap_cid)`` for a profile update that only swaps the avatar.

    The current ``app.bsky.actor.profile`` record is fetched so its other
    fields are carried over; writing a record that contains only ``avatar``
    would erase them. ``swap_cid`` is the CID of the record that was read
    (None when no profile record exists yet).

    NOTE(review): relies on the atproto SDK's record namespace
    ``client.app.bsky.actor.profile.get`` returning an object with ``value``
    and ``cid`` -- confirm against the installed SDK version.
    """
    try:
        current = client.app.bsky.actor.profile.get(client.me.did, "self")
        profile, swap_cid = current.value, current.cid
    except BadRequestError:
        # Presumably raised when the account has no profile record yet.
        profile, swap_cid = None, None

    preserved = {}
    for field in (
        "banner",
        "description",
        "display_name",
        "labels",
        "pinned_post",
        "joined_via_starter_pack",
        "created_at",
    ):
        # getattr with a default tolerates SDK versions lacking a field.
        value = getattr(profile, field, None)
        if value is not None:
            preserved[field] = value

    record = models.AppBskyActorProfile.Record(avatar=blob_metadata, **preserved)
    return record, swap_cid


def main():
    """Set the account's avatar to the blob configured for the current hour.

    Steps:
      1. Load ENDPOINT, HANDLE, PASSWORD and DID from the .env file.
      2. Normalize the endpoint to https and verify it answers its health check.
      3. Load cids.json and pick the CID stored under the current local hour,
         formatted as a zero-padded two-digit string ("00" to "23").
      4. Log in, download the blob to derive its MIME type and size, and write
         the profile record with the new avatar while keeping its other fields.

    Every failure is logged and ends the run early; the function always
    returns None.
    """
    logging.info("Starting avatar update script...")

    if not os.path.exists(ENV_PATH):
        logging.error(f"Missing .env file at {ENV_PATH}")
        return
    load_dotenv(ENV_PATH)
    logging.info("Loaded .env file successfully.")

    endpoint = os.getenv("ENDPOINT")
    handle = os.getenv("HANDLE")
    password = os.getenv("PASSWORD")
    did = os.getenv("DID")

    if not (endpoint and handle and password and did):
        logging.error("Missing environment variables. Check .env file.")
        return

    endpoint = ensure_https(endpoint)
    if not is_endpoint_alive(endpoint):
        logging.error(f"Endpoint {endpoint} is not responding.")
        return

    try:
        with open(JSON_PATH, "r", encoding="utf-8") as f:
            blob_dict = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        logging.error(f"Error loading cids.json from {JSON_PATH}: {e}")
        return
    if not isinstance(blob_dict, dict):
        # The lookup below needs a mapping; any other JSON value is unusable.
        logging.error(f"Error loading cids.json from {JSON_PATH}: expected a JSON object")
        return
    logging.debug(f"Loaded blob CIDs from {JSON_PATH}: {blob_dict}")

    current_hour = datetime.now().strftime("%H")
    logging.info(f"Current hour: {current_hour}")
    new_blob_cid = blob_dict.get(current_hour)
    if not new_blob_cid:
        logging.warning(f"No blob CID found for hour {current_hour}")
        return
    logging.info(f"Selected blob CID: {new_blob_cid}")

    client = Client(endpoint)

    try:
        client.login(handle, password)
        logging.info("Authentication successful.")
    except Exception as e:
        logging.error(f"Authentication failed: {e}")
        return

    blob_metadata = get_blob_metadata(new_blob_cid, did, endpoint)
    if blob_metadata is None:
        logging.error("Blob metadata retrieval failed.")
        return

    try:
        record, swap_cid = _build_profile_record(client, blob_metadata)
        client.com.atproto.repo.put_record(
            models.ComAtprotoRepoPutRecord.Data(
                collection=models.ids.AppBskyActorProfile,
                repo=client.me.did,
                rkey="self",
                # Only replace the record that was just read, so a concurrent
                # profile edit is not silently overwritten.
                swap_record=swap_cid,
                record=record,
            )
        )
        logging.info("Avatar updated successfully!")
    except Exception as e:
        logging.error(f"Failed to update profile record: {e}")
218
219
# Script entry point: make sure the hourly cron entry exists, then perform one
# avatar update right away.
if __name__ == "__main__":
    setup_cron_job()
    main()
222 main()
223