Now let's take a silly one
1use std::collections::{BTreeSet, HashMap, VecDeque};
2use std::fmt::Display;
3use std::net::IpAddr;
4use std::sync::{Arc, Mutex};
5
6use serde::{Serialize, Serializer};
7use serde_json::Value;
8use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};
9
10use knot_runtime::Clock;
11use knot_types::{AccountDid, Oid, OwnerDid, RefName, RepoDid, Tid};
12
13pub const RECONSTRUCT_WINDOW_SECS: i64 = 30 * 24 * 60 * 60;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
16#[serde(transparent)]
17pub struct EventCursor(i64);
18
19impl EventCursor {
20 pub const START: Self = Self(0);
21
22 pub fn new(nanos: i64) -> Self {
23 Self(nanos)
24 }
25
26 pub fn get(self) -> i64 {
27 self.0
28 }
29
30 fn from_unix_micros(micros: u64) -> Self {
31 Self((micros as i64).saturating_mul(1_000))
32 }
33}
34
35pub trait Publish: Serialize {
36 const NSID: &'static str;
37}
38
39#[derive(Debug, Clone, Serialize)]
40pub struct Event {
41 pub rkey: Tid,
42 pub nsid: &'static str,
43 #[serde(rename = "event")]
44 pub payload: Value,
45 pub created: EventCursor,
46}
47
48fn or_empty<T: Display, S: Serializer>(
49 value: &Option<T>,
50 serializer: S,
51) -> Result<S::Ok, S::Error> {
52 match value {
53 Some(inner) => serializer.collect_str(inner),
54 None => serializer.serialize_str(""),
55 }
56}
57
58fn via_display<T: Display, S: Serializer>(value: &T, serializer: S) -> Result<S::Ok, S::Error> {
59 serializer.collect_str(value)
60}
61
62#[derive(Debug, Clone, Serialize)]
63pub struct GitRefUpdate {
64 #[serde(rename = "$type")]
65 record_type: &'static str,
66 #[serde(rename = "committerDid")]
67 committer_did: AccountDid,
68 meta: Option<RefUpdateMeta>,
69 #[serde(rename = "newSha", serialize_with = "or_empty")]
70 new_sha: Option<Oid>,
71 #[serde(rename = "oldSha", serialize_with = "or_empty")]
72 old_sha: Option<Oid>,
73 #[serde(rename = "ownerDid", skip_serializing_if = "Option::is_none")]
74 owner_did: Option<OwnerDid>,
75 #[serde(rename = "ref", serialize_with = "or_empty")]
76 ref_name: Option<RefName>,
77 repo: RepoDid,
78}
79
80impl GitRefUpdate {
81 pub fn new(repo: RepoDid, owner: Option<OwnerDid>, committer: AccountDid) -> Self {
82 Self {
83 record_type: Self::NSID,
84 committer_did: committer,
85 meta: None,
86 new_sha: None,
87 old_sha: None,
88 owner_did: owner,
89 ref_name: None,
90 repo,
91 }
92 }
93
94 pub fn on_ref(mut self, ref_name: RefName, old: Option<Oid>, new: Option<Oid>) -> Self {
95 self.ref_name = Some(ref_name);
96 self.old_sha = old;
97 self.new_sha = new;
98 self
99 }
100
101 pub fn with_meta(mut self, meta: RefUpdateMeta) -> Self {
102 self.meta = Some(meta);
103 self
104 }
105}
106
107impl Publish for GitRefUpdate {
108 const NSID: &'static str = "sh.tangled.git.refUpdate";
109}
110
111#[derive(Debug, Clone, Serialize)]
112pub struct RefUpdateMeta {
113 #[serde(rename = "isDefaultRef")]
114 is_default_ref: bool,
115 #[serde(rename = "commitCount")]
116 commit_count: CommitCountBreakdown,
117 #[serde(rename = "langBreakdown", skip_serializing_if = "Option::is_none")]
118 lang_breakdown: Option<LangBreakdown>,
119}
120
121impl RefUpdateMeta {
122 pub fn new(
123 is_default_ref: bool,
124 by_email: Vec<(String, i64)>,
125 languages: Vec<(String, i64)>,
126 ) -> Self {
127 let by_email = by_email
128 .into_iter()
129 .map(|(email, count)| EmailCommitCount { email, count })
130 .collect::<Vec<_>>();
131 let inputs = languages
132 .into_iter()
133 .map(|(lang, size)| LanguageSize { lang, size })
134 .collect::<Vec<_>>();
135 Self {
136 is_default_ref,
137 commit_count: CommitCountBreakdown {
138 by_email: (!by_email.is_empty()).then_some(by_email),
139 },
140 lang_breakdown: (!inputs.is_empty()).then_some(LangBreakdown {
141 inputs: Some(inputs),
142 }),
143 }
144 }
145}
146
147#[derive(Debug, Clone, Serialize)]
148struct CommitCountBreakdown {
149 #[serde(rename = "byEmail", skip_serializing_if = "Option::is_none")]
150 by_email: Option<Vec<EmailCommitCount>>,
151}
152
153#[derive(Debug, Clone, Serialize)]
154struct EmailCommitCount {
155 email: String,
156 count: i64,
157}
158
159#[derive(Debug, Clone, Serialize)]
160struct LangBreakdown {
161 #[serde(skip_serializing_if = "Option::is_none")]
162 inputs: Option<Vec<LanguageSize>>,
163}
164
165#[derive(Debug, Clone, Serialize)]
166struct LanguageSize {
167 lang: String,
168 size: i64,
169}
170
171#[derive(Debug, Clone, Serialize)]
172pub struct Pipeline {
173 #[serde(rename = "$type")]
174 record_type: &'static str,
175 #[serde(rename = "triggerMetadata")]
176 trigger_metadata: TriggerMetadata,
177 workflows: Vec<WorkflowSpec>,
178}
179
180impl Pipeline {
181 pub fn new(trigger_metadata: TriggerMetadata, workflows: Vec<WorkflowSpec>) -> Self {
182 Self {
183 record_type: Self::NSID,
184 trigger_metadata,
185 workflows,
186 }
187 }
188}
189
190impl Publish for Pipeline {
191 const NSID: &'static str = "sh.tangled.pipeline";
192}
193
194#[derive(Debug, Clone, Serialize)]
195pub struct TriggerMetadata {
196 kind: &'static str,
197 #[serde(skip_serializing_if = "Option::is_none")]
198 push: Option<PushTriggerData>,
199 repo: TriggerRepo,
200}
201
202impl TriggerMetadata {
203 pub fn push(push: PushTriggerData, repo: TriggerRepo) -> Self {
204 Self {
205 kind: "push",
206 push: Some(push),
207 repo,
208 }
209 }
210}
211
212#[derive(Debug, Clone, Serialize)]
213pub struct PushTriggerData {
214 #[serde(rename = "ref", serialize_with = "via_display")]
215 ref_name: RefName,
216 #[serde(rename = "newSha", serialize_with = "via_display")]
217 new_sha: Oid,
218 #[serde(rename = "oldSha", serialize_with = "via_display")]
219 old_sha: Oid,
220}
221
222impl PushTriggerData {
223 pub fn new(ref_name: RefName, old_sha: Oid, new_sha: Oid) -> Self {
224 Self {
225 ref_name,
226 new_sha,
227 old_sha,
228 }
229 }
230}
231
232#[derive(Debug, Clone, Serialize)]
233pub struct TriggerRepo {
234 knot: String,
235 did: OwnerDid,
236 #[serde(rename = "repoDid", skip_serializing_if = "Option::is_none")]
237 repo_did: Option<RepoDid>,
238 #[serde(skip_serializing_if = "Option::is_none")]
239 repo: Option<String>,
240 #[serde(rename = "defaultBranch")]
241 default_branch: String,
242}
243
244impl TriggerRepo {
245 pub fn new(
246 knot: String,
247 did: OwnerDid,
248 repo_did: Option<RepoDid>,
249 repo: Option<String>,
250 default_branch: String,
251 ) -> Self {
252 Self {
253 knot,
254 did,
255 repo_did,
256 repo,
257 default_branch,
258 }
259 }
260}
261
262#[derive(Debug, Clone, Serialize)]
263pub struct WorkflowSpec {
264 name: String,
265 engine: String,
266 clone: CloneSpec,
267 raw: String,
268}
269
270impl WorkflowSpec {
271 pub fn new(name: String, engine: String, clone: CloneSpec, raw: String) -> Self {
272 Self {
273 name,
274 engine,
275 clone,
276 raw,
277 }
278 }
279}
280
281#[derive(Debug, Clone, Serialize)]
282pub struct CloneSpec {
283 skip: bool,
284 depth: i64,
285 submodules: bool,
286}
287
288impl CloneSpec {
289 pub fn new(skip: bool, depth: i64, submodules: bool) -> Self {
290 Self {
291 skip,
292 depth,
293 submodules,
294 }
295 }
296}
297
298#[derive(Debug, Clone, Copy, Serialize)]
299#[serde(rename_all = "lowercase")]
300enum AclOp {
301 Add,
302 Remove,
303}
304
305#[derive(Debug, Clone, Serialize)]
306pub struct KnotMemberUpdate {
307 op: AclOp,
308 subject: AccountDid,
309}
310
311impl KnotMemberUpdate {
312 pub fn added(subject: AccountDid) -> Self {
313 Self {
314 op: AclOp::Add,
315 subject,
316 }
317 }
318
319 pub fn removed(subject: AccountDid) -> Self {
320 Self {
321 op: AclOp::Remove,
322 subject,
323 }
324 }
325}
326
327impl Publish for KnotMemberUpdate {
328 const NSID: &'static str = "sh.tangled.knot.memberUpdate";
329}
330
331#[derive(Debug, Clone, Serialize)]
332pub struct RepoCollaboratorUpdate {
333 op: AclOp,
334 subject: AccountDid,
335 repo: RepoDid,
336}
337
338impl RepoCollaboratorUpdate {
339 pub fn added(subject: AccountDid, repo: RepoDid) -> Self {
340 Self {
341 op: AclOp::Add,
342 subject,
343 repo,
344 }
345 }
346
347 pub fn removed(subject: AccountDid, repo: RepoDid) -> Self {
348 Self {
349 op: AclOp::Remove,
350 subject,
351 repo,
352 }
353 }
354}
355
356impl Publish for RepoCollaboratorUpdate {
357 const NSID: &'static str = "sh.tangled.repo.collaboratorUpdate";
358}
359
360struct Ring {
361 events: VecDeque<Event>,
362 last_micros: u64,
363 pending: BTreeSet<EventCursor>,
364}
365
366struct Inner {
367 capacity: usize,
368 ring: Mutex<Ring>,
369 head: watch::Sender<EventCursor>,
370}
371
372impl Inner {
373 fn lock(&self) -> std::sync::MutexGuard<'_, Ring> {
374 self.ring
375 .lock()
376 .unwrap_or_else(|poisoned| poisoned.into_inner())
377 }
378
379 fn settle(&self, ring: std::sync::MutexGuard<'_, Ring>) {
380 let head = stable_head(&ring);
381 drop(ring);
382 self.head.send_if_modified(|current| {
383 let changed = *current != head;
384 *current = head;
385 changed
386 });
387 }
388}
389
390fn stable_head(ring: &Ring) -> EventCursor {
391 let stable = match ring.pending.iter().next().copied() {
392 Some(horizon) => ring.events.partition_point(|event| event.created < horizon),
393 None => ring.events.len(),
394 };
395 stable
396 .checked_sub(1)
397 .and_then(|index| ring.events.get(index))
398 .map(|event| event.created)
399 .unwrap_or(EventCursor::START)
400}
401
402fn insert_sorted(ring: &mut Ring, event: Event, capacity: usize) {
403 let position = ring
404 .events
405 .partition_point(|existing| existing.created < event.created);
406 ring.events.insert(position, event);
407 while ring.events.len() > capacity {
408 ring.events.pop_front();
409 }
410}
411
412pub struct Seed {
413 seconds: i64,
414 order: u64,
415 nsid: &'static str,
416 payload: Value,
417}
418
419impl Seed {
420 pub fn from_event<P: Publish>(seconds: i64, order: u64, payload: &P) -> Self {
421 Self {
422 seconds,
423 order,
424 nsid: P::NSID,
425 payload: serde_json::to_value(payload).expect("event payload serializes to JSON"),
426 }
427 }
428}
429
430fn end_of_second_micros(seconds: i64) -> u64 {
431 (seconds.max(0) as u64)
432 .saturating_mul(1_000_000)
433 .saturating_add(999_999)
434}
435
436pub struct EventLog<C> {
437 clock: C,
438 inner: Arc<Inner>,
439}
440
441impl<C: Clock> EventLog<C> {
442 pub fn new(clock: C, capacity: usize) -> Self {
443 Self {
444 clock,
445 inner: Arc::new(Inner {
446 capacity: capacity.max(1),
447 ring: Mutex::new(Ring {
448 events: VecDeque::new(),
449 last_micros: 0,
450 pending: BTreeSet::new(),
451 }),
452 head: watch::Sender::new(EventCursor::START),
453 }),
454 }
455 }
456
457 fn next_cursor(&self, ring: &mut Ring) -> (u64, EventCursor) {
458 let micros = self.clock.now_unix_micros().get().max(ring.last_micros + 1);
459 ring.last_micros = micros;
460 (micros, EventCursor::from_unix_micros(micros))
461 }
462
463 pub fn publish<P: Publish>(&self, payload: &P) -> EventCursor {
464 let payload = serde_json::to_value(payload).expect("event payload serializes to JSON");
465 let mut ring = self.inner.lock();
466 let (micros, created) = self.next_cursor(&mut ring);
467 insert_sorted(
468 &mut ring,
469 Event {
470 rkey: Tid::from_time(micros, 0),
471 nsid: P::NSID,
472 payload,
473 created,
474 },
475 self.inner.capacity,
476 );
477 self.inner.settle(ring);
478 created
479 }
480
481 pub fn capacity(&self) -> usize {
482 self.inner.capacity
483 }
484
485 pub fn seed(&self, seeds: Vec<Seed>) {
486 let mut seeds = seeds;
487 seeds.sort_by(|left, right| {
488 left.seconds
489 .cmp(&right.seconds)
490 .then_with(|| left.order.cmp(&right.order))
491 });
492 let mut ring = self.inner.lock();
493 let mut floor = 0u64;
494 seeds.into_iter().for_each(|seed| {
495 let micros = end_of_second_micros(seed.seconds).max(floor + 1);
496 floor = micros;
497 insert_sorted(
498 &mut ring,
499 Event {
500 rkey: Tid::from_time(micros, 0),
501 nsid: seed.nsid,
502 payload: seed.payload,
503 created: EventCursor::from_unix_micros(micros),
504 },
505 self.inner.capacity,
506 );
507 });
508 ring.last_micros = ring.last_micros.max(floor);
509 self.inner.settle(ring);
510 }
511
512 pub fn reserve(&self) -> Reservation {
513 let mut ring = self.inner.lock();
514 let (micros, cursor) = self.next_cursor(&mut ring);
515 ring.pending.insert(cursor);
516 drop(ring);
517 Reservation {
518 inner: Arc::clone(&self.inner),
519 cursor,
520 micros,
521 fulfilled: false,
522 }
523 }
524
525 pub fn replay(&self, after: EventCursor, limit: usize) -> Vec<Event> {
526 let ring = self.inner.lock();
527 let horizon = ring.pending.iter().next().copied();
528 ring.events
529 .iter()
530 .filter(|event| {
531 event.created > after && horizon.is_none_or(|horizon| event.created < horizon)
532 })
533 .take(limit)
534 .cloned()
535 .collect()
536 }
537
538 pub fn subscribe(&self) -> watch::Receiver<EventCursor> {
539 self.inner.head.subscribe()
540 }
541}
542
543pub struct Reservation {
544 inner: Arc<Inner>,
545 cursor: EventCursor,
546 micros: u64,
547 fulfilled: bool,
548}
549
550impl Reservation {
551 pub fn cursor(&self) -> EventCursor {
552 self.cursor
553 }
554
555 pub fn fulfill<P: Publish>(mut self, payload: &P) {
556 let payload = serde_json::to_value(payload).expect("event payload serializes to JSON");
557 let mut ring = self.inner.lock();
558 insert_sorted(
559 &mut ring,
560 Event {
561 rkey: Tid::from_time(self.micros, 0),
562 nsid: P::NSID,
563 payload,
564 created: self.cursor,
565 },
566 self.inner.capacity,
567 );
568 ring.pending.remove(&self.cursor);
569 self.inner.settle(ring);
570 self.fulfilled = true;
571 }
572}
573
574impl Drop for Reservation {
575 fn drop(&mut self) {
576 if self.fulfilled {
577 return;
578 }
579 let mut ring = self.inner.lock();
580 ring.pending.remove(&self.cursor);
581 self.inner.settle(ring);
582 }
583}
584
585pub struct SubscriberGate {
586 global: Arc<Semaphore>,
587 per_peer_max: usize,
588 peers: Mutex<HashMap<IpAddr, usize>>,
589}
590
591impl SubscriberGate {
592 pub fn new(global_max: usize, per_peer_max: usize) -> Self {
593 Self {
594 global: Arc::new(Semaphore::new(global_max.max(1))),
595 per_peer_max: per_peer_max.max(1),
596 peers: Mutex::new(HashMap::new()),
597 }
598 }
599
600 pub fn try_admit(self: &Arc<Self>, peer: IpAddr) -> Option<SubscriberPermit> {
601 let global = Arc::clone(&self.global).try_acquire_owned().ok()?;
602 let mut peers = self
603 .peers
604 .lock()
605 .unwrap_or_else(|poisoned| poisoned.into_inner());
606 let current = peers.get(&peer).copied().unwrap_or(0);
607 if current >= self.per_peer_max {
608 return None;
609 }
610 peers.insert(peer, current + 1);
611 Some(SubscriberPermit {
612 _global: global,
613 gate: Arc::clone(self),
614 peer,
615 })
616 }
617}
618
619pub struct SubscriberPermit {
620 _global: OwnedSemaphorePermit,
621 gate: Arc<SubscriberGate>,
622 peer: IpAddr,
623}
624
625impl Drop for SubscriberPermit {
626 fn drop(&mut self) {
627 let mut peers = self
628 .gate
629 .peers
630 .lock()
631 .unwrap_or_else(|poisoned| poisoned.into_inner());
632 if let Some(count) = peers.get_mut(&self.peer) {
633 *count -= 1;
634 if *count == 0 {
635 peers.remove(&self.peer);
636 }
637 }
638 }
639}
640
641#[cfg(test)]
642mod tests {
643 use super::*;
644
645 use knot_runtime::{ManualClock, UnixMicros};
646
647 fn log(capacity: usize) -> EventLog<ManualClock> {
648 EventLog::new(
649 ManualClock::new(UnixMicros::new(1_700_000_000_000_000)),
650 capacity,
651 )
652 }
653
654 fn update() -> GitRefUpdate {
655 GitRefUpdate::new(
656 RepoDid::new("did:plc:limpet").unwrap(),
657 Some(OwnerDid::new("did:web:olaren.dev").unwrap()),
658 AccountDid::new("did:plc:nel").unwrap(),
659 )
660 }
661
662 #[test]
663 fn a_frozen_clock_still_yields_strictly_increasing_cursors_and_distinct_rkeys() {
664 let log = log(8);
665 let cursors: Vec<_> = (0..3).map(|_| log.publish(&update())).collect();
666 assert!(cursors.windows(2).all(|pair| pair[0] < pair[1]));
667 let events = log.replay(EventCursor::START, 8);
668 let rkeys: std::collections::BTreeSet<_> = events
669 .iter()
670 .map(|event| event.rkey.as_str().to_string())
671 .collect();
672 assert_eq!(rkeys.len(), 3);
673 }
674
675 #[test]
676 fn the_ring_evicts_the_oldest_event_past_capacity() {
677 let log = log(2);
678 let first = log.publish(&update());
679 log.publish(&update());
680 log.publish(&update());
681 let replayed = log.replay(EventCursor::START, 8);
682 assert_eq!(replayed.len(), 2);
683 assert!(replayed.iter().all(|event| event.created > first));
684 }
685
686 #[test]
687 fn replay_honors_the_cursor_and_the_limit() {
688 let log = log(8);
689 let cursors: Vec<_> = (0..4).map(|_| log.publish(&update())).collect();
690 let after_second = log.replay(cursors[1], 8);
691 assert_eq!(
692 after_second
693 .iter()
694 .map(|event| event.created)
695 .collect::<Vec<_>>(),
696 cursors[2..].to_vec()
697 );
698 assert_eq!(log.replay(EventCursor::START, 2).len(), 2);
699 assert!(log.replay(cursors[3], 8).is_empty());
700 }
701
702 #[test]
703 fn seeded_events_reach_a_stale_cursor_consumer_after_restart() {
704 let log = log(8);
705 log.seed(vec![
706 Seed::from_event(1_000, 0, &update()),
707 Seed::from_event(2_000, 1, &update()),
708 ]);
709 let live = log.publish(&update());
710 let all = log.replay(EventCursor::START, 8);
711 assert_eq!(
712 all.len(),
713 3,
714 "consumer at start of stream sees both reconstructed events and live one"
715 );
716 assert!(
717 all[0].created < all[1].created && all[1].created < all[2].created,
718 "seeded events keep their derived order"
719 );
720 assert_eq!(
721 all[2].created, live,
722 "post-restart live event sorts after every reconstructed one"
723 );
724 assert!(
725 all[0].created > EventCursor::START,
726 "reconstructed event carries wall-second of push it replays"
727 );
728 assert_eq!(all[0].nsid, "sh.tangled.git.refUpdate");
729 let after_first = log.replay(all[0].created, 8);
730 assert_eq!(
731 after_first.len(),
732 2,
733 "consumer whose durable cursor sits at first reconstructed event receives the rest"
734 );
735 assert_eq!(after_first[0].created, all[1].created);
736 }
737
738 #[test]
739 fn a_seed_for_an_old_second_is_not_redelivered_to_a_caught_up_consumer() {
740 let log = log(8);
741 let live = log.publish(&update());
742 let drained = log.replay(EventCursor::START, 8);
743 assert_eq!(drained.len(), 1);
744 let cursor = drained[0].created;
745 assert_eq!(cursor, live);
746 log.seed(vec![Seed::from_event(1_000, 0, &update())]);
747 assert!(
748 log.replay(cursor, 8).is_empty(),
749 "seed whose push second predates caught-up consumer's cursor is not redelivered, \
750 so restart never replays whole window to consumer that already drained it"
751 );
752 assert_eq!(
753 log.replay(EventCursor::START, 8).len(),
754 2,
755 "consumer still at start of stream recovers it"
756 );
757 }
758
759 #[test]
760 fn a_boundary_second_seed_reaches_a_drained_consumer() {
761 let log = log(8);
762 let live = log.publish(&update());
763 let cursor = log.replay(EventCursor::START, 8)[0].created;
764 assert_eq!(cursor, live);
765 log.seed(vec![Seed::from_event(1_700_000_000, 0, &update())]);
766 let after = log.replay(cursor, 8);
767 assert_eq!(
768 after.len(),
769 1,
770 "seed in same wall-second as consumer's cursor lands at end of that \
771 second, above cursor, so genuinely missed pipeline near restart still arrives"
772 );
773 assert!(after[0].created > cursor);
774 }
775
776 #[test]
777 fn a_live_event_sorts_after_a_seed_in_the_same_wall_second() {
778 let log = log(8);
779 log.seed(vec![Seed::from_event(1_700_000_000, 0, &update())]);
780 let live = log.publish(&update());
781 let all = log.replay(EventCursor::START, 8);
782 assert_eq!(all.len(), 2);
783 assert!(
784 all[0].created < live,
785 "seeding bumps cursor floor so live event never sorts below same-second seed"
786 );
787 assert_eq!(
788 all[1].created, live,
789 "post-seed live event sorts after seed even when clock reads earlier micro"
790 );
791 }
792
793 #[test]
794 fn a_subscriber_observes_the_head_advance() {
795 let log = log(8);
796 let mut head = log.subscribe();
797 assert_eq!(*head.borrow_and_update(), EventCursor::START);
798 let created = log.publish(&update());
799 assert!(head.has_changed().unwrap());
800 assert_eq!(*head.borrow_and_update(), created);
801 }
802
803 #[test]
804 fn the_wire_event_matches_the_eventstream_shape() {
805 let log = log(8);
806 log.publish(&update());
807 let event = log.replay(EventCursor::START, 1).remove(0);
808 let wire = serde_json::to_value(&event).unwrap();
809 assert_eq!(wire["nsid"], "sh.tangled.git.refUpdate");
810 assert_eq!(wire["created"], serde_json::json!(event.created.get()));
811 assert_eq!(event.created.get() % 1_000, 0);
812 assert_eq!(wire["rkey"].as_str().unwrap().len(), 13);
813 let payload = &wire["event"];
814 assert_eq!(payload["$type"], "sh.tangled.git.refUpdate");
815 assert_eq!(payload["committerDid"], "did:plc:nel");
816 assert_eq!(payload["ownerDid"], "did:web:olaren.dev");
817 assert_eq!(payload["repo"], "did:plc:limpet");
818 assert_eq!(payload["meta"], serde_json::Value::Null);
819 assert_eq!(payload["newSha"], "");
820 assert_eq!(payload["oldSha"], "");
821 assert_eq!(payload["ref"], "");
822 }
823
824 fn peer(last: u8) -> IpAddr {
825 IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, last))
826 }
827
828 #[test]
829 fn the_global_cap_bounds_total_subscribers_across_peers() {
830 let gate = Arc::new(SubscriberGate::new(2, 8));
831 let first = gate
832 .try_admit(peer(1))
833 .expect("first subscriber is admitted");
834 let _second = gate
835 .try_admit(peer(2))
836 .expect("second subscriber is admitted");
837 assert!(
838 gate.try_admit(peer(3)).is_none(),
839 "third subscriber is refused once global cap is reached"
840 );
841 drop(first);
842 assert!(
843 gate.try_admit(peer(3)).is_some(),
844 "freeing global slot admits waiting subscriber"
845 );
846 }
847
848 #[test]
849 fn the_per_peer_cap_refuses_a_single_flooding_peer() {
850 let gate = Arc::new(SubscriberGate::new(16, 2));
851 let _first = gate.try_admit(peer(1)).expect("first socket is admitted");
852 let second = gate.try_admit(peer(1)).expect("second socket is admitted");
853 assert!(
854 gate.try_admit(peer(1)).is_none(),
855 "third socket from same peer is refused at per-peer cap"
856 );
857 assert!(
858 gate.try_admit(peer(2)).is_some(),
859 "different peer keeps its own budget"
860 );
861 drop(second);
862 assert!(
863 gate.try_admit(peer(1)).is_some(),
864 "freed per-peer slot is reusable"
865 );
866 }
867
868 #[test]
869 fn a_drained_peer_leaves_no_entry_behind() {
870 let gate = Arc::new(SubscriberGate::new(16, 2));
871 let permit = gate.try_admit(peer(1)).expect("admitted");
872 drop(permit);
873 assert!(
874 gate.peers
875 .lock()
876 .unwrap_or_else(|poisoned| poisoned.into_inner())
877 .is_empty(),
878 "peer map prunes peer once its last socket closes"
879 );
880 }
881
882 #[test]
883 fn a_ref_update_carries_its_computed_meta_on_the_wire() {
884 let log = log(8);
885 log.publish(&update().with_meta(RefUpdateMeta::new(
886 true,
887 vec![("nel@oyster.cafe".to_string(), 3)],
888 vec![("Rust".to_string(), 1234)],
889 )));
890 let event = log.replay(EventCursor::START, 1).remove(0);
891 let meta = &serde_json::to_value(&event).unwrap()["event"]["meta"];
892 assert_eq!(meta["isDefaultRef"], true);
893 assert_eq!(
894 meta["commitCount"]["byEmail"][0]["email"],
895 "nel@oyster.cafe"
896 );
897 assert_eq!(meta["commitCount"]["byEmail"][0]["count"], 3);
898 assert_eq!(meta["langBreakdown"]["inputs"][0]["lang"], "Rust");
899 assert_eq!(meta["langBreakdown"]["inputs"][0]["size"], 1234);
900 }
901
902 #[test]
903 fn an_empty_breakdown_omits_the_optional_meta_arrays() {
904 let log = log(8);
905 log.publish(&update().with_meta(RefUpdateMeta::new(false, Vec::new(), Vec::new())));
906 let event = log.replay(EventCursor::START, 1).remove(0);
907 let meta = &serde_json::to_value(&event).unwrap()["event"]["meta"];
908 assert_eq!(meta["isDefaultRef"], false);
909 assert!(meta["commitCount"].get("byEmail").is_none());
910 assert!(meta.get("langBreakdown").is_none());
911 }
912
913 #[test]
914 fn a_pipeline_event_matches_the_lexicon_shape() {
915 let log = log(8);
916 let trigger = TriggerMetadata::push(
917 PushTriggerData::new(
918 RefName::new("refs/heads/main").unwrap(),
919 Oid::null(),
920 Oid::from_hex("3333333333333333333333333333333333333333").unwrap(),
921 ),
922 TriggerRepo::new(
923 "knot.test".to_string(),
924 OwnerDid::new("did:web:olaren.dev").unwrap(),
925 Some(RepoDid::new("did:plc:limpet").unwrap()),
926 Some("anemone".to_string()),
927 "main".to_string(),
928 ),
929 );
930 let workflows = vec![WorkflowSpec::new(
931 "ci.yml".to_string(),
932 "nixery.dev/x".to_string(),
933 CloneSpec::new(false, 1, true),
934 "engine: nixery.dev/x\n".to_string(),
935 )];
936 log.publish(&Pipeline::new(trigger, workflows));
937 let event = log.replay(EventCursor::START, 1).remove(0);
938 let wire = serde_json::to_value(&event).unwrap();
939 assert_eq!(wire["nsid"], "sh.tangled.pipeline");
940 let payload = &wire["event"];
941 assert_eq!(payload["$type"], "sh.tangled.pipeline");
942 let meta = &payload["triggerMetadata"];
943 assert_eq!(meta["kind"], "push");
944 assert_eq!(meta["push"]["ref"], "refs/heads/main");
945 assert_eq!(meta["push"]["oldSha"], "0".repeat(40));
946 assert_eq!(meta["push"]["newSha"], "3".repeat(40));
947 assert_eq!(meta["repo"]["knot"], "knot.test");
948 assert_eq!(meta["repo"]["did"], "did:web:olaren.dev");
949 assert_eq!(meta["repo"]["repoDid"], "did:plc:limpet");
950 assert_eq!(meta["repo"]["repo"], "anemone");
951 assert_eq!(meta["repo"]["defaultBranch"], "main");
952 let workflow = &payload["workflows"][0];
953 assert_eq!(workflow["name"], "ci.yml");
954 assert_eq!(workflow["engine"], "nixery.dev/x");
955 assert_eq!(workflow["clone"]["depth"], 1);
956 assert_eq!(workflow["clone"]["submodules"], true);
957 assert_eq!(workflow["clone"]["skip"], false);
958 }
959
960 #[test]
961 fn a_member_update_matches_the_eventstream_shape() {
962 let log = log(8);
963 log.publish(&KnotMemberUpdate::added(
964 AccountDid::new("did:plc:nel").unwrap(),
965 ));
966 log.publish(&KnotMemberUpdate::removed(
967 AccountDid::new("did:plc:olaren").unwrap(),
968 ));
969 let events = log.replay(EventCursor::START, 8);
970 let added = serde_json::to_value(&events[0]).unwrap();
971 assert_eq!(added["nsid"], "sh.tangled.knot.memberUpdate");
972 assert_eq!(added["event"]["op"], "add");
973 assert_eq!(added["event"]["subject"], "did:plc:nel");
974 assert!(added["event"].get("$type").is_none());
975 let removed = serde_json::to_value(&events[1]).unwrap();
976 assert_eq!(removed["event"]["op"], "remove");
977 assert_eq!(removed["event"]["subject"], "did:plc:olaren");
978 }
979
980 #[test]
981 fn a_collaborator_update_matches_the_eventstream_shape() {
982 let log = log(8);
983 log.publish(&RepoCollaboratorUpdate::added(
984 AccountDid::new("did:plc:nel").unwrap(),
985 RepoDid::new("did:plc:limpet").unwrap(),
986 ));
987 log.publish(&RepoCollaboratorUpdate::removed(
988 AccountDid::new("did:plc:nel").unwrap(),
989 RepoDid::new("did:plc:limpet").unwrap(),
990 ));
991 let events = log.replay(EventCursor::START, 8);
992 let added = serde_json::to_value(&events[0]).unwrap();
993 assert_eq!(added["nsid"], "sh.tangled.repo.collaboratorUpdate");
994 assert_eq!(added["event"]["op"], "add");
995 assert_eq!(added["event"]["subject"], "did:plc:nel");
996 assert_eq!(added["event"]["repo"], "did:plc:limpet");
997 assert!(added["event"].get("$type").is_none());
998 let removed = serde_json::to_value(&events[1]).unwrap();
999 assert_eq!(removed["event"]["op"], "remove");
1000 assert_eq!(removed["event"]["repo"], "did:plc:limpet");
1001 }
1002
1003 fn cursors(log: &EventLog<ManualClock>) -> Vec<EventCursor> {
1004 log.replay(EventCursor::START, 64)
1005 .iter()
1006 .map(|event| event.created)
1007 .collect()
1008 }
1009
1010 #[test]
1011 fn a_reservation_holds_back_later_events_until_it_is_fulfilled() {
1012 let log = log(8);
1013 let early = log.publish(&update());
1014 let reservation = log.reserve();
1015 let later = log.publish(&update());
1016 assert!(reservation.cursor() > early && reservation.cursor() < later);
1017 assert_eq!(
1018 cursors(&log),
1019 vec![early],
1020 "event published after reservation waits behind it"
1021 );
1022 let mid = reservation.cursor();
1023 reservation.fulfill(&update());
1024 assert_eq!(
1025 cursors(&log),
1026 vec![early, mid, later],
1027 "fulfilling reservation releases it and event queued behind it, in cursor order"
1028 );
1029 }
1030
1031 #[test]
1032 fn out_of_order_fulfillment_still_replays_in_cursor_order() {
1033 let log = log(8);
1034 let first = log.reserve();
1035 let second = log.reserve();
1036 let (c1, c2) = (first.cursor(), second.cursor());
1037 assert!(c1 < c2);
1038 second.fulfill(&update());
1039 assert!(
1040 cursors(&log).is_empty(),
1041 "later reservation stays hidden while earlier one is outstanding"
1042 );
1043 first.fulfill(&update());
1044 assert_eq!(
1045 cursors(&log),
1046 vec![c1, c2],
1047 "both surface in cursor order regardless of fulfillment order"
1048 );
1049 }
1050
1051 #[test]
1052 fn a_dropped_reservation_unblocks_the_horizon_without_an_event() {
1053 let log = log(8);
1054 let reservation = log.reserve();
1055 let later = log.publish(&update());
1056 assert!(
1057 cursors(&log).is_empty(),
1058 "later event waits behind unfulfilled reservation"
1059 );
1060 drop(reservation);
1061 assert_eq!(
1062 cursors(&log),
1063 vec![later],
1064 "dropping reservation surfaces queued event and leaves no gap"
1065 );
1066 }
1067
1068 #[test]
1069 fn the_head_holds_at_the_last_stable_event_until_a_reservation_is_fulfilled() {
1070 let log = log(8);
1071 let mut head = log.subscribe();
1072 let early = log.publish(&update());
1073 assert_eq!(*head.borrow_and_update(), early);
1074 let reservation = log.reserve();
1075 let later = log.publish(&update());
1076 assert_eq!(
1077 *head.borrow_and_update(),
1078 early,
1079 "head holds while lower-cursor reservation is pending"
1080 );
1081 reservation.fulfill(&update());
1082 assert_eq!(
1083 *head.borrow_and_update(),
1084 later,
1085 "fulfilling reservation advances head past released events"
1086 );
1087 }
1088
1089 #[test]
1090 fn an_anonymous_owner_is_omitted_from_the_wire() {
1091 let log = log(8);
1092 log.publish(&GitRefUpdate::new(
1093 RepoDid::new("did:plc:limpet").unwrap(),
1094 None,
1095 AccountDid::new("did:plc:nel").unwrap(),
1096 ));
1097 let event = log.replay(EventCursor::START, 1).remove(0);
1098 let wire = serde_json::to_value(&event).unwrap();
1099 assert!(wire["event"].get("ownerDid").is_none());
1100 }
1101}