This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 5.2 kB View raw
1"""ATProto / PDS helpers for live issue loading.""" 2 3from __future__ import annotations 4 5import re 6from typing import Any 7 8import httpx 9 10DEFAULT_PDS = "https://tngl.sh" 11ISSUE_COLLECTION = "sh.tangled.repo.issue" 12STATE_COLLECTION = "sh.tangled.repo.issue.state" 13REPO_COLLECTION = "sh.tangled.repo" 14STATE_OPEN = "sh.tangled.repo.issue.state.open" 15STATE_CLOSED = "sh.tangled.repo.issue.state.closed" 16 17_AT_URI_RE = re.compile( 18 r"^at://(?P<did>did:[^/]+)/(?P<collection>[^/]+)/(?P<rkey>[^/]+)$" 19) 20 21 22def parse_at_uri(uri: str) -> tuple[str, str, str]: 23 match = _AT_URI_RE.match(uri.strip()) 24 if not match: 25 raise ValueError(f"Not a valid at:// URI: {uri!r}") 26 return match.group("did"), match.group("collection"), match.group("rkey") 27 28 29def pds_host_for_did(client: httpx.Client, did: str) -> str | None: 30 resp = client.get(f"https://plc.directory/{did}", timeout=15.0) 31 if resp.status_code != 200: 32 return None 33 doc = resp.json() 34 for svc in doc.get("service", []): 35 if svc.get("type") == "AtprotoPersonalDataServer": 36 endpoint = svc.get("serviceEndpoint") 37 if isinstance(endpoint, str): 38 return endpoint.rstrip("/") 39 return None 40 41 42def handle_from_plc(client: httpx.Client, did: str) -> str | None: 43 resp = client.get(f"https://plc.directory/{did}", timeout=15.0) 44 if resp.status_code != 200: 45 return None 46 for alias in resp.json().get("alsoKnownAs", []): 47 if isinstance(alias, str) and alias.startswith("at://"): 48 return alias.removeprefix("at://") 49 return None 50 51 52def get_record( 53 client: httpx.Client, 54 pds_host: str, 55 repo_did: str, 56 collection: str, 57 rkey: str, 58) -> dict[str, Any]: 59 resp = client.get( 60 f"{pds_host.rstrip('/')}/xrpc/com.atproto.repo.getRecord", 61 params={"repo": repo_did, "collection": collection, "rkey": rkey}, 62 timeout=20.0, 63 ) 64 resp.raise_for_status() 65 data = resp.json() 66 if not isinstance(data, dict): 67 raise RuntimeError("getRecord returned non-object") 68 return data 69 70 71def list_records( 72 client: httpx.Client, 73 pds_host: str, 74 repo_did: str, 75 collection: str, 76 *, 77 limit: int = 100, 78) -> list[dict[str, Any]]: 79 resp = client.get( 80 f"{pds_host.rstrip('/')}/xrpc/com.atproto.repo.listRecords", 81 params={"repo": repo_did, "collection": collection, "limit": limit}, 82 timeout=20.0, 83 ) 84 resp.raise_for_status() 85 page = resp.json().get("records") or [] 86 return [r for r in page if isinstance(r, dict)] 87 88 89def issue_state_for_uri( 90 client: httpx.Client, 91 pds_host: str, 92 author_did: str, 93 issue_uri: str, 94 issue_rkey: str, 95) -> str: 96 try: 97 records = list_records(client, pds_host, author_did, STATE_COLLECTION, limit=200) 98 except Exception: 99 return "open" 100 for rec in records: 101 value = rec.get("value") 102 if not isinstance(value, dict): 103 continue 104 target = value.get("issue") 105 if target == issue_uri: 106 state = value.get("state") 107 if state == STATE_CLOSED: 108 return "closed" 109 return "open" 110 return "open" 111 112 113def repo_did_from_at_uri(uri: str) -> str | None: 114 if not uri.startswith("at://"): 115 return None 116 did = uri.removeprefix("at://").split("/", 1)[0] 117 return did if did.startswith("did:") else None 118 119 120def resolve_repo( 121 client: httpx.Client, 122 repo_ref: Any, 123) -> dict[str, Any]: 124 """Resolve issue's ``repo`` field to repo_did, knot_hostname, name, owner_handle.""" 125 if not isinstance(repo_ref, str) or not repo_ref.strip(): 126 raise RuntimeError("Issue record has no repo reference") 127 128 if repo_ref.startswith("at://"): 129 owner_did, collection, repo_rkey = parse_at_uri(repo_ref) 130 if collection != REPO_COLLECTION: 131 raise RuntimeError(f"Unexpected repo collection: {collection}") 132 pds = pds_host_for_did(client, owner_did) or DEFAULT_PDS 133 rec = get_record(client, pds, owner_did, REPO_COLLECTION, repo_rkey) 134 value = rec.get("value") if isinstance(rec.get("value"), dict) else {} 135 repo_did = value.get("repoDid") if isinstance(value.get("repoDid"), str) else owner_did 136 knot = value.get("knotHostname") or value.get("knotHost") or value.get("knot") 137 name = value.get("name") 138 owner_handle = handle_from_plc(client, owner_did) 139 if not isinstance(knot, str) or not knot.strip(): 140 raise RuntimeError("Repo record missing knot / knotHostname") 141 return { 142 "repo_did": repo_did, 143 "repo_uri": repo_ref, 144 "repo_name": name if isinstance(name, str) else "", 145 "repo_owner_did": owner_did, 146 "repo_owner_handle": owner_handle or "", 147 "knot_hostname": knot.strip(), 148 } 149 150 if repo_ref.startswith("did:"): 151 repo_did = repo_ref 152 raise RuntimeError( 153 f"Issue references repo by DID only ({repo_did}). " 154 "Need at:// owner/repo record URI or a indexed tangled_repos row." 155 ) 156 157 raise RuntimeError(f"Unsupported repo reference: {repo_ref!r}")