This repository has no description
1"""ATProto / PDS helpers for live issue loading."""
2
3from __future__ import annotations
4
5import re
6from typing import Any
7
8import httpx
9
# Base URL of the PDS used as a fallback when the PLC directory lookup
# yields no PDS endpoint for a DID (see resolve_repo).
DEFAULT_PDS = "https://tngl.sh"
# Record collection names used with getRecord / listRecords.
ISSUE_COLLECTION = "sh.tangled.repo.issue"
STATE_COLLECTION = "sh.tangled.repo.issue.state"
REPO_COLLECTION = "sh.tangled.repo"
# Values compared against the ``state`` field of issue-state records.
# STATE_OPEN is presumably the counterpart of STATE_CLOSED; it is not
# referenced by the helpers visible in this module.
STATE_OPEN = "sh.tangled.repo.issue.state.open"
STATE_CLOSED = "sh.tangled.repo.issue.state.closed"

# Matches ``at://<did>/<collection>/<rkey>``: exactly three non-empty path
# segments, the first of which must start with ``did:``.
_AT_URI_RE = re.compile(
    r"^at://(?P<did>did:[^/]+)/(?P<collection>[^/]+)/(?P<rkey>[^/]+)$"
)
20
21
def parse_at_uri(uri: str) -> tuple[str, str, str]:
    """Split an ``at://`` record URI into ``(did, collection, rkey)``.

    Leading and trailing whitespace around *uri* is ignored.

    Raises:
        ValueError: if the text does not have the
            ``at://<did>/<collection>/<rkey>`` shape.
    """
    parsed = _AT_URI_RE.match(uri.strip())
    if parsed is None:
        raise ValueError(f"Not a valid at:// URI: {uri!r}")
    did, collection, rkey = parsed.group("did", "collection", "rkey")
    return did, collection, rkey
27
28
def pds_host_for_did(client: httpx.Client, did: str) -> str | None:
    """Look up the PDS base URL for *did* in the PLC directory.

    Fetches ``https://plc.directory/<did>`` and returns the
    ``serviceEndpoint`` of the first ``AtprotoPersonalDataServer`` service
    entry whose endpoint is a string, with trailing slashes removed.

    Returns:
        The endpoint URL, or ``None`` when the directory does not answer with
        HTTP 200, the body is not a JSON object, or no usable service entry
        is listed.

    Transport errors raised by ``client.get`` propagate to the caller.
    """
    resp = client.get(f"https://plc.directory/{did}", timeout=15.0)
    if resp.status_code != 200:
        return None
    try:
        doc = resp.json()
    except ValueError:
        # A 200 response with a non-JSON body is treated like "no document".
        return None
    if not isinstance(doc, dict):
        return None
    # ``service`` may be missing or explicitly null; neither is iterable as
    # a list of entries, so both mean "no services".
    services = doc.get("service")
    if not isinstance(services, list):
        return None
    for svc in services:
        # Skip malformed entries instead of crashing on ``.get``.
        if not isinstance(svc, dict):
            continue
        if svc.get("type") != "AtprotoPersonalDataServer":
            continue
        endpoint = svc.get("serviceEndpoint")
        if isinstance(endpoint, str):
            return endpoint.rstrip("/")
    return None
40
41
def handle_from_plc(client: httpx.Client, did: str) -> str | None:
    """Look up the handle recorded for *did* in the PLC directory.

    Fetches ``https://plc.directory/<did>`` and returns the first
    ``alsoKnownAs`` entry that is a string starting with ``at://``, with that
    prefix removed.

    Returns:
        The handle text, or ``None`` when the directory does not answer with
        HTTP 200, the body is not a JSON object, or no ``at://`` alias is
        listed.

    Transport errors raised by ``client.get`` propagate to the caller.
    """
    resp = client.get(f"https://plc.directory/{did}", timeout=15.0)
    if resp.status_code != 200:
        return None
    try:
        doc = resp.json()
    except ValueError:
        # A 200 response with a non-JSON body is treated like "no document".
        return None
    if not isinstance(doc, dict):
        return None
    # ``alsoKnownAs`` may be missing or explicitly null; treat both as empty.
    aliases = doc.get("alsoKnownAs")
    if not isinstance(aliases, list):
        return None
    for alias in aliases:
        if isinstance(alias, str) and alias.startswith("at://"):
            return alias.removeprefix("at://")
    return None
50
51
def get_record(
    client: httpx.Client,
    pds_host: str,
    repo_did: str,
    collection: str,
    rkey: str,
) -> dict[str, Any]:
    """Fetch a single record through ``com.atproto.repo.getRecord``.

    The request goes to *pds_host* (trailing slashes ignored) with
    ``repo``, ``collection`` and ``rkey`` query parameters and a 20 second
    timeout.

    Returns:
        The decoded JSON response body.

    Raises:
        RuntimeError: if the decoded body is not a JSON object.
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    endpoint = pds_host.rstrip("/") + "/xrpc/com.atproto.repo.getRecord"
    query = {"repo": repo_did, "collection": collection, "rkey": rkey}
    response = client.get(endpoint, params=query, timeout=20.0)
    response.raise_for_status()
    payload = response.json()
    if isinstance(payload, dict):
        return payload
    raise RuntimeError("getRecord returned non-object")
69
70
def list_records(
    client: httpx.Client,
    pds_host: str,
    repo_did: str,
    collection: str,
    *,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """List records of *collection* through ``com.atproto.repo.listRecords``.

    Args:
        client: HTTP client used for the requests.
        pds_host: Base URL of the PDS; trailing slashes are ignored.
        repo_did: Value sent as the ``repo`` query parameter.
        collection: Value sent as the ``collection`` query parameter.
        limit: Maximum number of records to fetch in total. The
            ``listRecords`` lexicon caps a single request at 100, so larger
            values are served by following the response ``cursor`` across
            several requests. A value of zero or less fetches nothing.

    Returns:
        The fetched record entries that are JSON objects, in server order.

    Raises:
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    page_cap = 100  # per-request maximum defined by the listRecords lexicon
    url = f"{pds_host.rstrip('/')}/xrpc/com.atproto.repo.listRecords"
    collected: list[dict[str, Any]] = []
    remaining = limit
    cursor: str | None = None

    while remaining > 0:
        params: dict[str, Any] = {
            "repo": repo_did,
            "collection": collection,
            "limit": min(remaining, page_cap),
        }
        if cursor:
            params["cursor"] = cursor
        resp = client.get(url, params=params, timeout=20.0)
        resp.raise_for_status()
        body = resp.json()
        page = body.get("records") if isinstance(body, dict) else None
        if not isinstance(page, list) or not page:
            break
        collected.extend(rec for rec in page if isinstance(rec, dict))
        # Count raw entries so malformed ones cannot extend the fetch budget.
        remaining -= len(page)
        cursor = body.get("cursor")
        if not isinstance(cursor, str) or not cursor:
            # No cursor means the server has nothing further to return.
            break
    return collected
87
88
def issue_state_for_uri(
    client: httpx.Client,
    pds_host: str,
    author_did: str,
    issue_uri: str,
    issue_rkey: str,
) -> str:
    """Return ``"open"`` or ``"closed"`` for the issue at *issue_uri*.

    Lists issue-state records from the repo of *author_did* on *pds_host* and
    uses the first one whose ``issue`` field equals *issue_uri*. The result
    is ``"closed"`` only when that record's ``state`` equals
    ``STATE_CLOSED``; every other outcome, including a failed lookup or no
    matching record, is reported as ``"open"``.

    Args:
        client: HTTP client used for the lookup.
        pds_host: Base URL of the PDS to query.
        author_did: Repo whose state records are listed; records stored in
            other repos are not consulted.
        issue_uri: ``at://`` URI of the issue to match.
        issue_rkey: Currently unused; kept so existing callers keep working.
    """
    try:
        # The listRecords lexicon caps ``limit`` at 100. The previous value
        # of 200 can be rejected by a validating PDS, and that error was
        # then swallowed below, reporting every issue as open.
        records = list_records(client, pds_host, author_did, STATE_COLLECTION, limit=100)
    except Exception:
        # Deliberate best effort: a failed state lookup must not block
        # loading the issue, so fall back to the default state.
        return "open"
    for rec in records:
        value = rec.get("value")
        if not isinstance(value, dict):
            continue
        if value.get("issue") != issue_uri:
            continue
        # First match wins. NOTE(review): this relies on the server listing
        # the most recent state record first - confirm against the PDS.
        return "closed" if value.get("state") == STATE_CLOSED else "open"
    return "open"
111
112
def repo_did_from_at_uri(uri: str) -> str | None:
    """Return the DID in the authority position of an ``at://`` URI.

    Gives ``None`` when *uri* does not start with ``at://`` or when the text
    up to the first ``/`` after the scheme does not start with ``did:``.
    """
    scheme = "at://"
    if uri.startswith(scheme):
        authority, _, _ = uri[len(scheme):].partition("/")
        if authority.startswith("did:"):
            return authority
    return None
118
119
def resolve_repo(
    client: httpx.Client,
    repo_ref: Any,
) -> dict[str, Any]:
    """Resolve an issue's ``repo`` field to repo details.

    Only ``at://<owner-did>/sh.tangled.repo/<rkey>`` references can be
    resolved. The repo record is fetched from the owner's PDS (falling back
    to ``DEFAULT_PDS`` when the PLC lookup finds none) and the owner's handle
    is looked up in the PLC directory.

    Args:
        client: HTTP client used for all lookups.
        repo_ref: Raw value of the issue record's ``repo`` field. Surrounding
            whitespace is ignored.

    Returns:
        A dict with the keys ``repo_did``, ``repo_uri``, ``repo_name``,
        ``repo_owner_did``, ``repo_owner_handle`` and ``knot_hostname``.
        ``repo_did`` falls back to the owner DID when the record has no
        string ``repoDid``; name and handle fall back to ``""``.

    Raises:
        RuntimeError: if the reference is missing, is a bare DID, has an
            unsupported form, names a different collection, or the repo
            record carries no knot hostname.
        ValueError: if an ``at://`` reference is malformed (raised by
            ``parse_at_uri``).
    """
    if not isinstance(repo_ref, str) or not repo_ref.strip():
        raise RuntimeError("Issue record has no repo reference")

    # Strip once so the prefix checks below agree with parse_at_uri, which
    # strips its input; otherwise leading whitespace is rejected here.
    ref = repo_ref.strip()

    if ref.startswith("at://"):
        owner_did, collection, repo_rkey = parse_at_uri(ref)
        if collection != REPO_COLLECTION:
            raise RuntimeError(f"Unexpected repo collection: {collection}")
        pds = pds_host_for_did(client, owner_did) or DEFAULT_PDS
        rec = get_record(client, pds, owner_did, REPO_COLLECTION, repo_rkey)
        value = rec.get("value")
        if not isinstance(value, dict):
            value = {}

        # Validate the knot before the handle lookup so an unusable record
        # fails without another PLC round trip.
        knot = value.get("knotHostname") or value.get("knotHost") or value.get("knot")
        if not isinstance(knot, str) or not knot.strip():
            raise RuntimeError("Repo record missing knot / knotHostname")

        repo_did = value.get("repoDid")
        if not isinstance(repo_did, str):
            repo_did = owner_did
        name = value.get("name")
        owner_handle = handle_from_plc(client, owner_did)
        return {
            "repo_did": repo_did,
            "repo_uri": ref,
            "repo_name": name if isinstance(name, str) else "",
            "repo_owner_did": owner_did,
            "repo_owner_handle": owner_handle or "",
            "knot_hostname": knot.strip(),
        }

    if ref.startswith("did:"):
        # A bare repo DID cannot be mapped to an owner/repo record here.
        raise RuntimeError(
            f"Issue references repo by DID only ({ref}). "
            "Need at:// owner/repo record URI or a indexed tangled_repos row."
        )

    raise RuntimeError(f"Unsupported repo reference: {repo_ref!r}")
157 raise RuntimeError(f"Unsupported repo reference: {repo_ref!r}")