Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at master 14 kB View raw
1use std::collections::{HashMap, HashSet}; 2use std::sync::Arc; 3 4use bobbin_edge_index::EdgeStore; 5use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 6use bobbin_types::knot_acl::{self, KnotHostKey}; 7use jacquard_common::DefaultStr; 8use jacquard_common::types::did::Did; 9use serde::Deserialize; 10 11use crate::registry::KnotRegistry; 12 13const REPO_COLLABORATOR_KIND: &str = "sh.tangled.repo.collaborator"; 14 15#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)] 16pub struct Cursor(pub i64); 17 18impl Cursor { 19 fn micros(self) -> u64 { 20 self.0.max(0) as u64 / 1000 21 } 22} 23 24#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize)] 25#[serde(rename_all = "lowercase")] 26pub enum AclOp { 27 Add, 28 Remove, 29} 30 31#[derive(Clone, Eq, Hash, PartialEq)] 32enum DedupKey { 33 Member(Did<DefaultStr>), 34 Collaborator(Did<DefaultStr>, Did<DefaultStr>), 35} 36 37struct SeenState { 38 cursor: Cursor, 39 present: bool, 40} 41 42pub struct Roster { 43 store: Arc<EdgeStore>, 44 knot: Did<DefaultStr>, 45 registry: Arc<KnotRegistry>, 46 host: KnotHostKey, 47 seen: HashMap<DedupKey, SeenState>, 48} 49 50impl Roster { 51 pub fn new( 52 store: Arc<EdgeStore>, 53 knot: Did<DefaultStr>, 54 registry: Arc<KnotRegistry>, 55 host: KnotHostKey, 56 ) -> Self { 57 Self { 58 store, 59 knot, 60 registry, 61 host, 62 seen: HashMap::new(), 63 } 64 } 65 66 pub fn apply_member(&mut self, op: AclOp, subject: Did<DefaultStr>, cursor: Cursor) { 67 if !self.advance( 68 DedupKey::Member(subject.clone()), 69 cursor, 70 matches!(op, AclOp::Add), 71 ) { 72 return; 73 } 74 match op { 75 AclOp::Add => { 76 if let Some((source, edges)) = 77 knot_acl::member_upsert(&self.knot, &subject, cursor.micros()) 78 { 79 self.store.upsert_source(&source, edges); 80 } 81 } 82 AclOp::Remove => { 83 if let Some(source) = knot_acl::member_source(&self.knot, &subject) { 84 self.store.remove_source(&source); 85 } 86 } 87 } 88 } 89 90 pub fn apply_collaborator( 91 &mut self, 92 op: AclOp, 93 repo: Did<DefaultStr>, 94 subject: Did<DefaultStr>, 95 cursor: Cursor, 96 ) { 97 if !self.registry.repo_on_host(&self.host, &repo) { 98 return; 99 } 100 if !self.advance( 101 DedupKey::Collaborator(repo.clone(), subject.clone()), 102 cursor, 103 matches!(op, AclOp::Add), 104 ) { 105 return; 106 } 107 match op { 108 AclOp::Add => { 109 if let Some((source, edges)) = 110 knot_acl::collaborator_upsert(&repo, &subject, cursor.micros()) 111 { 112 self.store.upsert_source(&source, edges); 113 } 114 } 115 AclOp::Remove => { 116 if let Some(source) = knot_acl::collaborator_source(&repo, &subject) { 117 self.store.remove_source(&source); 118 } 119 } 120 } 121 } 122 123 pub fn max_cursor(&self) -> Cursor { 124 self.seen 125 .values() 126 .map(|state| state.cursor) 127 .max() 128 .unwrap_or(Cursor(0)) 129 } 130 131 pub fn reap_members(&mut self, present: &HashSet<Did<DefaultStr>>, horizon: Cursor) { 132 let stale: Vec<Did<DefaultStr>> = self 133 .seen 134 .iter() 135 .filter_map(|(key, state)| match key { 136 DedupKey::Member(subject) 137 if state.present && state.cursor <= horizon && !present.contains(subject) => 138 { 139 Some(subject.clone()) 140 } 141 _ => None, 142 }) 143 .collect(); 144 stale 145 .into_iter() 146 .for_each(|subject| self.retire_member(subject)); 147 } 148 149 pub fn reap_collaborators( 150 &mut self, 151 repo: &Did<DefaultStr>, 152 present: &HashSet<Did<DefaultStr>>, 153 horizon: Cursor, 154 ) { 155 let stale: Vec<Did<DefaultStr>> = self 156 .seen 157 .iter() 158 .filter_map(|(key, state)| match key { 159 DedupKey::Collaborator(edge_repo, subject) 160 if edge_repo == repo 161 && state.present 162 && state.cursor <= horizon 163 && !present.contains(subject) => 164 { 165 Some(subject.clone()) 166 } 167 _ => None, 168 }) 169 .collect(); 170 stale 171 .into_iter() 172 .for_each(|subject| self.retire_collaborator(repo.clone(), subject)); 173 } 174 175 pub fn purge_legacy(&self) { 176 self.registry 177 .drain_legacy_members(&self.host) 178 .iter() 179 .for_each(|source| self.store.remove_source(source)); 180 181 self.registry 182 .repos(&self.host) 183 .into_iter() 184 .for_each(|repo| { 185 let key = EdgeKey::new(nsid_static(REPO_COLLABORATOR_KIND), SubjectRef::Did(repo)); 186 self.store 187 .sources_for(&key) 188 .into_iter() 189 .filter(|source| knot_acl::decode_knot_owned_source(source).is_none()) 190 .for_each(|source| self.store.remove_source(&source)); 191 }); 192 } 193 194 fn retire_member(&mut self, subject: Did<DefaultStr>) { 195 if let Some(source) = knot_acl::member_source(&self.knot, &subject) { 196 self.store.remove_source(&source); 197 } 198 if let Some(state) = self.seen.get_mut(&DedupKey::Member(subject)) { 199 state.present = false; 200 } 201 } 202 203 fn retire_collaborator(&mut self, repo: Did<DefaultStr>, subject: Did<DefaultStr>) { 204 if let Some(source) = knot_acl::collaborator_source(&repo, &subject) { 205 self.store.remove_source(&source); 206 } 207 if let Some(state) = self.seen.get_mut(&DedupKey::Collaborator(repo, subject)) { 208 state.present = false; 209 } 210 } 211 212 fn advance(&mut self, key: DedupKey, cursor: Cursor, present: bool) -> bool { 213 match self.seen.get(&key) { 214 Some(state) if cursor <= state.cursor => false, 215 _ => { 216 self.seen.insert(key, SeenState { cursor, present }); 217 true 218 } 219 } 220 } 221} 222 223#[cfg(test)] 224mod tests { 225 use super::*; 226 use bobbin_runtime::RuntimeHasher; 227 use bobbin_types::edges::Edge; 228 use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 229 use jacquard_common::types::string::AtUri; 230 231 fn store() -> Arc<EdgeStore> { 232 Arc::new(EdgeStore::new(RuntimeHasher::default())) 233 } 234 235 fn did(s: &str) -> Did<DefaultStr> { 236 Did::new_owned(s).unwrap() 237 } 238 239 fn at(s: &str) -> AtUri<DefaultStr> { 240 AtUri::new_owned(s).unwrap() 241 } 242 243 fn host() -> KnotHostKey { 244 KnotHostKey::new("oyster.cafe") 245 } 246 247 fn add_legacy_edge( 248 store: &EdgeStore, 249 kind: &'static str, 250 subject: &Did<DefaultStr>, 251 source: &AtUri<DefaultStr>, 252 ) { 253 store.upsert_source( 254 source, 255 vec![Edge { 256 kind: nsid_static(kind), 257 subject: SubjectRef::Did(subject.clone()), 258 source: source.clone(), 259 sort_micros: 0, 260 }], 261 ); 262 } 263 264 fn knot() -> Did<DefaultStr> { 265 knot_acl::host_to_knot_did("oyster.cafe").unwrap() 266 } 267 268 fn member_count(store: &EdgeStore, subject: &Did<DefaultStr>) -> u64 { 269 store.count(&EdgeKey::new( 270 nsid_static("sh.tangled.knot.member"), 271 SubjectRef::Did(subject.clone()), 272 )) 273 } 274 275 fn collaborator_count(store: &EdgeStore, repo: &Did<DefaultStr>) -> u64 { 276 store.count(&EdgeKey::new( 277 nsid_static("sh.tangled.repo.collaborator"), 278 SubjectRef::Did(repo.clone()), 279 )) 280 } 281 282 fn empty_registry() -> Arc<KnotRegistry> { 283 Arc::new(KnotRegistry::new()) 284 } 285 286 fn member_roster(store: Arc<EdgeStore>) -> Roster { 287 Roster::new(store, knot(), empty_registry(), host()) 288 } 289 290 fn subjects(items: &[&Did<DefaultStr>]) -> HashSet<Did<DefaultStr>> { 291 items.iter().map(|d| (*d).clone()).collect() 292 } 293 294 #[test] 295 fn member_add_then_remove() { 296 let store = store(); 297 let mut roster = member_roster(store.clone()); 298 let m = did("did:plc:boltless"); 299 roster.apply_member(AclOp::Add, m.clone(), Cursor(1_000_000)); 300 assert_eq!(member_count(&store, &m), 1); 301 roster.apply_member(AclOp::Remove, m.clone(), Cursor(2_000_000)); 302 assert_eq!(member_count(&store, &m), 0); 303 } 304 305 #[test] 306 fn stale_add_cannot_resurrect_removed_member() { 307 let store = store(); 308 let mut roster = member_roster(store.clone()); 309 let m = did("did:plc:boltless"); 310 roster.apply_member(AclOp::Remove, m.clone(), Cursor(5)); 311 roster.apply_member(AclOp::Add, m.clone(), Cursor(1)); 312 assert_eq!(member_count(&store, &m), 0); 313 } 314 315 #[test] 316 fn duplicate_cursor_is_idempotent() { 317 let store = store(); 318 let mut roster = member_roster(store.clone()); 319 let m = did("did:plc:akshay"); 320 roster.apply_member(AclOp::Add, m.clone(), Cursor(10)); 321 roster.apply_member(AclOp::Add, m.clone(), Cursor(10)); 322 assert_eq!(member_count(&store, &m), 1); 323 } 324 325 #[test] 326 fn collaborator_keyed_on_repo_when_hosted() { 327 let store = store(); 328 let repo = did("did:plc:scallop"); 329 let registry = empty_registry(); 330 registry.observe_repo(&host(), repo.clone()); 331 let mut roster = Roster::new(store.clone(), knot(), registry, host()); 332 let subject = did("did:plc:olaren"); 333 roster.apply_collaborator(AclOp::Add, repo.clone(), subject.clone(), Cursor(7)); 334 assert_eq!(collaborator_count(&store, &repo), 1); 335 roster.apply_collaborator(AclOp::Remove, repo.clone(), subject.clone(), Cursor(8)); 336 assert_eq!(collaborator_count(&store, &repo), 0); 337 } 338 339 #[test] 340 fn reap_removes_departed_member() { 341 let store = store(); 342 let mut roster = member_roster(store.clone()); 343 let stayed = did("did:plc:akshay"); 344 let left = did("did:plc:boltless"); 345 roster.apply_member(AclOp::Add, stayed.clone(), Cursor(10)); 346 roster.apply_member(AclOp::Add, left.clone(), Cursor(20)); 347 348 let horizon = roster.max_cursor(); 349 roster.reap_members(&subjects(&[&stayed]), horizon); 350 351 assert_eq!(member_count(&store, &stayed), 1); 352 assert_eq!( 353 member_count(&store, &left), 354 0, 355 "a member absent from the authoritative snapshot is reaped" 356 ); 357 } 358 359 #[test] 360 fn reap_skips_member_added_after_horizon() { 361 let store = store(); 362 let mut roster = member_roster(store.clone()); 363 let m = did("did:plc:boltless"); 364 let horizon = roster.max_cursor(); 365 roster.apply_member(AclOp::Add, m.clone(), Cursor(100)); 366 367 roster.reap_members(&subjects(&[]), horizon); 368 369 assert_eq!( 370 member_count(&store, &m), 371 1, 372 "a member added after the snapshot horizon must survive the reap" 373 ); 374 } 375 376 #[test] 377 fn reap_removes_departed_collaborator() { 378 let store = store(); 379 let repo = did("did:plc:scallop"); 380 let registry = empty_registry(); 381 registry.observe_repo(&host(), repo.clone()); 382 let mut roster = Roster::new(store.clone(), knot(), registry, host()); 383 let left = did("did:plc:olaren"); 384 roster.apply_collaborator(AclOp::Add, repo.clone(), left.clone(), Cursor(7)); 385 386 let horizon = roster.max_cursor(); 387 roster.reap_collaborators(&repo, &subjects(&[]), horizon); 388 389 assert_eq!(collaborator_count(&store, &repo), 0); 390 } 391 392 #[test] 393 fn purge_legacy_strips_pds_collaborator_keeps_knot_owned() { 394 let store = store(); 395 let repo = did("did:plc:scallop"); 396 let registry = empty_registry(); 397 registry.observe_repo(&host(), repo.clone()); 398 let mut roster = Roster::new(store.clone(), knot(), registry, host()); 399 400 roster.apply_collaborator(AclOp::Add, repo.clone(), did("did:plc:olaren"), Cursor(7)); 401 add_legacy_edge( 402 &store, 403 "sh.tangled.repo.collaborator", 404 &repo, 405 &at("at://did:plc:akshay/sh.tangled.repo.collaborator/r1"), 406 ); 407 assert_eq!(collaborator_count(&store, &repo), 2); 408 409 roster.purge_legacy(); 410 assert_eq!( 411 collaborator_count(&store, &repo), 412 1, 413 "only the knot-owned collaborator survives the purge" 414 ); 415 } 416 417 #[test] 418 fn purge_legacy_removes_indexed_member_edges() { 419 let store = store(); 420 let registry = empty_registry(); 421 let roster = Roster::new(store.clone(), knot(), registry.clone(), host()); 422 let member = did("did:plc:boltless"); 423 let source = at("at://did:plc:akshay/sh.tangled.knot.member/r1"); 424 425 add_legacy_edge(&store, "sh.tangled.knot.member", &member, &source); 426 registry.note_legacy_member(source.clone(), &host()); 427 assert_eq!(member_count(&store, &member), 1); 428 429 roster.purge_legacy(); 430 assert_eq!(member_count(&store, &member), 0); 431 } 432 433 #[test] 434 fn collaborator_for_unhosted_repo_is_dropped() { 435 let store = store(); 436 let repo = did("did:plc:scallop"); 437 let mut roster = Roster::new(store.clone(), knot(), empty_registry(), host()); 438 roster.apply_collaborator(AclOp::Add, repo.clone(), did("did:plc:olaren"), Cursor(7)); 439 assert_eq!( 440 collaborator_count(&store, &repo), 441 0, 442 "a knot cannot assert collaborators on a repo it does not host" 443 ); 444 } 445}