Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 8.4 kB View raw
1use knot_events::{RECONSTRUCT_WINDOW_SECS, Seed}; 2use knot_git::{Layout, ReflogUpdate}; 3use knot_types::{OwnerDid, RepoDid, RepoRkey}; 4 5pub struct RepoContext { 6 pub repo: RepoDid, 7 pub owner: OwnerDid, 8 pub rkey: RepoRkey, 9} 10 11struct Candidate { 12 repo_index: usize, 13 update: ReflogUpdate, 14} 15 16pub fn pipeline_seeds( 17 layout: &Layout, 18 knot: &str, 19 repos: &[RepoContext], 20 now_seconds: i64, 21 capacity: usize, 22) -> Vec<Seed> { 23 let since = now_seconds.saturating_sub(RECONSTRUCT_WINDOW_SECS); 24 let mut candidates: Vec<Candidate> = repos 25 .iter() 26 .enumerate() 27 .flat_map(|(repo_index, context)| match layout.open(&context.repo) { 28 Ok(opened) => opened 29 .reflog_updates_since(since) 30 .into_iter() 31 .map(|update| Candidate { repo_index, update }) 32 .collect::<Vec<_>>(), 33 Err(_) => Vec::new(), 34 }) 35 .collect(); 36 candidates.sort_by(|left, right| { 37 let left_repo = repos[left.repo_index].repo.as_str(); 38 let right_repo = repos[right.repo_index].repo.as_str(); 39 left.update 40 .seconds 41 .cmp(&right.update.seconds) 42 .then_with(|| left_repo.cmp(right_repo)) 43 .then_with(|| left.update.name.as_str().cmp(right.update.name.as_str())) 44 .then_with(|| left.update.new.to_hex().cmp(&right.update.new.to_hex())) 45 }); 46 let start = candidates.len().saturating_sub(capacity.max(1)); 47 candidates 48 .split_off(start) 49 .into_iter() 50 .enumerate() 51 .filter_map(|(order, candidate)| { 52 let context = &repos[candidate.repo_index]; 53 let opened = layout.open(&context.repo).ok()?; 54 let pipeline = knot_postreceive::build_pipeline( 55 &opened, 56 &candidate.update.name, 57 candidate.update.old, 58 candidate.update.new, 59 &context.repo, 60 &context.owner, 61 Some(context.rkey.as_str()), 62 knot, 63 )?; 64 Some(Seed::from_event( 65 candidate.update.seconds, 66 order as u64, 67 &pipeline, 68 )) 69 }) 70 .collect() 71} 72 73#[cfg(test)] 74mod tests { 75 use knot_events::{EventCursor, EventLog}; 76 use knot_git::{EntryKind, Identity, Layout, NewCommit, RefUpdate, StagedAction, StagedChange}; 77 use knot_runtime::{Clock, SystemClock}; 78 use knot_types::{Oid, OwnerDid, RefName, RepoDid, RepoRkey, UnixSeconds}; 79 80 use super::{RepoContext, pipeline_seeds}; 81 82 const WORKFLOW: &str = "engine: nixery.dev/x\nwhen:\n - event: push\n branch: [main]\n"; 83 const EMPTY_TREE: &str = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"; 84 85 fn identity() -> Identity { 86 Identity { 87 name: "nel".to_string(), 88 email: "nel@oyster.cafe".to_string(), 89 time: UnixSeconds::new(1_700_000_000), 90 offset_seconds: 0, 91 } 92 } 93 94 #[test] 95 fn a_missed_pipeline_is_reconstructed_from_the_reflog_after_restart() { 96 let scan = tempfile::tempdir().unwrap(); 97 let layout = Layout::new(scan.path()) 98 .with_default_branch("main") 99 .unwrap(); 100 let did = RepoDid::new("did:plc:squid").unwrap(); 101 let bare = layout.create(&did).unwrap(); 102 103 let tree = bare 104 .write_staged_tree( 105 Oid::from_hex(EMPTY_TREE).unwrap(), 106 &[StagedChange { 107 path: ".tangled/workflows/ci.yml".to_string(), 108 action: StagedAction::Put { 109 content: WORKFLOW.as_bytes().to_vec(), 110 kind: EntryKind::Blob, 111 }, 112 }], 113 ) 114 .unwrap(); 115 let tip = bare 116 .write_commit(&NewCommit { 117 tree, 118 parents: Vec::new(), 119 author: identity(), 120 committer: identity(), 121 message: "add ci".to_string(), 122 extra_headers: Vec::new(), 123 }) 124 .unwrap(); 125 bare.update_ref(&RefUpdate::Create { 126 name: RefName::new("refs/heads/main").unwrap(), 127 new: tip, 128 }) 129 .unwrap(); 130 131 let context = RepoContext { 132 repo: did.clone(), 133 owner: OwnerDid::new("did:web:olaren.dev").unwrap(), 134 rkey: RepoRkey::new("anemone").unwrap(), 135 }; 136 let now_seconds = (SystemClock.now_unix_micros().get() / 1_000_000) as i64; 137 let seeds = pipeline_seeds(&layout, "knot.test", &[context], now_seconds, 32); 138 assert_eq!( 139 seeds.len(), 140 1, 141 "push that compiled workflow yields exactly one reconstructed pipeline" 142 ); 143 144 let log = EventLog::new(SystemClock, 32); 145 log.seed(seeds); 146 let replayed = log.replay(EventCursor::START, 32); 147 assert_eq!(replayed.len(), 1); 148 let event = &replayed[0]; 149 assert_eq!(event.nsid, "sh.tangled.pipeline"); 150 let payload = &event.payload; 151 assert_eq!(payload["$type"], "sh.tangled.pipeline"); 152 assert_eq!(payload["workflows"][0]["name"], "ci.yml"); 153 assert_eq!(payload["triggerMetadata"]["kind"], "push"); 154 assert_eq!(payload["triggerMetadata"]["repo"]["knot"], "knot.test"); 155 assert_eq!(payload["triggerMetadata"]["repo"]["repoDid"], did.as_str()); 156 assert_eq!(payload["triggerMetadata"]["push"]["ref"], "refs/heads/main"); 157 assert_eq!( 158 payload["triggerMetadata"]["push"]["newSha"], 159 tip.to_hex().to_string() 160 ); 161 } 162 163 #[test] 164 fn pipeline_seeds_are_bounded_by_the_ring_capacity() { 165 let scan = tempfile::tempdir().unwrap(); 166 let layout = Layout::new(scan.path()) 167 .with_default_branch("main") 168 .unwrap(); 169 let did = RepoDid::new("did:plc:limpet").unwrap(); 170 let bare = layout.create(&did).unwrap(); 171 let main = RefName::new("refs/heads/main").unwrap(); 172 173 let mut prev: Option<Oid> = None; 174 (0u8..4).for_each(|index| { 175 let tree = bare 176 .write_staged_tree( 177 Oid::from_hex(EMPTY_TREE).unwrap(), 178 &[ 179 StagedChange { 180 path: ".tangled/workflows/ci.yml".to_string(), 181 action: StagedAction::Put { 182 content: WORKFLOW.as_bytes().to_vec(), 183 kind: EntryKind::Blob, 184 }, 185 }, 186 StagedChange { 187 path: format!("file{index}.txt"), 188 action: StagedAction::Put { 189 content: vec![index], 190 kind: EntryKind::Blob, 191 }, 192 }, 193 ], 194 ) 195 .unwrap(); 196 let tip = bare 197 .write_commit(&NewCommit { 198 tree, 199 parents: prev.iter().copied().collect(), 200 author: identity(), 201 committer: identity(), 202 message: format!("commit {index}"), 203 extra_headers: Vec::new(), 204 }) 205 .unwrap(); 206 let update = match prev { 207 None => RefUpdate::Create { 208 name: main.clone(), 209 new: tip, 210 }, 211 Some(old) => RefUpdate::Update { 212 name: main.clone(), 213 old, 214 new: tip, 215 }, 216 }; 217 bare.update_ref(&update).unwrap(); 218 prev = Some(tip); 219 }); 220 221 let context = RepoContext { 222 repo: did.clone(), 223 owner: OwnerDid::new("did:web:olaren.dev").unwrap(), 224 rkey: RepoRkey::new("anemone").unwrap(), 225 }; 226 let now_seconds = (SystemClock.now_unix_micros().get() / 1_000_000) as i64; 227 let seeds = pipeline_seeds(&layout, "knot.test", &[context], now_seconds, 2); 228 assert_eq!( 229 seeds.len(), 230 2, 231 "four reflogged workflow pushes are truncated to ring capacity" 232 ); 233 } 234}