This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 8.1 kB View raw
1"""Load issue session context from a single issue URI (live PDS + knot).""" 2 3from __future__ import annotations 4 5import os 6from collections import deque 7from dataclasses import replace 8 9import httpx 10import psycopg 11from psycopg.rows import dict_row 12 13from agent.atproto import ( 14 DEFAULT_PDS, 15 ISSUE_COLLECTION, 16 get_record, 17 handle_from_plc, 18 issue_state_for_uri, 19 parse_at_uri, 20 pds_host_for_did, 21 resolve_repo, 22) 23from agent.context import IssueSessionContext 24from agent.tangled_client import DEFAULT_TIMEOUT, list_tree, normalize_tree_entries 25 26_ISSUE_SQL = """ 27 select 28 i.uri as issue_uri, 29 i.rkey as issue_rkey, 30 i.title, 31 i.body, 32 i.state, 33 i.author_did, 34 i.author_handle, 35 i.repo_did, 36 i.repo_uri, 37 coalesce(r.owner_handle, ti.handle) as repo_owner_handle, 38 r.name as repo_name, 39 r.knot_hostname 40 from tangled_issues i 41 left join tangled_repos r on r.repo_did = i.repo_did 42 left join tangled_identities ti 43 on ti.did = split_part(replace(i.repo_uri, 'at://', ''), '/', 1) 44 where i.uri = %s 45""" 46 47_REPO_SQL = """ 48 select repo_did, name as repo_name, owner_handle as repo_owner_handle, 49 knot_hostname, uri as repo_uri 50 from tangled_repos 51 where repo_did = %s 52 limit 1 53""" 54 55 56def _join_path(parent: str, name: str) -> str: 57 if not parent: 58 return name 59 return f"{parent.rstrip('/')}/{name}" 60 61 62def build_file_tree( 63 knot_hostname: str, 64 repo_did: str, 65 *, 66 ref: str = "HEAD", 67 max_paths: int = 400, 68 max_depth: int = 4, 69) -> list[str]: 70 paths: list[str] = [] 71 queue: deque[tuple[str, int]] = deque([("", 0)]) 72 73 with httpx.Client(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as client: 74 while queue and len(paths) < max_paths: 75 directory, depth = queue.popleft() 76 try: 77 tree = list_tree( 78 client, 79 knot_hostname=knot_hostname, 80 repo_did=repo_did, 81 path=directory, 82 ref=ref, 83 ) 84 except Exception: 85 continue 86 for entry in normalize_tree_entries(tree): 87 full = _join_path(directory, entry["name"]) 88 if entry["type"] == "dir": 89 if depth + 1 < max_depth: 90 queue.append((full, depth + 1)) 91 else: 92 paths.append(full) 93 94 return sorted(paths) 95 96 97def _repo_from_db(repo_did: str) -> dict | None: 98 dsn = os.getenv("DB_CONNECTION_STRING", "").strip() 99 if not dsn: 100 return None 101 if "sslmode=" not in dsn: 102 sep = "&" if "?" in dsn else "?" 103 dsn = f"{dsn}{sep}sslmode=require" 104 try: 105 with psycopg.connect(dsn, row_factory=dict_row) as conn: 106 return conn.execute(_REPO_SQL, (repo_did,)).fetchone() 107 except Exception: 108 return None 109 110 111def _db_row(issue_uri: str) -> dict | None: 112 dsn = os.getenv("DB_CONNECTION_STRING", "").strip() 113 if not dsn: 114 return None 115 if "sslmode=" not in dsn: 116 sep = "&" if "?" in dsn else "?" 117 dsn = f"{dsn}{sep}sslmode=require" 118 try: 119 with psycopg.connect(dsn, row_factory=dict_row) as conn: 120 return conn.execute(_ISSUE_SQL, (issue_uri,)).fetchone() 121 except Exception: 122 return None 123 124 125def _resolve_repo_did_only( 126 client: httpx.Client, 127 repo_did: str, 128 db_row: dict | None, 129) -> dict[str, str]: 130 repo_row = _repo_from_db(repo_did) 131 knot = (repo_row or {}).get("knot_hostname") or (db_row or {}).get("knot_hostname") 132 name = (repo_row or {}).get("repo_name") or (db_row or {}).get("repo_name") 133 owner_handle = (repo_row or {}).get("repo_owner_handle") or (db_row or {}).get( 134 "repo_owner_handle" 135 ) 136 repo_uri = (repo_row or {}).get("repo_uri") or (db_row or {}).get("repo_uri") or "" 137 138 if isinstance(knot, str) and knot.strip(): 139 return { 140 "repo_did": repo_did, 141 "knot_hostname": knot.strip(), 142 "repo_name": name if isinstance(name, str) else "", 143 "repo_owner_handle": owner_handle if isinstance(owner_handle, str) else "", 144 "repo_uri": repo_uri if isinstance(repo_uri, str) else "", 145 } 146 147 raise RuntimeError( 148 f"Cannot resolve knot for repo_did={repo_did}. " 149 "Issue should reference at://owner/sh.tangled.repo/rkey when possible." 150 ) 151 152 153def fetch_issue_live(issue_uri: str) -> IssueSessionContext: 154 """Load everything from Tangled live (PDS + knot). DB not required.""" 155 author_did, collection, rkey = parse_at_uri(issue_uri) 156 if collection != ISSUE_COLLECTION: 157 raise ValueError(f"Expected {ISSUE_COLLECTION}, got {collection}") 158 159 db_row = _db_row(issue_uri) 160 161 with httpx.Client(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as client: 162 pds = pds_host_for_did(client, author_did) or DEFAULT_PDS 163 record = get_record(client, pds, author_did, collection, rkey) 164 value = record.get("value") 165 if not isinstance(value, dict): 166 raise RuntimeError("Issue record missing value") 167 168 title = value.get("title") if isinstance(value.get("title"), str) else "" 169 body = value.get("body") if isinstance(value.get("body"), str) else "" 170 author_handle = handle_from_plc(client, author_did) or "" 171 state = issue_state_for_uri(client, pds, author_did, issue_uri, rkey) 172 173 repo_ref = value.get("repo") 174 if isinstance(repo_ref, str) and repo_ref.startswith("did:"): 175 repo = _resolve_repo_did_only(client, repo_ref, db_row) 176 else: 177 repo = resolve_repo(client, repo_ref) 178 179 file_tree = build_file_tree(repo["knot_hostname"], repo["repo_did"]) 180 181 return IssueSessionContext( 182 issue_uri=issue_uri, 183 issue_rkey=rkey, 184 title=title or (db_row or {}).get("title") or "", 185 body=body or (db_row or {}).get("body") or "", 186 state=state or (db_row or {}).get("state") or "open", 187 author_did=author_did, 188 author_handle=author_handle or (db_row or {}).get("author_handle") or "", 189 repo_did=repo["repo_did"], 190 repo_owner_handle=repo.get("repo_owner_handle") or "", 191 repo_name=repo.get("repo_name") or "", 192 knot_hostname=repo["knot_hostname"], 193 file_tree=file_tree, 194 ref="HEAD", 195 ) 196 197 198def load_issue_context( 199 issue_uri: str, 200 *, 201 fetch_file_tree: bool = True, 202 ref: str = "HEAD", 203) -> IssueSessionContext: 204 """Hydrate session from live Tangled APIs; DB is optional cache only.""" 205 ctx = fetch_issue_live(issue_uri) 206 if not fetch_file_tree: 207 return replace(ctx, file_tree=[], ref=ref) 208 if ref != ctx.ref: 209 return replace( 210 ctx, 211 file_tree=build_file_tree(ctx.knot_hostname, ctx.repo_did, ref=ref), 212 ref=ref, 213 ) 214 return ctx 215 216 217# Backwards-compatible alias 218fetch_issue_context = load_issue_context 219 220 221def resolve_issue_uri(issue_id: str) -> str: 222 """Resolve a full ``at://`` URI or a per-repo issue rkey via ``tangled_issues``.""" 223 raw = issue_id.strip() 224 if raw.startswith("at://"): 225 return raw 226 227 dsn = os.getenv("DB_CONNECTION_STRING", "").strip() 228 if not dsn: 229 raise RuntimeError( 230 "DB_CONNECTION_STRING is required to resolve issue rkey without at:// URI" 231 ) 232 if "sslmode=" not in dsn: 233 sep = "&" if "?" in dsn else "?" 234 dsn = f"{dsn}{sep}sslmode=require" 235 236 with psycopg.connect(dsn, row_factory=dict_row) as conn: 237 rows = conn.execute( 238 "select uri from tangled_issues where rkey = %s order by fetched_at desc", 239 (raw,), 240 ).fetchall() 241 242 if not rows: 243 raise ValueError(f"No issue with rkey {raw!r} in tangled_issues — pass full at:// URI") 244 if len(rows) > 1: 245 uris = [r["uri"] for r in rows[:5]] 246 raise ValueError( 247 f"Ambiguous rkey {raw!r} ({len(rows)} issues). Pass full at:// URI. Examples: {uris}" 248 ) 249 return rows[0]["uri"]