Sunstead trust scoring project
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 5.3 kB View raw
1"""M7 AT-Proto-native output (PRD 6.11): publish each assessment as a public record. 2 3The service has its own DID/PDS account; it consumes state from the firehose and 4emits state as records on the network, so verdicts are auditable provenance rather 5than rows in a private file. Records use our own lexicon, `sh.tangled.trust.score`. 6 7Credentials via env (only these touch the network): 8 ATPROTO_PDS PDS base URL (e.g. https://pds.example) 9 ATPROTO_IDENTIFIER the service handle or DID 10 ATPROTO_PASSWORD an app password 11No password -> automatic dry-run (prints the records it would publish), so this is 12demoable and testable without an account. 13""" 14 15from __future__ import annotations 16 17import argparse 18import json 19import os 20 21from .config import LOG_DIR # noqa: F401 (keeps DATA_ROOT import side effects consistent) 22from .db import connection 23 24LEXICON = "sh.tangled.trust.score" 25 26 27def build_record(row: dict) -> dict: 28 """One score row -> an `sh.tangled.trust.score` record (PRD 6.11 lexicon).""" 29 reason = row.get("explanation") or {} 30 summary = reason.get("compliance_block") or reason.get("content_summary") \ 31 or "; ".join(reason.get("top_factors", [])[:2]) or "structural trust assessment" 32 return { 33 "$type": LEXICON, 34 "subject": row["did"], # the assessed contributor (a DID is the native key) 35 "calibratedProb": round(float(row["calibrated_prob"]), 4), 36 "decision": row["decision"], 37 "structuralTrust": round(float(row["structural_trust"]), 4), 38 "contentRisk": row.get("content_risk"), 39 "summary": summary[:280], 40 "createdAt": str(row["as_of"]), 41 } 42 43 44def _latest_unpublished(con, limit: int) -> list[dict]: 45 rows = con.execute( 46 "SELECT s.did, s.as_of, s.structural_trust, s.content_risk, s.calibrated_prob, " 47 " s.decision, s.explanation_json " 48 "FROM scores s " 49 "JOIN (SELECT did, max(as_of) m FROM scores GROUP BY did) l " 50 " ON s.did = l.did AND s.as_of = l.m " 51 "WHERE NOT EXISTS (SELECT 1 FROM published_records p WHERE p.did=s.did AND p.as_of=s.as_of) " 52 "LIMIT ?", [limit] 53 ).fetchall() 54 return [{"did": r[0], "as_of": r[1], "structural_trust": r[2], "content_risk": r[3], 55 "calibrated_prob": r[4], "decision": r[5], "explanation": json.loads(r[6] or "{}")} 56 for r in rows] 57 58 59def _session(pds: str, identifier: str, password: str) -> tuple[str, str]: 60 import httpx 61 62 r = httpx.post(f"{pds}/xrpc/com.atproto.server.createSession", 63 json={"identifier": identifier, "password": password}, timeout=30) 64 r.raise_for_status() 65 d = r.json() 66 return d["accessJwt"], d["did"] 67 68 69def _create_record(pds: str, jwt: str, repo_did: str, record: dict) -> str: 70 import httpx 71 72 r = httpx.post(f"{pds}/xrpc/com.atproto.repo.createRecord", 73 headers={"Authorization": f"Bearer {jwt}"}, 74 json={"repo": repo_did, "collection": LEXICON, "record": record}, timeout=30) 75 r.raise_for_status() 76 return r.json()["uri"] 77 78 79def publish(dry_run: bool = False, limit: int = 100) -> list[dict]: 80 pds = os.environ.get("ATPROTO_PDS") 81 ident = os.environ.get("ATPROTO_IDENTIFIER") 82 pw = os.environ.get("ATPROTO_PASSWORD") 83 live = bool(pds and ident and pw) and not dry_run 84 85 with connection(read_only=not live) as con: 86 rows = _latest_unpublished(con, limit) 87 out = [] 88 jwt = repo_did = None 89 if live: 90 jwt, repo_did = _session(pds, ident, pw) 91 for row in rows: 92 rec = build_record(row) 93 if live: 94 uri = _create_record(pds, jwt, repo_did, rec) 95 con.execute("INSERT INTO published_records VALUES (?,?,?) " 96 "ON CONFLICT DO NOTHING", [row["did"], row["as_of"], uri]) 97 else: 98 uri = None 99 out.append({"uri": uri, "record": rec}) 100 return out 101 102 103def main() -> None: 104 ap = argparse.ArgumentParser(description="publish trust assessments as AT-Proto records") 105 ap.add_argument("--dry-run", action="store_true", help="print records without publishing") 106 ap.add_argument("--limit", type=int, default=100) 107 args = ap.parse_args() 108 results = publish(dry_run=args.dry_run, limit=args.limit) 109 mode = "published" if (results and results[0]["uri"]) else "dry-run (no creds or --dry-run)" 110 for r in results: 111 print(f" {r['uri'] or '(dry-run)'} {r['record']['decision']:<12} " 112 f"{r['record']['calibratedProb']} {r['record']['subject']}") 113 print(f"[publish] {len(results)} assessments, {mode}") 114 115 116def demo() -> None: 117 """Self-check: a score row builds a valid record (no network).""" 118 row = {"did": "did:plc:carol", "as_of": "2026-06-25T00:00:00Z", "structural_trust": 1.0, 119 "content_risk": None, "calibrated_prob": 1.0, "decision": "fast_lane", 120 "explanation": {"top_factors": ["trust reaches did:plc:carol via maintainer -> alice"]}} 121 rec = build_record(row) 122 assert rec["$type"] == LEXICON and rec["subject"] == "did:plc:carol" 123 assert rec["decision"] == "fast_lane" and 0 <= rec["calibratedProb"] <= 1 124 assert rec["summary"] and rec["createdAt"] 125 print(json.dumps(rec, indent=2)) 126 print("ok") 127 128 129if __name__ == "__main__": 130 demo()