Monorepo for Tangled
tangled.org
1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use bobbin_edge_index::EdgeStore;
5use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static};
6use bobbin_types::knot_acl::{self, KnotHostKey};
7use jacquard_common::DefaultStr;
8use jacquard_common::types::did::Did;
9use serde::Deserialize;
10
11use crate::registry::KnotRegistry;
12
13const REPO_COLLABORATOR_KIND: &str = "sh.tangled.repo.collaborator";
14
15#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
16pub struct Cursor(pub i64);
17
18impl Cursor {
19 fn micros(self) -> u64 {
20 self.0.max(0) as u64 / 1000
21 }
22}
23
24#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize)]
25#[serde(rename_all = "lowercase")]
26pub enum AclOp {
27 Add,
28 Remove,
29}
30
31#[derive(Clone, Eq, Hash, PartialEq)]
32enum DedupKey {
33 Member(Did<DefaultStr>),
34 Collaborator(Did<DefaultStr>, Did<DefaultStr>),
35}
36
37struct SeenState {
38 cursor: Cursor,
39 present: bool,
40}
41
42pub struct Roster {
43 store: Arc<EdgeStore>,
44 knot: Did<DefaultStr>,
45 registry: Arc<KnotRegistry>,
46 host: KnotHostKey,
47 seen: HashMap<DedupKey, SeenState>,
48}
49
50impl Roster {
51 pub fn new(
52 store: Arc<EdgeStore>,
53 knot: Did<DefaultStr>,
54 registry: Arc<KnotRegistry>,
55 host: KnotHostKey,
56 ) -> Self {
57 Self {
58 store,
59 knot,
60 registry,
61 host,
62 seen: HashMap::new(),
63 }
64 }
65
66 pub fn apply_member(&mut self, op: AclOp, subject: Did<DefaultStr>, cursor: Cursor) {
67 if !self.advance(
68 DedupKey::Member(subject.clone()),
69 cursor,
70 matches!(op, AclOp::Add),
71 ) {
72 return;
73 }
74 match op {
75 AclOp::Add => {
76 if let Some((source, edges)) =
77 knot_acl::member_upsert(&self.knot, &subject, cursor.micros())
78 {
79 self.store.upsert_source(&source, edges);
80 }
81 }
82 AclOp::Remove => {
83 if let Some(source) = knot_acl::member_source(&self.knot, &subject) {
84 self.store.remove_source(&source);
85 }
86 }
87 }
88 }
89
90 pub fn apply_collaborator(
91 &mut self,
92 op: AclOp,
93 repo: Did<DefaultStr>,
94 subject: Did<DefaultStr>,
95 cursor: Cursor,
96 ) {
97 if !self.registry.repo_on_host(&self.host, &repo) {
98 return;
99 }
100 if !self.advance(
101 DedupKey::Collaborator(repo.clone(), subject.clone()),
102 cursor,
103 matches!(op, AclOp::Add),
104 ) {
105 return;
106 }
107 match op {
108 AclOp::Add => {
109 if let Some((source, edges)) =
110 knot_acl::collaborator_upsert(&repo, &subject, cursor.micros())
111 {
112 self.store.upsert_source(&source, edges);
113 }
114 }
115 AclOp::Remove => {
116 if let Some(source) = knot_acl::collaborator_source(&repo, &subject) {
117 self.store.remove_source(&source);
118 }
119 }
120 }
121 }
122
123 pub fn max_cursor(&self) -> Cursor {
124 self.seen
125 .values()
126 .map(|state| state.cursor)
127 .max()
128 .unwrap_or(Cursor(0))
129 }
130
131 pub fn reap_members(&mut self, present: &HashSet<Did<DefaultStr>>, horizon: Cursor) {
132 let stale: Vec<Did<DefaultStr>> = self
133 .seen
134 .iter()
135 .filter_map(|(key, state)| match key {
136 DedupKey::Member(subject)
137 if state.present && state.cursor <= horizon && !present.contains(subject) =>
138 {
139 Some(subject.clone())
140 }
141 _ => None,
142 })
143 .collect();
144 stale
145 .into_iter()
146 .for_each(|subject| self.retire_member(subject));
147 }
148
149 pub fn reap_collaborators(
150 &mut self,
151 repo: &Did<DefaultStr>,
152 present: &HashSet<Did<DefaultStr>>,
153 horizon: Cursor,
154 ) {
155 let stale: Vec<Did<DefaultStr>> = self
156 .seen
157 .iter()
158 .filter_map(|(key, state)| match key {
159 DedupKey::Collaborator(edge_repo, subject)
160 if edge_repo == repo
161 && state.present
162 && state.cursor <= horizon
163 && !present.contains(subject) =>
164 {
165 Some(subject.clone())
166 }
167 _ => None,
168 })
169 .collect();
170 stale
171 .into_iter()
172 .for_each(|subject| self.retire_collaborator(repo.clone(), subject));
173 }
174
175 pub fn purge_legacy(&self) {
176 self.registry
177 .drain_legacy_members(&self.host)
178 .iter()
179 .for_each(|source| self.store.remove_source(source));
180
181 self.registry
182 .repos(&self.host)
183 .into_iter()
184 .for_each(|repo| {
185 let key = EdgeKey::new(nsid_static(REPO_COLLABORATOR_KIND), SubjectRef::Did(repo));
186 self.store
187 .sources_for(&key)
188 .into_iter()
189 .filter(|source| knot_acl::decode_knot_owned_source(source).is_none())
190 .for_each(|source| self.store.remove_source(&source));
191 });
192 }
193
194 fn retire_member(&mut self, subject: Did<DefaultStr>) {
195 if let Some(source) = knot_acl::member_source(&self.knot, &subject) {
196 self.store.remove_source(&source);
197 }
198 if let Some(state) = self.seen.get_mut(&DedupKey::Member(subject)) {
199 state.present = false;
200 }
201 }
202
203 fn retire_collaborator(&mut self, repo: Did<DefaultStr>, subject: Did<DefaultStr>) {
204 if let Some(source) = knot_acl::collaborator_source(&repo, &subject) {
205 self.store.remove_source(&source);
206 }
207 if let Some(state) = self.seen.get_mut(&DedupKey::Collaborator(repo, subject)) {
208 state.present = false;
209 }
210 }
211
212 fn advance(&mut self, key: DedupKey, cursor: Cursor, present: bool) -> bool {
213 match self.seen.get(&key) {
214 Some(state) if cursor <= state.cursor => false,
215 _ => {
216 self.seen.insert(key, SeenState { cursor, present });
217 true
218 }
219 }
220 }
221}
222
223#[cfg(test)]
224mod tests {
225 use super::*;
226 use bobbin_runtime::RuntimeHasher;
227 use bobbin_types::edges::Edge;
228 use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static};
229 use jacquard_common::types::string::AtUri;
230
231 fn store() -> Arc<EdgeStore> {
232 Arc::new(EdgeStore::new(RuntimeHasher::default()))
233 }
234
235 fn did(s: &str) -> Did<DefaultStr> {
236 Did::new_owned(s).unwrap()
237 }
238
239 fn at(s: &str) -> AtUri<DefaultStr> {
240 AtUri::new_owned(s).unwrap()
241 }
242
243 fn host() -> KnotHostKey {
244 KnotHostKey::new("oyster.cafe")
245 }
246
247 fn add_legacy_edge(
248 store: &EdgeStore,
249 kind: &'static str,
250 subject: &Did<DefaultStr>,
251 source: &AtUri<DefaultStr>,
252 ) {
253 store.upsert_source(
254 source,
255 vec![Edge {
256 kind: nsid_static(kind),
257 subject: SubjectRef::Did(subject.clone()),
258 source: source.clone(),
259 sort_micros: 0,
260 }],
261 );
262 }
263
264 fn knot() -> Did<DefaultStr> {
265 knot_acl::host_to_knot_did("oyster.cafe").unwrap()
266 }
267
268 fn member_count(store: &EdgeStore, subject: &Did<DefaultStr>) -> u64 {
269 store.count(&EdgeKey::new(
270 nsid_static("sh.tangled.knot.member"),
271 SubjectRef::Did(subject.clone()),
272 ))
273 }
274
275 fn collaborator_count(store: &EdgeStore, repo: &Did<DefaultStr>) -> u64 {
276 store.count(&EdgeKey::new(
277 nsid_static("sh.tangled.repo.collaborator"),
278 SubjectRef::Did(repo.clone()),
279 ))
280 }
281
282 fn empty_registry() -> Arc<KnotRegistry> {
283 Arc::new(KnotRegistry::new())
284 }
285
286 fn member_roster(store: Arc<EdgeStore>) -> Roster {
287 Roster::new(store, knot(), empty_registry(), host())
288 }
289
290 fn subjects(items: &[&Did<DefaultStr>]) -> HashSet<Did<DefaultStr>> {
291 items.iter().map(|d| (*d).clone()).collect()
292 }
293
294 #[test]
295 fn member_add_then_remove() {
296 let store = store();
297 let mut roster = member_roster(store.clone());
298 let m = did("did:plc:boltless");
299 roster.apply_member(AclOp::Add, m.clone(), Cursor(1_000_000));
300 assert_eq!(member_count(&store, &m), 1);
301 roster.apply_member(AclOp::Remove, m.clone(), Cursor(2_000_000));
302 assert_eq!(member_count(&store, &m), 0);
303 }
304
305 #[test]
306 fn stale_add_cannot_resurrect_removed_member() {
307 let store = store();
308 let mut roster = member_roster(store.clone());
309 let m = did("did:plc:boltless");
310 roster.apply_member(AclOp::Remove, m.clone(), Cursor(5));
311 roster.apply_member(AclOp::Add, m.clone(), Cursor(1));
312 assert_eq!(member_count(&store, &m), 0);
313 }
314
315 #[test]
316 fn duplicate_cursor_is_idempotent() {
317 let store = store();
318 let mut roster = member_roster(store.clone());
319 let m = did("did:plc:akshay");
320 roster.apply_member(AclOp::Add, m.clone(), Cursor(10));
321 roster.apply_member(AclOp::Add, m.clone(), Cursor(10));
322 assert_eq!(member_count(&store, &m), 1);
323 }
324
325 #[test]
326 fn collaborator_keyed_on_repo_when_hosted() {
327 let store = store();
328 let repo = did("did:plc:scallop");
329 let registry = empty_registry();
330 registry.observe_repo(&host(), repo.clone());
331 let mut roster = Roster::new(store.clone(), knot(), registry, host());
332 let subject = did("did:plc:olaren");
333 roster.apply_collaborator(AclOp::Add, repo.clone(), subject.clone(), Cursor(7));
334 assert_eq!(collaborator_count(&store, &repo), 1);
335 roster.apply_collaborator(AclOp::Remove, repo.clone(), subject.clone(), Cursor(8));
336 assert_eq!(collaborator_count(&store, &repo), 0);
337 }
338
339 #[test]
340 fn reap_removes_departed_member() {
341 let store = store();
342 let mut roster = member_roster(store.clone());
343 let stayed = did("did:plc:akshay");
344 let left = did("did:plc:boltless");
345 roster.apply_member(AclOp::Add, stayed.clone(), Cursor(10));
346 roster.apply_member(AclOp::Add, left.clone(), Cursor(20));
347
348 let horizon = roster.max_cursor();
349 roster.reap_members(&subjects(&[&stayed]), horizon);
350
351 assert_eq!(member_count(&store, &stayed), 1);
352 assert_eq!(
353 member_count(&store, &left),
354 0,
355 "a member absent from the authoritative snapshot is reaped"
356 );
357 }
358
359 #[test]
360 fn reap_skips_member_added_after_horizon() {
361 let store = store();
362 let mut roster = member_roster(store.clone());
363 let m = did("did:plc:boltless");
364 let horizon = roster.max_cursor();
365 roster.apply_member(AclOp::Add, m.clone(), Cursor(100));
366
367 roster.reap_members(&subjects(&[]), horizon);
368
369 assert_eq!(
370 member_count(&store, &m),
371 1,
372 "a member added after the snapshot horizon must survive the reap"
373 );
374 }
375
376 #[test]
377 fn reap_removes_departed_collaborator() {
378 let store = store();
379 let repo = did("did:plc:scallop");
380 let registry = empty_registry();
381 registry.observe_repo(&host(), repo.clone());
382 let mut roster = Roster::new(store.clone(), knot(), registry, host());
383 let left = did("did:plc:olaren");
384 roster.apply_collaborator(AclOp::Add, repo.clone(), left.clone(), Cursor(7));
385
386 let horizon = roster.max_cursor();
387 roster.reap_collaborators(&repo, &subjects(&[]), horizon);
388
389 assert_eq!(collaborator_count(&store, &repo), 0);
390 }
391
392 #[test]
393 fn purge_legacy_strips_pds_collaborator_keeps_knot_owned() {
394 let store = store();
395 let repo = did("did:plc:scallop");
396 let registry = empty_registry();
397 registry.observe_repo(&host(), repo.clone());
398 let mut roster = Roster::new(store.clone(), knot(), registry, host());
399
400 roster.apply_collaborator(AclOp::Add, repo.clone(), did("did:plc:olaren"), Cursor(7));
401 add_legacy_edge(
402 &store,
403 "sh.tangled.repo.collaborator",
404 &repo,
405 &at("at://did:plc:akshay/sh.tangled.repo.collaborator/r1"),
406 );
407 assert_eq!(collaborator_count(&store, &repo), 2);
408
409 roster.purge_legacy();
410 assert_eq!(
411 collaborator_count(&store, &repo),
412 1,
413 "only the knot-owned collaborator survives the purge"
414 );
415 }
416
417 #[test]
418 fn purge_legacy_removes_indexed_member_edges() {
419 let store = store();
420 let registry = empty_registry();
421 let roster = Roster::new(store.clone(), knot(), registry.clone(), host());
422 let member = did("did:plc:boltless");
423 let source = at("at://did:plc:akshay/sh.tangled.knot.member/r1");
424
425 add_legacy_edge(&store, "sh.tangled.knot.member", &member, &source);
426 registry.note_legacy_member(source.clone(), &host());
427 assert_eq!(member_count(&store, &member), 1);
428
429 roster.purge_legacy();
430 assert_eq!(member_count(&store, &member), 0);
431 }
432
433 #[test]
434 fn collaborator_for_unhosted_repo_is_dropped() {
435 let store = store();
436 let repo = did("did:plc:scallop");
437 let mut roster = Roster::new(store.clone(), knot(), empty_registry(), host());
438 roster.apply_collaborator(AclOp::Add, repo.clone(), did("did:plc:olaren"), Cursor(7));
439 assert_eq!(
440 collaborator_count(&store, &repo),
441 0,
442 "a knot cannot assert collaborators on a repo it does not host"
443 );
444 }
445}