Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1use std::sync::atomic::{AtomicU64, Ordering}; 2 3use bobbin_runtime::RuntimeHasher; 4use bobbin_types::ids::RepoIdent; 5use jacquard_common::DefaultStr; 6use jacquard_common::types::did::Did; 7use jacquard_common::types::recordkey::Rkey; 8use scc::HashMap as SccMap; 9 10pub struct WarmingShadowBuffer { 11 pending: SccMap<RepoIdent, u64, RuntimeHasher>, 12 enqueued_total: AtomicU64, 13 drained_via_observe_total: AtomicU64, 14 max_concurrent: AtomicU64, 15 current_concurrent: AtomicU64, 16 distinct_keys_seen: AtomicU64, 17} 18 19#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] 20pub struct WarmingShadowSnapshot { 21 pub enqueued_total: u64, 22 pub drained_via_observe_total: u64, 23 pub max_concurrent: u64, 24 pub residual: u64, 25 pub distinct_keys_seen: u64, 26} 27 28impl WarmingShadowBuffer { 29 pub fn new(hasher: RuntimeHasher) -> Self { 30 Self { 31 pending: SccMap::with_hasher(hasher), 32 enqueued_total: AtomicU64::new(0), 33 drained_via_observe_total: AtomicU64::new(0), 34 max_concurrent: AtomicU64::new(0), 35 current_concurrent: AtomicU64::new(0), 36 distinct_keys_seen: AtomicU64::new(0), 37 } 38 } 39 40 pub async fn note_unresolved(&self, owner: Did<DefaultStr>, rkey: Rkey<DefaultStr>) { 41 let key = RepoIdent::new(owner, rkey); 42 let mut entry = self.pending.entry_async(key).await.or_insert_with(|| { 43 self.distinct_keys_seen.fetch_add(1, Ordering::Relaxed); 44 0 45 }); 46 *entry.get_mut() += 1; 47 self.enqueued_total.fetch_add(1, Ordering::Relaxed); 48 let cur = self.current_concurrent.fetch_add(1, Ordering::Relaxed) + 1; 49 self.max_concurrent.fetch_max(cur, Ordering::Relaxed); 50 } 51 52 pub async fn note_observed(&self, owner: &Did<DefaultStr>, rkey: &Rkey<DefaultStr>) { 53 let key = RepoIdent::new(owner.clone(), rkey.clone()); 54 let Some(mut entry) = self.pending.get_async(&key).await else { 55 return; 56 }; 57 let count = std::mem::replace(entry.get_mut(), 0); 58 if count > 0 { 59 self.drained_via_observe_total 60 .fetch_add(count, Ordering::Relaxed); 61 self.current_concurrent.fetch_sub(count, Ordering::Relaxed); 62 } 63 } 64 65 pub fn snapshot(&self) -> WarmingShadowSnapshot { 66 WarmingShadowSnapshot { 67 enqueued_total: self.enqueued_total.load(Ordering::Relaxed), 68 drained_via_observe_total: self.drained_via_observe_total.load(Ordering::Relaxed), 69 max_concurrent: self.max_concurrent.load(Ordering::Relaxed), 70 residual: self.current_concurrent.load(Ordering::Relaxed), 71 distinct_keys_seen: self.distinct_keys_seen.load(Ordering::Relaxed), 72 } 73 } 74} 75 76#[cfg(test)] 77mod tests { 78 use super::*; 79 use jacquard_common::types::did::Did; 80 use jacquard_common::types::recordkey::Rkey; 81 82 fn d(s: &str) -> Did<DefaultStr> { 83 Did::new_owned(s).unwrap() 84 } 85 86 fn r(s: &str) -> Rkey<DefaultStr> { 87 Rkey::new_owned(s).unwrap() 88 } 89 90 #[tokio::test] 91 async fn enqueue_then_observe_drains_to_zero() { 92 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default()); 93 shadow 94 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz")) 95 .await; 96 shadow 97 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz")) 98 .await; 99 shadow 100 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz")) 101 .await; 102 let s = shadow.snapshot(); 103 assert_eq!(s.enqueued_total, 2); 104 assert_eq!(s.drained_via_observe_total, 2); 105 assert_eq!(s.max_concurrent, 2); 106 assert_eq!(s.residual, 0); 107 assert_eq!(s.distinct_keys_seen, 1); 108 } 109 110 #[tokio::test] 111 async fn distinct_keys_tracked_separately() { 112 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default()); 113 shadow 114 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz")) 115 .await; 116 shadow 117 .note_unresolved(d("did:plc:olaren"), r("abcabcabcabd1")) 118 .await; 119 let s = shadow.snapshot(); 120 assert_eq!(s.enqueued_total, 2); 121 assert_eq!(s.distinct_keys_seen, 2); 122 assert_eq!(s.max_concurrent, 2); 123 assert_eq!(s.residual, 2); 124 } 125 126 #[tokio::test] 127 async fn observe_without_prior_enqueue_is_a_noop() { 128 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default()); 129 shadow 130 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz")) 131 .await; 132 let s = shadow.snapshot(); 133 assert_eq!(s.enqueued_total, 0); 134 assert_eq!(s.drained_via_observe_total, 0); 135 assert_eq!(s.distinct_keys_seen, 0); 136 } 137 138 #[tokio::test] 139 async fn second_observe_after_drain_does_not_double_count() { 140 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default()); 141 shadow 142 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz")) 143 .await; 144 shadow 145 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz")) 146 .await; 147 shadow 148 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz")) 149 .await; 150 let s = shadow.snapshot(); 151 assert_eq!(s.drained_via_observe_total, 1); 152 assert_eq!(s.residual, 0); 153 } 154 155 #[tokio::test] 156 async fn residual_reflects_unobserved_keys() { 157 let shadow = WarmingShadowBuffer::new(RuntimeHasher::default()); 158 shadow 159 .note_unresolved(d("did:plc:nel"), r("abcabcabcabcz")) 160 .await; 161 shadow 162 .note_unresolved(d("did:plc:olaren"), r("abcabcabcabd1")) 163 .await; 164 shadow 165 .note_observed(&d("did:plc:nel"), &r("abcabcabcabcz")) 166 .await; 167 let s = shadow.snapshot(); 168 assert_eq!(s.enqueued_total, 2); 169 assert_eq!(s.drained_via_observe_total, 1); 170 assert_eq!(s.residual, 1); 171 } 172}