···
2
2
3
3
## Overview
4
4
5
5
-
This repository contains a Python script designed to automatically update your Bluesky avatar based on the current hour. The script leverages environment variables for configuration and reads a JSON file of blob CIDs to determine the appropriate avatar. This script was inspired by [@dame.is](https://bsky.app/profile/dame.is)'s blog post ['How I made an automated dynamic avatar for my Bluesky profile'](https://dame.is/blog/how-i-made-an-automated-dynamic-avatar-for-my-bluesky-profile).
5
5
+
This repository contains a Python script designed to automatically update your Bluesky avatar based on the current hour. The script uses environment variables for configuration and reads a JSON file of blob CIDs to determine the appropriate avatar. This script was inspired by [@dame.is](https://bsky.app/profile/dame.is)'s blog post ['How I made an automated dynamic avatar for my Bluesky profile'](https://dame.is/blog/how-i-made-an-automated-dynamic-avatar-for-my-bluesky-profile).
6
6
7
7
The script has been tested and is fully functional. It was developed on macOS but is intended for deployment on Linux.
8
8
···
42
42
```env
43
43
ENDPOINT=your_endpoint
44
44
HANDLE=your_handle
45
45
-
PASSWORD=your_password (app passwords are better)
45
45
+
PASSWORD=your_password (app passwords are recommended)
46
46
DID=your_did
47
47
```
48
48
···
73
73
- Authenticate using the AT Protocol.
74
74
- Update the Bluesky avatar accordingly.
75
75
76
76
-
Execution logs will be recorded in `logs/avatar_update.log` for your review.
76
76
+
Execution logs will be displayed directly in the console.
77
77
78
78
## Automating with Cron (Linux)
79
79
···
100
100
101
101
- **Endpoint not responding?** Verify that the Bluesky API endpoint is correct and accessible.
102
102
103
103
-
## Licence
103
103
+
## License
104
104
105
105
-
This project is released under the MIT Licence. See the [LICENSE](./LICENSE) file for full details.
105
105
+
This project is released under the MIT License. See the [LICENSE](./LICENSE) file for full details.
···
1
1
python-dotenv
2
2
-
atproto
2
2
+
atproto
3
3
+
requests
4
4
+
python-magic
5
5
+
python-crontab
···
1
1
-
#!/usr/bin/env python3
2
1
import os
3
3
-
import sys
4
4
-
import subprocess
2
2
+
import json
5
3
import logging
4
4
+
import requests
5
5
+
import magic
6
6
from datetime import datetime
7
7
from dotenv import load_dotenv
8
8
-
from logging.handlers import RotatingFileHandler
9
8
from atproto import Client, models
10
9
from atproto.exceptions import BadRequestError
10
10
+
import sys
11
11
+
from crontab import CronTab
12
12
+
13
13
+
if not hasattr(sys, 'real_prefix') and not (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
14
14
+
print("Error: This script must be run inside a virtual environment.")
15
15
+
sys.exit(1)
16
16
+
else:
17
17
+
logging.info("Virtual environment detected.")
11
18
12
19
# Define the paths
13
13
-
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
14
14
-
ASSETS_DIR = os.path.join(BASE_DIR, "../assets")
20
20
+
BASE_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
21
21
+
ASSETS_DIR = os.path.join(BASE_DIR, "assets")
15
22
ENV_PATH = os.path.join(ASSETS_DIR, ".env")
16
23
JSON_PATH = os.path.join(ASSETS_DIR, "cids.json")
17
17
-
LOG_PATH = os.path.join(BASE_DIR, "logs", "avatar_update.log")
18
18
-
19
19
-
# Ensure necessary directories exist
20
20
-
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
21
21
-
os.makedirs(ASSETS_DIR, exist_ok=True)
22
22
-
23
23
-
# Configure logging with log rotation (5MB per file, keeps last 5 logs)
24
24
-
log_handler = RotatingFileHandler(LOG_PATH, maxBytes=5 * 1024 * 1024, backupCount=5)
25
25
-
log_handler.setLevel(logging.DEBUG)
26
26
-
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
27
27
-
log_handler.setFormatter(log_formatter)
24
24
+
SCRIPT_PATH = __file__
28
25
26
26
+
# Configure basic console logging
29
27
console_handler = logging.StreamHandler()
30
30
-
console_handler.setLevel(logging.INFO)
28
28
+
console_handler.setLevel(logging.INFO) # Show only INFO and higher levels on console
31
29
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
32
30
console_handler.setFormatter(formatter)
33
33
-
logging.getLogger().addHandler(console_handler)
31
31
+
32
32
+
# Root logger setup
33
33
+
logger = logging.getLogger()
34
34
+
logger.setLevel(logging.INFO) # Set the root logger level to INFO
35
35
+
36
36
+
# Remove default handlers (to prevent duplication)
37
37
+
for handler in logger.handlers[:]:
38
38
+
logger.removeHandler(handler)
39
39
+
40
40
+
# Add the custom console handler
41
41
+
logger.addHandler(console_handler)
42
42
+
43
43
+
# Suppress httpx logging (this stops httpx internal logs)
44
44
+
logging.getLogger("httpx").setLevel(logging.WARNING) # Suppress INFO and DEBUG logs from httpx
45
45
+
46
46
+
# Log the start of the script
47
47
+
logger.info("Avatar update script started.")
48
48
+
49
49
+
def ensure_https(url):
50
50
+
if not url.startswith("http://") and not url.startswith("https://"):
51
51
+
return "https://" + url
52
52
+
if url.startswith("http://"):
53
53
+
return "https://" + url[7:]
54
54
+
return url
55
55
+
56
56
+
def is_endpoint_alive(url):
57
57
+
health_url = f"{url.rstrip('/')}/xrpc/_health"
58
58
+
try:
59
59
+
response = requests.get(health_url, timeout=5)
60
60
+
if response.status_code == 200:
61
61
+
logger.info(f"Endpoint {url} is alive and healthy.")
62
62
+
return True
63
63
+
else:
64
64
+
logger.warning(f"Endpoint {url} is not responding correctly: {response.status_code}")
65
65
+
return False
66
66
+
except requests.RequestException as e:
67
67
+
logger.error(f"Health check failed for {health_url}: {e}")
68
68
+
return False
69
69
+
70
70
+
def fetch_blob(did, cid, endpoint):
71
71
+
url = f"{endpoint}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}"
72
72
+
try:
73
73
+
response = requests.get(url, timeout=5)
74
74
+
response.raise_for_status()
75
75
+
logger.info(f"Fetched blob {cid} successfully.")
76
76
+
return response.content
77
77
+
except requests.RequestException as e:
78
78
+
logger.error(f"Failed to fetch blob {cid} for DID {did}: {e}")
79
79
+
return None
34
80
35
35
-
def main():
36
36
-
logging.info("Starting avatar update script...")
81
81
+
def get_blob_metadata(cid, did, endpoint):
82
82
+
logger.info(f"Retrieving metadata for blob {cid}.")
83
83
+
blob_data = fetch_blob(did, cid, endpoint)
84
84
+
if blob_data is None:
85
85
+
return None
37
86
38
38
-
if not os.path.exists(ENV_PATH):
39
39
-
logging.error(f"Missing .env file at {ENV_PATH}")
40
40
-
return
41
41
-
load_dotenv(ENV_PATH)
42
42
-
logging.info("Loaded .env file successfully.")
87
87
+
mime = magic.Magic(mime=True)
88
88
+
mime_type = mime.from_buffer(blob_data)
89
89
+
size = len(blob_data)
90
90
+
91
91
+
logger.debug(f"Blob metadata: MIME Type - {mime_type}, Size - {size}")
92
92
+
return {
93
93
+
"$type": "blob",
94
94
+
"ref": {"$link": cid},
95
95
+
"mimeType": mime_type,
96
96
+
"size": size,
97
97
+
}
43
98
99
99
+
def validate_environment_variables():
100
100
+
"""Validate environment variables and return a dictionary of values."""
44
101
endpoint = os.getenv("ENDPOINT")
45
102
handle = os.getenv("HANDLE")
46
103
password = os.getenv("PASSWORD")
104
104
+
did = os.getenv("DID")
47
105
48
48
-
if not (endpoint and handle and password):
49
49
-
logging.error("Missing environment variables. Ensure ENDPOINT, HANDLE, and PASSWORD are set in .env file.")
106
106
+
if not all([endpoint, handle, password, did]):
107
107
+
logger.error("Missing environment variables. Ensure ENDPOINT, HANDLE, PASSWORD, and DID are set in .env file.")
108
108
+
return None
109
109
+
return {
110
110
+
"endpoint": endpoint,
111
111
+
"handle": handle,
112
112
+
"password": password,
113
113
+
"did": did
114
114
+
}
115
115
+
116
116
+
def setup_cron_job():
117
117
+
# Get the path to the virtual environment's python interpreter
118
118
+
venv_python = os.path.join(BASE_DIR, "venv", "bin", "python3") # Update this if your venv is elsewhere
119
119
+
120
120
+
# Check if the cron job already exists
121
121
+
cron = CronTab(user=True)
122
122
+
job_exists = False
123
123
+
for job in cron:
124
124
+
if SCRIPT_PATH in job.command:
125
125
+
job_exists = True
126
126
+
break
127
127
+
128
128
+
if not job_exists:
129
129
+
# Set up the cron job to run every hour (top of the hour)
130
130
+
cron_command = f"{venv_python} {SCRIPT_PATH}"
131
131
+
job = cron.new(command=cron_command, comment="Avatar update script")
132
132
+
job.minute.on(0) # Run at the start of every hour
133
133
+
cron.write()
134
134
+
logger.info("Cron job has been set up to run every hour within the virtual environment.")
135
135
+
136
136
+
def main():
137
137
+
# Set up cron job (only once)
138
138
+
setup_cron_job()
139
139
+
140
140
+
logger.info("Starting avatar update process...")
141
141
+
142
142
+
if os.path.exists(ENV_PATH):
143
143
+
load_dotenv(ENV_PATH)
144
144
+
logger.info(f"Loaded environment from {ENV_PATH}")
145
145
+
else:
146
146
+
logger.error(f"Missing .env file at {ENV_PATH}")
50
147
return
51
148
52
52
-
if not os.path.exists(JSON_PATH) or os.path.getsize(JSON_PATH) == 0:
53
53
-
logging.error(f"Error: {JSON_PATH} is missing or empty.")
149
149
+
env_vars = validate_environment_variables()
150
150
+
if not env_vars:
151
151
+
return
152
152
+
153
153
+
endpoint = ensure_https(env_vars["endpoint"])
154
154
+
if not is_endpoint_alive(endpoint):
155
155
+
logger.error(f"Endpoint {endpoint} is not responding.")
54
156
return
55
157
56
158
try:
57
159
with open(JSON_PATH, "r") as f:
58
160
blob_dict = json.load(f)
59
59
-
logging.debug(f"Loaded blob CIDs from {JSON_PATH}: {blob_dict}")
161
161
+
logger.info(f"Loaded blob CIDs from {JSON_PATH}.")
60
162
except Exception as e:
61
61
-
logging.error(f"Error loading cids.json from {JSON_PATH}: {e}")
163
163
+
logger.error(f"Error loading cids.json from {JSON_PATH}: {e}")
62
164
return
63
165
64
166
current_hour = datetime.now().strftime("%H")
65
65
-
logging.info(f"Current hour: {current_hour}")
167
167
+
logger.info(f"Current hour: {current_hour}")
168
168
+
66
169
new_blob_cid = blob_dict.get(current_hour)
67
170
if not new_blob_cid:
68
68
-
logging.warning(f"No blob CID found for hour {current_hour}")
171
171
+
logger.warning(f"No blob CID found for hour {current_hour}")
69
172
return
70
70
-
logging.info(f"Selected blob CID: {new_blob_cid}")
173
173
+
174
174
+
logger.info(f"Selected blob CID: {new_blob_cid}")
71
175
72
176
client = Client(endpoint)
73
177
74
178
try:
75
75
-
profile = client.login(handle, password)
76
76
-
logging.info(f"Authentication successful. Welcome, {profile.display_name}")
77
77
-
did = profile.did
78
78
-
logging.info(f"User DID: {did}")
179
179
+
client.login(env_vars["handle"], env_vars["password"])
180
180
+
logger.info("Authentication successful.")
79
181
except Exception as e:
80
80
-
logging.error(f"Authentication failed: {e}")
182
182
+
logger.error(f"Authentication failed: {e}")
81
183
return
82
184
83
83
-
updated_profile_data = {
84
84
-
"$type": "app.bsky.actor.profile",
85
85
-
"avatar": {
86
86
-
"cid": new_blob_cid
87
87
-
}
88
88
-
}
185
185
+
try:
186
186
+
current_profile_record = client.app.bsky.actor.profile.get(
187
187
+
client.me.did, "self"
188
188
+
)
189
189
+
current_profile = current_profile_record.value
190
190
+
swap_record_cid = current_profile_record.cid
191
191
+
logger.info(f"Current profile record fetched successfully.")
192
192
+
except BadRequestError:
193
193
+
current_profile = swap_record_cid = None
194
194
+
logger.warning(f"Failed to fetch current profile record.")
195
195
+
196
196
+
old_description = old_display_name = None
197
197
+
if current_profile:
198
198
+
old_description = current_profile.description
199
199
+
old_display_name = current_profile.display_name
200
200
+
201
201
+
blob_metadata = get_blob_metadata(new_blob_cid, env_vars["did"], endpoint)
89
202
90
90
-
logging.debug(f"Updated profile data: {updated_profile_data}")
203
203
+
if blob_metadata is None:
204
204
+
logger.error(f"Could not retrieve metadata for blob CID: {new_blob_cid}")
205
205
+
return
91
206
92
207
try:
93
208
client.com.atproto.repo.put_record(
94
94
-
repo=did,
95
95
-
collection="app.bsky.actor.profile",
96
96
-
rkey="self",
97
97
-
record=updated_profile_data
209
209
+
models.ComAtprotoRepoPutRecord.Data(
210
210
+
collection=models.ids.AppBskyActorProfile,
211
211
+
repo=client.me.did,
212
212
+
rkey="self",
213
213
+
swap_record=swap_record_cid,
214
214
+
record=models.AppBskyActorProfile.Record(
215
215
+
avatar=blob_metadata,
216
216
+
banner=current_profile.banner if current_profile else None,
217
217
+
description=old_description,
218
218
+
display_name=old_display_name,
219
219
+
),
220
220
+
)
98
221
)
99
99
-
logging.info("Avatar updated successfully!")
222
222
+
logger.info(f"Avatar updated successfully with CID: {new_blob_cid}")
100
223
except Exception as e:
101
101
-
logging.error(f"Failed to update profile record: {e}")
224
224
+
logger.error(f"Failed to update profile record: {e}")
102
225
103
226
if __name__ == "__main__":
104
104
-
setup_cron_job()
105
227
main()