Sunstead trust scoring project
1"""M3 structural signal: EigenTrust over the vouches edge list (PRD 6.4).
2
3Reads rows into a SciPy sparse matrix and runs personalized power iteration in
4memory. No graph DB (PRD 2). Path explanations come from an in-memory BFS from
5the seed, not graph-DB traversal.
6"""
7
8from __future__ import annotations
9
10import math
11from collections import deque
12from dataclasses import dataclass, field
13
14import numpy as np
15from scipy import sparse
16
17from .config import CFG
18
19
20@dataclass
21class EigenResult:
22 trust: dict[str, float] # did -> structural trust in [0,1] (max-normalized)
23 index: dict[str, int]
24 seeds: list[str]
25 _adj: dict[str, list[str]] # positive-edge adjacency for BFS paths
26 stars_trust: dict[str, float] = field(default_factory=dict) # owner -> Σ trust[starrer] (sybil-resistant stars)
27
28 def path_from_seed(self, did: str, max_hops: int = 4) -> list[str]:
29 """Shortest positive-vouch path seed -> did, for the explanation (PRD 6.4)."""
30 if did in self.seeds:
31 return [did]
32 seen = set(self.seeds)
33 q: deque[list[str]] = deque([[s] for s in self.seeds])
34 while q:
35 path = q.popleft()
36 if len(path) > max_hops:
37 continue
38 for nxt in self._adj.get(path[-1], ()):
39 if nxt in seen:
40 continue
41 if nxt == did:
42 return path + [nxt]
43 seen.add(nxt)
44 q.append(path + [nxt])
45 return []
46
47
48def _age_weight(created_at, now_us: float) -> float:
49 if created_at is None:
50 return 1.0
51 try:
52 age_days = (now_us - created_at.timestamp()) / 86400.0
53 except (AttributeError, TypeError):
54 return 1.0
55 return 0.5 ** (max(age_days, 0) / CFG.eigen.age_halflife_days)
56
57
58def compute(con) -> EigenResult:
59 import datetime
60
61 now_us = datetime.datetime.now(datetime.timezone.utc).timestamp()
62 rows = con.execute(
63 "SELECT voucher_did, subject_did, polarity, weight, evidence_uri, created_at FROM vouches"
64 ).fetchall()
65 seeds = [r[0] for r in con.execute("SELECT did FROM seeds").fetchall()]
66 dids = {d for (d,) in con.execute("SELECT did FROM contributors").fetchall()}
67 for v, s, *_ in rows:
68 dids.update((v, s))
69 dids.update(seeds)
70
71 if not dids:
72 return EigenResult({}, {}, seeds, {})
73 index = {d: i for i, d in enumerate(sorted(dids))}
74 n = len(index)
75
76 # denounced nodes get incoming trust zeroed (PRD 6.4: distrust does NOT flow transitively)
77 denounced = {s for (v, s, pol, *_) in rows if pol is not None and pol < 0}
78
79 src, dst, data = [], [], []
80 adj: dict[str, list[str]] = {}
81 for voucher, subject, polarity, weight, evidence, created_at in rows:
82 if polarity is not None and polarity < 0:
83 continue # denounce: recorded as a feature, never a positive edge
84 if subject in denounced:
85 continue # ponytail: any denounce starves the node; per-edge weighting if it matters
86 w = (weight or 1.0) * _age_weight(created_at, now_us)
87 if evidence:
88 w *= CFG.eigen.evidence_boost
89 src.append(index[voucher]); dst.append(index[subject]); data.append(w)
90 adj.setdefault(voucher, []).append(subject)
91
92 C = sparse.csr_matrix((data, (src, dst)), shape=(n, n))
93 row_sums = np.asarray(C.sum(axis=1)).ravel()
94 row_sums[row_sums == 0] = 1.0
95 C = sparse.diags(1.0 / row_sums) @ C # row-normalize: C[i,j] = trust i places in j
96
97 p = np.zeros(n)
98 if seeds:
99 for s in seeds:
100 p[index[s]] = 1.0
101 else:
102 p[:] = 1.0 # ponytail: no seed configured -> uniform restart (global PageRank fallback)
103 p /= p.sum()
104
105 Ct = C.T.tocsr()
106 t = p.copy()
107 a = CFG.eigen.alpha
108 for _ in range(CFG.eigen.iters):
109 t = (1 - a) * (Ct @ t) + a * p
110 s = t.sum()
111 if s > 0:
112 t /= s
113
114 hi = t.max() or 1.0
115 trust = {d: float(t[i] / hi) for d, i in index.items()} # max-normalize to [0,1]
116
117 # Trust-weighted stars: a star counts only as much as the starrer is itself trusted,
118 # so sybil star-farms (trust ~0) contribute ~nothing. Turns a gameable popularity count
119 # into a sybil-resistant reputation feature, same philosophy as the vouch graph.
120 stars_trust: dict[str, float] = {}
121 try:
122 for owner, starrer in con.execute("SELECT owner_did, starrer_did FROM stars").fetchall():
123 stars_trust[owner] = stars_trust.get(owner, 0.0) + trust.get(starrer, 0.0)
124 except Exception:
125 pass # stars table absent on a pre-stars DB -> feature stays 0 until schema upgrades
126
127 return EigenResult(trust, index, seeds, adj, stars_trust)
128
129
130def demo() -> None:
131 """Self-check: a sybil cluster vouching for itself must be starved (PRD 1.1)."""
132 from .db import init_db
133
134 con = init_db()
135 con.execute("DELETE FROM vouches; DELETE FROM seeds; DELETE FROM contributors")
136 edges = [("seed", "alice"), ("alice", "bob"), ("bob", "carol"), # trusted chain
137 ("sybil1", "sybil2"), ("sybil2", "sybil1"), ("sybil2", "sybil3"), ("sybil3", "sybil1")]
138 for v, s in edges:
139 con.execute("INSERT INTO contributors (did) VALUES (?) ON CONFLICT DO NOTHING", [v])
140 con.execute("INSERT INTO contributors (did) VALUES (?) ON CONFLICT DO NOTHING", [s])
141 con.execute("INSERT INTO vouches (voucher_did, subject_did) VALUES (?,?)", [v, s])
142 con.execute("INSERT INTO seeds VALUES ('seed')")
143 r = compute(con)
144 trusted = r.trust.get("bob", 0)
145 sybil = max(r.trust.get(f"sybil{i}", 0) for i in (1, 2, 3))
146 print(f"bob={trusted:.3f} sybil_max={sybil:.3f} path(carol)={r.path_from_seed('carol')}")
147 assert trusted > sybil, "sybil cluster should be starved relative to the trusted chain"
148 assert r.path_from_seed("carol") == ["seed", "alice", "bob", "carol"]
149 print("ok")
150
151
152if __name__ == "__main__":
153 demo()