Sunstead trust scoring project
1"""M7 AT-Proto-native output (PRD 6.11): publish each assessment as a public record.
2
3The service has its own DID/PDS account; it consumes state from the firehose and
4emits state as records on the network, so verdicts are auditable provenance rather
5than rows in a private file. Records use our own lexicon, `sh.tangled.trust.score`.
6
7Credentials via env (only these touch the network):
8 ATPROTO_PDS PDS base URL (e.g. https://pds.example)
9 ATPROTO_IDENTIFIER the service handle or DID
10 ATPROTO_PASSWORD an app password
11No password -> automatic dry-run (prints the records it would publish), so this is
12demoable and testable without an account.
13"""
14
15from __future__ import annotations
16
17import argparse
18import json
19import os
20
21from .config import LOG_DIR # noqa: F401 (keeps DATA_ROOT import side effects consistent)
22from .db import connection
23
24LEXICON = "sh.tangled.trust.score"
25
26
27def build_record(row: dict) -> dict:
28 """One score row -> an `sh.tangled.trust.score` record (PRD 6.11 lexicon)."""
29 reason = row.get("explanation") or {}
30 summary = reason.get("compliance_block") or reason.get("content_summary") \
31 or "; ".join(reason.get("top_factors", [])[:2]) or "structural trust assessment"
32 return {
33 "$type": LEXICON,
34 "subject": row["did"], # the assessed contributor (a DID is the native key)
35 "calibratedProb": round(float(row["calibrated_prob"]), 4),
36 "decision": row["decision"],
37 "structuralTrust": round(float(row["structural_trust"]), 4),
38 "contentRisk": row.get("content_risk"),
39 "summary": summary[:280],
40 "createdAt": str(row["as_of"]),
41 }
42
43
44def _latest_unpublished(con, limit: int) -> list[dict]:
45 rows = con.execute(
46 "SELECT s.did, s.as_of, s.structural_trust, s.content_risk, s.calibrated_prob, "
47 " s.decision, s.explanation_json "
48 "FROM scores s "
49 "JOIN (SELECT did, max(as_of) m FROM scores GROUP BY did) l "
50 " ON s.did = l.did AND s.as_of = l.m "
51 "WHERE NOT EXISTS (SELECT 1 FROM published_records p WHERE p.did=s.did AND p.as_of=s.as_of) "
52 "LIMIT ?", [limit]
53 ).fetchall()
54 return [{"did": r[0], "as_of": r[1], "structural_trust": r[2], "content_risk": r[3],
55 "calibrated_prob": r[4], "decision": r[5], "explanation": json.loads(r[6] or "{}")}
56 for r in rows]
57
58
59def _session(pds: str, identifier: str, password: str) -> tuple[str, str]:
60 import httpx
61
62 r = httpx.post(f"{pds}/xrpc/com.atproto.server.createSession",
63 json={"identifier": identifier, "password": password}, timeout=30)
64 r.raise_for_status()
65 d = r.json()
66 return d["accessJwt"], d["did"]
67
68
69def _create_record(pds: str, jwt: str, repo_did: str, record: dict) -> str:
70 import httpx
71
72 r = httpx.post(f"{pds}/xrpc/com.atproto.repo.createRecord",
73 headers={"Authorization": f"Bearer {jwt}"},
74 json={"repo": repo_did, "collection": LEXICON, "record": record}, timeout=30)
75 r.raise_for_status()
76 return r.json()["uri"]
77
78
79def publish(dry_run: bool = False, limit: int = 100) -> list[dict]:
80 pds = os.environ.get("ATPROTO_PDS")
81 ident = os.environ.get("ATPROTO_IDENTIFIER")
82 pw = os.environ.get("ATPROTO_PASSWORD")
83 live = bool(pds and ident and pw) and not dry_run
84
85 with connection(read_only=not live) as con:
86 rows = _latest_unpublished(con, limit)
87 out = []
88 jwt = repo_did = None
89 if live:
90 jwt, repo_did = _session(pds, ident, pw)
91 for row in rows:
92 rec = build_record(row)
93 if live:
94 uri = _create_record(pds, jwt, repo_did, rec)
95 con.execute("INSERT INTO published_records VALUES (?,?,?) "
96 "ON CONFLICT DO NOTHING", [row["did"], row["as_of"], uri])
97 else:
98 uri = None
99 out.append({"uri": uri, "record": rec})
100 return out
101
102
103def main() -> None:
104 ap = argparse.ArgumentParser(description="publish trust assessments as AT-Proto records")
105 ap.add_argument("--dry-run", action="store_true", help="print records without publishing")
106 ap.add_argument("--limit", type=int, default=100)
107 args = ap.parse_args()
108 results = publish(dry_run=args.dry_run, limit=args.limit)
109 mode = "published" if (results and results[0]["uri"]) else "dry-run (no creds or --dry-run)"
110 for r in results:
111 print(f" {r['uri'] or '(dry-run)'} {r['record']['decision']:<12} "
112 f"{r['record']['calibratedProb']} {r['record']['subject']}")
113 print(f"[publish] {len(results)} assessments, {mode}")
114
115
116def demo() -> None:
117 """Self-check: a score row builds a valid record (no network)."""
118 row = {"did": "did:plc:carol", "as_of": "2026-06-25T00:00:00Z", "structural_trust": 1.0,
119 "content_risk": None, "calibrated_prob": 1.0, "decision": "fast_lane",
120 "explanation": {"top_factors": ["trust reaches did:plc:carol via maintainer -> alice"]}}
121 rec = build_record(row)
122 assert rec["$type"] == LEXICON and rec["subject"] == "did:plc:carol"
123 assert rec["decision"] == "fast_lane" and 0 <= rec["calibratedProb"] <= 1
124 assert rec["summary"] and rec["createdAt"]
125 print(json.dumps(rec, indent=2))
126 print("ok")
127
128
129if __name__ == "__main__":
130 demo()