Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 33 kB View raw
1use std::collections::{BTreeSet, HashMap, VecDeque}; 2use std::fmt::Display; 3use std::net::IpAddr; 4use std::sync::{Arc, Mutex}; 5 6use serde::{Serialize, Serializer}; 7use serde_json::Value; 8use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch}; 9 10use knot_runtime::Clock; 11use knot_types::{AccountDid, Oid, OwnerDid, RefName, RepoDid, Tid}; 12 13pub const RECONSTRUCT_WINDOW_SECS: i64 = 30 * 24 * 60 * 60; 14 15#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)] 16#[serde(transparent)] 17pub struct EventCursor(i64); 18 19impl EventCursor { 20 pub const START: Self = Self(0); 21 22 pub fn new(nanos: i64) -> Self { 23 Self(nanos) 24 } 25 26 pub fn get(self) -> i64 { 27 self.0 28 } 29 30 fn from_unix_micros(micros: u64) -> Self { 31 Self((micros as i64).saturating_mul(1_000)) 32 } 33} 34 35pub trait Publish: Serialize { 36 const NSID: &'static str; 37} 38 39#[derive(Debug, Clone, Serialize)] 40pub struct Event { 41 pub rkey: Tid, 42 pub nsid: &'static str, 43 #[serde(rename = "event")] 44 pub payload: Value, 45 pub created: EventCursor, 46} 47 48fn or_empty<T: Display, S: Serializer>( 49 value: &Option<T>, 50 serializer: S, 51) -> Result<S::Ok, S::Error> { 52 match value { 53 Some(inner) => serializer.collect_str(inner), 54 None => serializer.serialize_str(""), 55 } 56} 57 58fn via_display<T: Display, S: Serializer>(value: &T, serializer: S) -> Result<S::Ok, S::Error> { 59 serializer.collect_str(value) 60} 61 62#[derive(Debug, Clone, Serialize)] 63pub struct GitRefUpdate { 64 #[serde(rename = "$type")] 65 record_type: &'static str, 66 #[serde(rename = "committerDid")] 67 committer_did: AccountDid, 68 meta: Option<RefUpdateMeta>, 69 #[serde(rename = "newSha", serialize_with = "or_empty")] 70 new_sha: Option<Oid>, 71 #[serde(rename = "oldSha", serialize_with = "or_empty")] 72 old_sha: Option<Oid>, 73 #[serde(rename = "ownerDid", skip_serializing_if = "Option::is_none")] 74 owner_did: Option<OwnerDid>, 75 #[serde(rename = "ref", serialize_with = "or_empty")] 76 ref_name: Option<RefName>, 77 repo: RepoDid, 78} 79 80impl GitRefUpdate { 81 pub fn new(repo: RepoDid, owner: Option<OwnerDid>, committer: AccountDid) -> Self { 82 Self { 83 record_type: Self::NSID, 84 committer_did: committer, 85 meta: None, 86 new_sha: None, 87 old_sha: None, 88 owner_did: owner, 89 ref_name: None, 90 repo, 91 } 92 } 93 94 pub fn on_ref(mut self, ref_name: RefName, old: Option<Oid>, new: Option<Oid>) -> Self { 95 self.ref_name = Some(ref_name); 96 self.old_sha = old; 97 self.new_sha = new; 98 self 99 } 100 101 pub fn with_meta(mut self, meta: RefUpdateMeta) -> Self { 102 self.meta = Some(meta); 103 self 104 } 105} 106 107impl Publish for GitRefUpdate { 108 const NSID: &'static str = "sh.tangled.git.refUpdate"; 109} 110 111#[derive(Debug, Clone, Serialize)] 112pub struct RefUpdateMeta { 113 #[serde(rename = "isDefaultRef")] 114 is_default_ref: bool, 115 #[serde(rename = "commitCount")] 116 commit_count: CommitCountBreakdown, 117 #[serde(rename = "langBreakdown", skip_serializing_if = "Option::is_none")] 118 lang_breakdown: Option<LangBreakdown>, 119} 120 121impl RefUpdateMeta { 122 pub fn new( 123 is_default_ref: bool, 124 by_email: Vec<(String, i64)>, 125 languages: Vec<(String, i64)>, 126 ) -> Self { 127 let by_email = by_email 128 .into_iter() 129 .map(|(email, count)| EmailCommitCount { email, count }) 130 .collect::<Vec<_>>(); 131 let inputs = languages 132 .into_iter() 133 .map(|(lang, size)| LanguageSize { lang, size }) 134 .collect::<Vec<_>>(); 135 Self { 136 is_default_ref, 137 commit_count: CommitCountBreakdown { 138 by_email: (!by_email.is_empty()).then_some(by_email), 139 }, 140 lang_breakdown: (!inputs.is_empty()).then_some(LangBreakdown { 141 inputs: Some(inputs), 142 }), 143 } 144 } 145} 146 147#[derive(Debug, Clone, Serialize)] 148struct CommitCountBreakdown { 149 #[serde(rename = "byEmail", skip_serializing_if = "Option::is_none")] 150 by_email: Option<Vec<EmailCommitCount>>, 151} 152 153#[derive(Debug, Clone, Serialize)] 154struct EmailCommitCount { 155 email: String, 156 count: i64, 157} 158 159#[derive(Debug, Clone, Serialize)] 160struct LangBreakdown { 161 #[serde(skip_serializing_if = "Option::is_none")] 162 inputs: Option<Vec<LanguageSize>>, 163} 164 165#[derive(Debug, Clone, Serialize)] 166struct LanguageSize { 167 lang: String, 168 size: i64, 169} 170 171#[derive(Debug, Clone, Serialize)] 172pub struct Pipeline { 173 #[serde(rename = "$type")] 174 record_type: &'static str, 175 #[serde(rename = "triggerMetadata")] 176 trigger_metadata: TriggerMetadata, 177 workflows: Vec<WorkflowSpec>, 178} 179 180impl Pipeline { 181 pub fn new(trigger_metadata: TriggerMetadata, workflows: Vec<WorkflowSpec>) -> Self { 182 Self { 183 record_type: Self::NSID, 184 trigger_metadata, 185 workflows, 186 } 187 } 188} 189 190impl Publish for Pipeline { 191 const NSID: &'static str = "sh.tangled.pipeline"; 192} 193 194#[derive(Debug, Clone, Serialize)] 195pub struct TriggerMetadata { 196 kind: &'static str, 197 #[serde(skip_serializing_if = "Option::is_none")] 198 push: Option<PushTriggerData>, 199 repo: TriggerRepo, 200} 201 202impl TriggerMetadata { 203 pub fn push(push: PushTriggerData, repo: TriggerRepo) -> Self { 204 Self { 205 kind: "push", 206 push: Some(push), 207 repo, 208 } 209 } 210} 211 212#[derive(Debug, Clone, Serialize)] 213pub struct PushTriggerData { 214 #[serde(rename = "ref", serialize_with = "via_display")] 215 ref_name: RefName, 216 #[serde(rename = "newSha", serialize_with = "via_display")] 217 new_sha: Oid, 218 #[serde(rename = "oldSha", serialize_with = "via_display")] 219 old_sha: Oid, 220} 221 222impl PushTriggerData { 223 pub fn new(ref_name: RefName, old_sha: Oid, new_sha: Oid) -> Self { 224 Self { 225 ref_name, 226 new_sha, 227 old_sha, 228 } 229 } 230} 231 232#[derive(Debug, Clone, Serialize)] 233pub struct TriggerRepo { 234 knot: String, 235 did: OwnerDid, 236 #[serde(rename = "repoDid", skip_serializing_if = "Option::is_none")] 237 repo_did: Option<RepoDid>, 238 #[serde(skip_serializing_if = "Option::is_none")] 239 repo: Option<String>, 240 #[serde(rename = "defaultBranch")] 241 default_branch: String, 242} 243 244impl TriggerRepo { 245 pub fn new( 246 knot: String, 247 did: OwnerDid, 248 repo_did: Option<RepoDid>, 249 repo: Option<String>, 250 default_branch: String, 251 ) -> Self { 252 Self { 253 knot, 254 did, 255 repo_did, 256 repo, 257 default_branch, 258 } 259 } 260} 261 262#[derive(Debug, Clone, Serialize)] 263pub struct WorkflowSpec { 264 name: String, 265 engine: String, 266 clone: CloneSpec, 267 raw: String, 268} 269 270impl WorkflowSpec { 271 pub fn new(name: String, engine: String, clone: CloneSpec, raw: String) -> Self { 272 Self { 273 name, 274 engine, 275 clone, 276 raw, 277 } 278 } 279} 280 281#[derive(Debug, Clone, Serialize)] 282pub struct CloneSpec { 283 skip: bool, 284 depth: i64, 285 submodules: bool, 286} 287 288impl CloneSpec { 289 pub fn new(skip: bool, depth: i64, submodules: bool) -> Self { 290 Self { 291 skip, 292 depth, 293 submodules, 294 } 295 } 296} 297 298#[derive(Debug, Clone, Copy, Serialize)] 299#[serde(rename_all = "lowercase")] 300enum AclOp { 301 Add, 302 Remove, 303} 304 305#[derive(Debug, Clone, Serialize)] 306pub struct KnotMemberUpdate { 307 op: AclOp, 308 subject: AccountDid, 309} 310 311impl KnotMemberUpdate { 312 pub fn added(subject: AccountDid) -> Self { 313 Self { 314 op: AclOp::Add, 315 subject, 316 } 317 } 318 319 pub fn removed(subject: AccountDid) -> Self { 320 Self { 321 op: AclOp::Remove, 322 subject, 323 } 324 } 325} 326 327impl Publish for KnotMemberUpdate { 328 const NSID: &'static str = "sh.tangled.knot.memberUpdate"; 329} 330 331#[derive(Debug, Clone, Serialize)] 332pub struct RepoCollaboratorUpdate { 333 op: AclOp, 334 subject: AccountDid, 335 repo: RepoDid, 336} 337 338impl RepoCollaboratorUpdate { 339 pub fn added(subject: AccountDid, repo: RepoDid) -> Self { 340 Self { 341 op: AclOp::Add, 342 subject, 343 repo, 344 } 345 } 346 347 pub fn removed(subject: AccountDid, repo: RepoDid) -> Self { 348 Self { 349 op: AclOp::Remove, 350 subject, 351 repo, 352 } 353 } 354} 355 356impl Publish for RepoCollaboratorUpdate { 357 const NSID: &'static str = "sh.tangled.repo.collaboratorUpdate"; 358} 359 360struct Ring { 361 events: VecDeque<Event>, 362 last_micros: u64, 363 pending: BTreeSet<EventCursor>, 364} 365 366struct Inner { 367 capacity: usize, 368 ring: Mutex<Ring>, 369 head: watch::Sender<EventCursor>, 370} 371 372impl Inner { 373 fn lock(&self) -> std::sync::MutexGuard<'_, Ring> { 374 self.ring 375 .lock() 376 .unwrap_or_else(|poisoned| poisoned.into_inner()) 377 } 378 379 fn settle(&self, ring: std::sync::MutexGuard<'_, Ring>) { 380 let head = stable_head(&ring); 381 drop(ring); 382 self.head.send_if_modified(|current| { 383 let changed = *current != head; 384 *current = head; 385 changed 386 }); 387 } 388} 389 390fn stable_head(ring: &Ring) -> EventCursor { 391 let stable = match ring.pending.iter().next().copied() { 392 Some(horizon) => ring.events.partition_point(|event| event.created < horizon), 393 None => ring.events.len(), 394 }; 395 stable 396 .checked_sub(1) 397 .and_then(|index| ring.events.get(index)) 398 .map(|event| event.created) 399 .unwrap_or(EventCursor::START) 400} 401 402fn insert_sorted(ring: &mut Ring, event: Event, capacity: usize) { 403 let position = ring 404 .events 405 .partition_point(|existing| existing.created < event.created); 406 ring.events.insert(position, event); 407 while ring.events.len() > capacity { 408 ring.events.pop_front(); 409 } 410} 411 412pub struct Seed { 413 seconds: i64, 414 order: u64, 415 nsid: &'static str, 416 payload: Value, 417} 418 419impl Seed { 420 pub fn from_event<P: Publish>(seconds: i64, order: u64, payload: &P) -> Self { 421 Self { 422 seconds, 423 order, 424 nsid: P::NSID, 425 payload: serde_json::to_value(payload).expect("event payload serializes to JSON"), 426 } 427 } 428} 429 430fn end_of_second_micros(seconds: i64) -> u64 { 431 (seconds.max(0) as u64) 432 .saturating_mul(1_000_000) 433 .saturating_add(999_999) 434} 435 436pub struct EventLog<C> { 437 clock: C, 438 inner: Arc<Inner>, 439} 440 441impl<C: Clock> EventLog<C> { 442 pub fn new(clock: C, capacity: usize) -> Self { 443 Self { 444 clock, 445 inner: Arc::new(Inner { 446 capacity: capacity.max(1), 447 ring: Mutex::new(Ring { 448 events: VecDeque::new(), 449 last_micros: 0, 450 pending: BTreeSet::new(), 451 }), 452 head: watch::Sender::new(EventCursor::START), 453 }), 454 } 455 } 456 457 fn next_cursor(&self, ring: &mut Ring) -> (u64, EventCursor) { 458 let micros = self.clock.now_unix_micros().get().max(ring.last_micros + 1); 459 ring.last_micros = micros; 460 (micros, EventCursor::from_unix_micros(micros)) 461 } 462 463 pub fn publish<P: Publish>(&self, payload: &P) -> EventCursor { 464 let payload = serde_json::to_value(payload).expect("event payload serializes to JSON"); 465 let mut ring = self.inner.lock(); 466 let (micros, created) = self.next_cursor(&mut ring); 467 insert_sorted( 468 &mut ring, 469 Event { 470 rkey: Tid::from_time(micros, 0), 471 nsid: P::NSID, 472 payload, 473 created, 474 }, 475 self.inner.capacity, 476 ); 477 self.inner.settle(ring); 478 created 479 } 480 481 pub fn capacity(&self) -> usize { 482 self.inner.capacity 483 } 484 485 pub fn seed(&self, seeds: Vec<Seed>) { 486 let mut seeds = seeds; 487 seeds.sort_by(|left, right| { 488 left.seconds 489 .cmp(&right.seconds) 490 .then_with(|| left.order.cmp(&right.order)) 491 }); 492 let mut ring = self.inner.lock(); 493 let mut floor = 0u64; 494 seeds.into_iter().for_each(|seed| { 495 let micros = end_of_second_micros(seed.seconds).max(floor + 1); 496 floor = micros; 497 insert_sorted( 498 &mut ring, 499 Event { 500 rkey: Tid::from_time(micros, 0), 501 nsid: seed.nsid, 502 payload: seed.payload, 503 created: EventCursor::from_unix_micros(micros), 504 }, 505 self.inner.capacity, 506 ); 507 }); 508 ring.last_micros = ring.last_micros.max(floor); 509 self.inner.settle(ring); 510 } 511 512 pub fn reserve(&self) -> Reservation { 513 let mut ring = self.inner.lock(); 514 let (micros, cursor) = self.next_cursor(&mut ring); 515 ring.pending.insert(cursor); 516 drop(ring); 517 Reservation { 518 inner: Arc::clone(&self.inner), 519 cursor, 520 micros, 521 fulfilled: false, 522 } 523 } 524 525 pub fn replay(&self, after: EventCursor, limit: usize) -> Vec<Event> { 526 let ring = self.inner.lock(); 527 let horizon = ring.pending.iter().next().copied(); 528 ring.events 529 .iter() 530 .filter(|event| { 531 event.created > after && horizon.is_none_or(|horizon| event.created < horizon) 532 }) 533 .take(limit) 534 .cloned() 535 .collect() 536 } 537 538 pub fn subscribe(&self) -> watch::Receiver<EventCursor> { 539 self.inner.head.subscribe() 540 } 541} 542 543pub struct Reservation { 544 inner: Arc<Inner>, 545 cursor: EventCursor, 546 micros: u64, 547 fulfilled: bool, 548} 549 550impl Reservation { 551 pub fn cursor(&self) -> EventCursor { 552 self.cursor 553 } 554 555 pub fn fulfill<P: Publish>(mut self, payload: &P) { 556 let payload = serde_json::to_value(payload).expect("event payload serializes to JSON"); 557 let mut ring = self.inner.lock(); 558 insert_sorted( 559 &mut ring, 560 Event { 561 rkey: Tid::from_time(self.micros, 0), 562 nsid: P::NSID, 563 payload, 564 created: self.cursor, 565 }, 566 self.inner.capacity, 567 ); 568 ring.pending.remove(&self.cursor); 569 self.inner.settle(ring); 570 self.fulfilled = true; 571 } 572} 573 574impl Drop for Reservation { 575 fn drop(&mut self) { 576 if self.fulfilled { 577 return; 578 } 579 let mut ring = self.inner.lock(); 580 ring.pending.remove(&self.cursor); 581 self.inner.settle(ring); 582 } 583} 584 585pub struct SubscriberGate { 586 global: Arc<Semaphore>, 587 per_peer_max: usize, 588 peers: Mutex<HashMap<IpAddr, usize>>, 589} 590 591impl SubscriberGate { 592 pub fn new(global_max: usize, per_peer_max: usize) -> Self { 593 Self { 594 global: Arc::new(Semaphore::new(global_max.max(1))), 595 per_peer_max: per_peer_max.max(1), 596 peers: Mutex::new(HashMap::new()), 597 } 598 } 599 600 pub fn try_admit(self: &Arc<Self>, peer: IpAddr) -> Option<SubscriberPermit> { 601 let global = Arc::clone(&self.global).try_acquire_owned().ok()?; 602 let mut peers = self 603 .peers 604 .lock() 605 .unwrap_or_else(|poisoned| poisoned.into_inner()); 606 let current = peers.get(&peer).copied().unwrap_or(0); 607 if current >= self.per_peer_max { 608 return None; 609 } 610 peers.insert(peer, current + 1); 611 Some(SubscriberPermit { 612 _global: global, 613 gate: Arc::clone(self), 614 peer, 615 }) 616 } 617} 618 619pub struct SubscriberPermit { 620 _global: OwnedSemaphorePermit, 621 gate: Arc<SubscriberGate>, 622 peer: IpAddr, 623} 624 625impl Drop for SubscriberPermit { 626 fn drop(&mut self) { 627 let mut peers = self 628 .gate 629 .peers 630 .lock() 631 .unwrap_or_else(|poisoned| poisoned.into_inner()); 632 if let Some(count) = peers.get_mut(&self.peer) { 633 *count -= 1; 634 if *count == 0 { 635 peers.remove(&self.peer); 636 } 637 } 638 } 639} 640 641#[cfg(test)] 642mod tests { 643 use super::*; 644 645 use knot_runtime::{ManualClock, UnixMicros}; 646 647 fn log(capacity: usize) -> EventLog<ManualClock> { 648 EventLog::new( 649 ManualClock::new(UnixMicros::new(1_700_000_000_000_000)), 650 capacity, 651 ) 652 } 653 654 fn update() -> GitRefUpdate { 655 GitRefUpdate::new( 656 RepoDid::new("did:plc:limpet").unwrap(), 657 Some(OwnerDid::new("did:web:olaren.dev").unwrap()), 658 AccountDid::new("did:plc:nel").unwrap(), 659 ) 660 } 661 662 #[test] 663 fn a_frozen_clock_still_yields_strictly_increasing_cursors_and_distinct_rkeys() { 664 let log = log(8); 665 let cursors: Vec<_> = (0..3).map(|_| log.publish(&update())).collect(); 666 assert!(cursors.windows(2).all(|pair| pair[0] < pair[1])); 667 let events = log.replay(EventCursor::START, 8); 668 let rkeys: std::collections::BTreeSet<_> = events 669 .iter() 670 .map(|event| event.rkey.as_str().to_string()) 671 .collect(); 672 assert_eq!(rkeys.len(), 3); 673 } 674 675 #[test] 676 fn the_ring_evicts_the_oldest_event_past_capacity() { 677 let log = log(2); 678 let first = log.publish(&update()); 679 log.publish(&update()); 680 log.publish(&update()); 681 let replayed = log.replay(EventCursor::START, 8); 682 assert_eq!(replayed.len(), 2); 683 assert!(replayed.iter().all(|event| event.created > first)); 684 } 685 686 #[test] 687 fn replay_honors_the_cursor_and_the_limit() { 688 let log = log(8); 689 let cursors: Vec<_> = (0..4).map(|_| log.publish(&update())).collect(); 690 let after_second = log.replay(cursors[1], 8); 691 assert_eq!( 692 after_second 693 .iter() 694 .map(|event| event.created) 695 .collect::<Vec<_>>(), 696 cursors[2..].to_vec() 697 ); 698 assert_eq!(log.replay(EventCursor::START, 2).len(), 2); 699 assert!(log.replay(cursors[3], 8).is_empty()); 700 } 701 702 #[test] 703 fn seeded_events_reach_a_stale_cursor_consumer_after_restart() { 704 let log = log(8); 705 log.seed(vec![ 706 Seed::from_event(1_000, 0, &update()), 707 Seed::from_event(2_000, 1, &update()), 708 ]); 709 let live = log.publish(&update()); 710 let all = log.replay(EventCursor::START, 8); 711 assert_eq!( 712 all.len(), 713 3, 714 "consumer at start of stream sees both reconstructed events and live one" 715 ); 716 assert!( 717 all[0].created < all[1].created && all[1].created < all[2].created, 718 "seeded events keep their derived order" 719 ); 720 assert_eq!( 721 all[2].created, live, 722 "post-restart live event sorts after every reconstructed one" 723 ); 724 assert!( 725 all[0].created > EventCursor::START, 726 "reconstructed event carries wall-second of push it replays" 727 ); 728 assert_eq!(all[0].nsid, "sh.tangled.git.refUpdate"); 729 let after_first = log.replay(all[0].created, 8); 730 assert_eq!( 731 after_first.len(), 732 2, 733 "consumer whose durable cursor sits at first reconstructed event receives the rest" 734 ); 735 assert_eq!(after_first[0].created, all[1].created); 736 } 737 738 #[test] 739 fn a_seed_for_an_old_second_is_not_redelivered_to_a_caught_up_consumer() { 740 let log = log(8); 741 let live = log.publish(&update()); 742 let drained = log.replay(EventCursor::START, 8); 743 assert_eq!(drained.len(), 1); 744 let cursor = drained[0].created; 745 assert_eq!(cursor, live); 746 log.seed(vec![Seed::from_event(1_000, 0, &update())]); 747 assert!( 748 log.replay(cursor, 8).is_empty(), 749 "seed whose push second predates caught-up consumer's cursor is not redelivered, \ 750 so restart never replays whole window to consumer that already drained it" 751 ); 752 assert_eq!( 753 log.replay(EventCursor::START, 8).len(), 754 2, 755 "consumer still at start of stream recovers it" 756 ); 757 } 758 759 #[test] 760 fn a_boundary_second_seed_reaches_a_drained_consumer() { 761 let log = log(8); 762 let live = log.publish(&update()); 763 let cursor = log.replay(EventCursor::START, 8)[0].created; 764 assert_eq!(cursor, live); 765 log.seed(vec![Seed::from_event(1_700_000_000, 0, &update())]); 766 let after = log.replay(cursor, 8); 767 assert_eq!( 768 after.len(), 769 1, 770 "seed in same wall-second as consumer's cursor lands at end of that \ 771 second, above cursor, so genuinely missed pipeline near restart still arrives" 772 ); 773 assert!(after[0].created > cursor); 774 } 775 776 #[test] 777 fn a_live_event_sorts_after_a_seed_in_the_same_wall_second() { 778 let log = log(8); 779 log.seed(vec![Seed::from_event(1_700_000_000, 0, &update())]); 780 let live = log.publish(&update()); 781 let all = log.replay(EventCursor::START, 8); 782 assert_eq!(all.len(), 2); 783 assert!( 784 all[0].created < live, 785 "seeding bumps cursor floor so live event never sorts below same-second seed" 786 ); 787 assert_eq!( 788 all[1].created, live, 789 "post-seed live event sorts after seed even when clock reads earlier micro" 790 ); 791 } 792 793 #[test] 794 fn a_subscriber_observes_the_head_advance() { 795 let log = log(8); 796 let mut head = log.subscribe(); 797 assert_eq!(*head.borrow_and_update(), EventCursor::START); 798 let created = log.publish(&update()); 799 assert!(head.has_changed().unwrap()); 800 assert_eq!(*head.borrow_and_update(), created); 801 } 802 803 #[test] 804 fn the_wire_event_matches_the_eventstream_shape() { 805 let log = log(8); 806 log.publish(&update()); 807 let event = log.replay(EventCursor::START, 1).remove(0); 808 let wire = serde_json::to_value(&event).unwrap(); 809 assert_eq!(wire["nsid"], "sh.tangled.git.refUpdate"); 810 assert_eq!(wire["created"], serde_json::json!(event.created.get())); 811 assert_eq!(event.created.get() % 1_000, 0); 812 assert_eq!(wire["rkey"].as_str().unwrap().len(), 13); 813 let payload = &wire["event"]; 814 assert_eq!(payload["$type"], "sh.tangled.git.refUpdate"); 815 assert_eq!(payload["committerDid"], "did:plc:nel"); 816 assert_eq!(payload["ownerDid"], "did:web:olaren.dev"); 817 assert_eq!(payload["repo"], "did:plc:limpet"); 818 assert_eq!(payload["meta"], serde_json::Value::Null); 819 assert_eq!(payload["newSha"], ""); 820 assert_eq!(payload["oldSha"], ""); 821 assert_eq!(payload["ref"], ""); 822 } 823 824 fn peer(last: u8) -> IpAddr { 825 IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, last)) 826 } 827 828 #[test] 829 fn the_global_cap_bounds_total_subscribers_across_peers() { 830 let gate = Arc::new(SubscriberGate::new(2, 8)); 831 let first = gate 832 .try_admit(peer(1)) 833 .expect("first subscriber is admitted"); 834 let _second = gate 835 .try_admit(peer(2)) 836 .expect("second subscriber is admitted"); 837 assert!( 838 gate.try_admit(peer(3)).is_none(), 839 "third subscriber is refused once global cap is reached" 840 ); 841 drop(first); 842 assert!( 843 gate.try_admit(peer(3)).is_some(), 844 "freeing global slot admits waiting subscriber" 845 ); 846 } 847 848 #[test] 849 fn the_per_peer_cap_refuses_a_single_flooding_peer() { 850 let gate = Arc::new(SubscriberGate::new(16, 2)); 851 let _first = gate.try_admit(peer(1)).expect("first socket is admitted"); 852 let second = gate.try_admit(peer(1)).expect("second socket is admitted"); 853 assert!( 854 gate.try_admit(peer(1)).is_none(), 855 "third socket from same peer is refused at per-peer cap" 856 ); 857 assert!( 858 gate.try_admit(peer(2)).is_some(), 859 "different peer keeps its own budget" 860 ); 861 drop(second); 862 assert!( 863 gate.try_admit(peer(1)).is_some(), 864 "freed per-peer slot is reusable" 865 ); 866 } 867 868 #[test] 869 fn a_drained_peer_leaves_no_entry_behind() { 870 let gate = Arc::new(SubscriberGate::new(16, 2)); 871 let permit = gate.try_admit(peer(1)).expect("admitted"); 872 drop(permit); 873 assert!( 874 gate.peers 875 .lock() 876 .unwrap_or_else(|poisoned| poisoned.into_inner()) 877 .is_empty(), 878 "peer map prunes peer once its last socket closes" 879 ); 880 } 881 882 #[test] 883 fn a_ref_update_carries_its_computed_meta_on_the_wire() { 884 let log = log(8); 885 log.publish(&update().with_meta(RefUpdateMeta::new( 886 true, 887 vec![("nel@oyster.cafe".to_string(), 3)], 888 vec![("Rust".to_string(), 1234)], 889 ))); 890 let event = log.replay(EventCursor::START, 1).remove(0); 891 let meta = &serde_json::to_value(&event).unwrap()["event"]["meta"]; 892 assert_eq!(meta["isDefaultRef"], true); 893 assert_eq!( 894 meta["commitCount"]["byEmail"][0]["email"], 895 "nel@oyster.cafe" 896 ); 897 assert_eq!(meta["commitCount"]["byEmail"][0]["count"], 3); 898 assert_eq!(meta["langBreakdown"]["inputs"][0]["lang"], "Rust"); 899 assert_eq!(meta["langBreakdown"]["inputs"][0]["size"], 1234); 900 } 901 902 #[test] 903 fn an_empty_breakdown_omits_the_optional_meta_arrays() { 904 let log = log(8); 905 log.publish(&update().with_meta(RefUpdateMeta::new(false, Vec::new(), Vec::new()))); 906 let event = log.replay(EventCursor::START, 1).remove(0); 907 let meta = &serde_json::to_value(&event).unwrap()["event"]["meta"]; 908 assert_eq!(meta["isDefaultRef"], false); 909 assert!(meta["commitCount"].get("byEmail").is_none()); 910 assert!(meta.get("langBreakdown").is_none()); 911 } 912 913 #[test] 914 fn a_pipeline_event_matches_the_lexicon_shape() { 915 let log = log(8); 916 let trigger = TriggerMetadata::push( 917 PushTriggerData::new( 918 RefName::new("refs/heads/main").unwrap(), 919 Oid::null(), 920 Oid::from_hex("3333333333333333333333333333333333333333").unwrap(), 921 ), 922 TriggerRepo::new( 923 "knot.test".to_string(), 924 OwnerDid::new("did:web:olaren.dev").unwrap(), 925 Some(RepoDid::new("did:plc:limpet").unwrap()), 926 Some("anemone".to_string()), 927 "main".to_string(), 928 ), 929 ); 930 let workflows = vec![WorkflowSpec::new( 931 "ci.yml".to_string(), 932 "nixery.dev/x".to_string(), 933 CloneSpec::new(false, 1, true), 934 "engine: nixery.dev/x\n".to_string(), 935 )]; 936 log.publish(&Pipeline::new(trigger, workflows)); 937 let event = log.replay(EventCursor::START, 1).remove(0); 938 let wire = serde_json::to_value(&event).unwrap(); 939 assert_eq!(wire["nsid"], "sh.tangled.pipeline"); 940 let payload = &wire["event"]; 941 assert_eq!(payload["$type"], "sh.tangled.pipeline"); 942 let meta = &payload["triggerMetadata"]; 943 assert_eq!(meta["kind"], "push"); 944 assert_eq!(meta["push"]["ref"], "refs/heads/main"); 945 assert_eq!(meta["push"]["oldSha"], "0".repeat(40)); 946 assert_eq!(meta["push"]["newSha"], "3".repeat(40)); 947 assert_eq!(meta["repo"]["knot"], "knot.test"); 948 assert_eq!(meta["repo"]["did"], "did:web:olaren.dev"); 949 assert_eq!(meta["repo"]["repoDid"], "did:plc:limpet"); 950 assert_eq!(meta["repo"]["repo"], "anemone"); 951 assert_eq!(meta["repo"]["defaultBranch"], "main"); 952 let workflow = &payload["workflows"][0]; 953 assert_eq!(workflow["name"], "ci.yml"); 954 assert_eq!(workflow["engine"], "nixery.dev/x"); 955 assert_eq!(workflow["clone"]["depth"], 1); 956 assert_eq!(workflow["clone"]["submodules"], true); 957 assert_eq!(workflow["clone"]["skip"], false); 958 } 959 960 #[test] 961 fn a_member_update_matches_the_eventstream_shape() { 962 let log = log(8); 963 log.publish(&KnotMemberUpdate::added( 964 AccountDid::new("did:plc:nel").unwrap(), 965 )); 966 log.publish(&KnotMemberUpdate::removed( 967 AccountDid::new("did:plc:olaren").unwrap(), 968 )); 969 let events = log.replay(EventCursor::START, 8); 970 let added = serde_json::to_value(&events[0]).unwrap(); 971 assert_eq!(added["nsid"], "sh.tangled.knot.memberUpdate"); 972 assert_eq!(added["event"]["op"], "add"); 973 assert_eq!(added["event"]["subject"], "did:plc:nel"); 974 assert!(added["event"].get("$type").is_none()); 975 let removed = serde_json::to_value(&events[1]).unwrap(); 976 assert_eq!(removed["event"]["op"], "remove"); 977 assert_eq!(removed["event"]["subject"], "did:plc:olaren"); 978 } 979 980 #[test] 981 fn a_collaborator_update_matches_the_eventstream_shape() { 982 let log = log(8); 983 log.publish(&RepoCollaboratorUpdate::added( 984 AccountDid::new("did:plc:nel").unwrap(), 985 RepoDid::new("did:plc:limpet").unwrap(), 986 )); 987 log.publish(&RepoCollaboratorUpdate::removed( 988 AccountDid::new("did:plc:nel").unwrap(), 989 RepoDid::new("did:plc:limpet").unwrap(), 990 )); 991 let events = log.replay(EventCursor::START, 8); 992 let added = serde_json::to_value(&events[0]).unwrap(); 993 assert_eq!(added["nsid"], "sh.tangled.repo.collaboratorUpdate"); 994 assert_eq!(added["event"]["op"], "add"); 995 assert_eq!(added["event"]["subject"], "did:plc:nel"); 996 assert_eq!(added["event"]["repo"], "did:plc:limpet"); 997 assert!(added["event"].get("$type").is_none()); 998 let removed = serde_json::to_value(&events[1]).unwrap(); 999 assert_eq!(removed["event"]["op"], "remove"); 1000 assert_eq!(removed["event"]["repo"], "did:plc:limpet"); 1001 } 1002 1003 fn cursors(log: &EventLog<ManualClock>) -> Vec<EventCursor> { 1004 log.replay(EventCursor::START, 64) 1005 .iter() 1006 .map(|event| event.created) 1007 .collect() 1008 } 1009 1010 #[test] 1011 fn a_reservation_holds_back_later_events_until_it_is_fulfilled() { 1012 let log = log(8); 1013 let early = log.publish(&update()); 1014 let reservation = log.reserve(); 1015 let later = log.publish(&update()); 1016 assert!(reservation.cursor() > early && reservation.cursor() < later); 1017 assert_eq!( 1018 cursors(&log), 1019 vec![early], 1020 "event published after reservation waits behind it" 1021 ); 1022 let mid = reservation.cursor(); 1023 reservation.fulfill(&update()); 1024 assert_eq!( 1025 cursors(&log), 1026 vec![early, mid, later], 1027 "fulfilling reservation releases it and event queued behind it, in cursor order" 1028 ); 1029 } 1030 1031 #[test] 1032 fn out_of_order_fulfillment_still_replays_in_cursor_order() { 1033 let log = log(8); 1034 let first = log.reserve(); 1035 let second = log.reserve(); 1036 let (c1, c2) = (first.cursor(), second.cursor()); 1037 assert!(c1 < c2); 1038 second.fulfill(&update()); 1039 assert!( 1040 cursors(&log).is_empty(), 1041 "later reservation stays hidden while earlier one is outstanding" 1042 ); 1043 first.fulfill(&update()); 1044 assert_eq!( 1045 cursors(&log), 1046 vec![c1, c2], 1047 "both surface in cursor order regardless of fulfillment order" 1048 ); 1049 } 1050 1051 #[test] 1052 fn a_dropped_reservation_unblocks_the_horizon_without_an_event() { 1053 let log = log(8); 1054 let reservation = log.reserve(); 1055 let later = log.publish(&update()); 1056 assert!( 1057 cursors(&log).is_empty(), 1058 "later event waits behind unfulfilled reservation" 1059 ); 1060 drop(reservation); 1061 assert_eq!( 1062 cursors(&log), 1063 vec![later], 1064 "dropping reservation surfaces queued event and leaves no gap" 1065 ); 1066 } 1067 1068 #[test] 1069 fn the_head_holds_at_the_last_stable_event_until_a_reservation_is_fulfilled() { 1070 let log = log(8); 1071 let mut head = log.subscribe(); 1072 let early = log.publish(&update()); 1073 assert_eq!(*head.borrow_and_update(), early); 1074 let reservation = log.reserve(); 1075 let later = log.publish(&update()); 1076 assert_eq!( 1077 *head.borrow_and_update(), 1078 early, 1079 "head holds while lower-cursor reservation is pending" 1080 ); 1081 reservation.fulfill(&update()); 1082 assert_eq!( 1083 *head.borrow_and_update(), 1084 later, 1085 "fulfilling reservation advances head past released events" 1086 ); 1087 } 1088 1089 #[test] 1090 fn an_anonymous_owner_is_omitted_from_the_wire() { 1091 let log = log(8); 1092 log.publish(&GitRefUpdate::new( 1093 RepoDid::new("did:plc:limpet").unwrap(), 1094 None, 1095 AccountDid::new("did:plc:nel").unwrap(), 1096 )); 1097 let event = log.replay(EventCursor::START, 1).remove(0); 1098 let wire = serde_json::to_value(&event).unwrap(); 1099 assert!(wire["event"].get("ownerDid").is_none()); 1100 } 1101}