Now let's take a silly one
1use knot_events::{RECONSTRUCT_WINDOW_SECS, Seed};
2use knot_git::{Layout, ReflogUpdate};
3use knot_types::{OwnerDid, RepoDid, RepoRkey};
4
5pub struct RepoContext {
6 pub repo: RepoDid,
7 pub owner: OwnerDid,
8 pub rkey: RepoRkey,
9}
10
11struct Candidate {
12 repo_index: usize,
13 update: ReflogUpdate,
14}
15
16pub fn pipeline_seeds(
17 layout: &Layout,
18 knot: &str,
19 repos: &[RepoContext],
20 now_seconds: i64,
21 capacity: usize,
22) -> Vec<Seed> {
23 let since = now_seconds.saturating_sub(RECONSTRUCT_WINDOW_SECS);
24 let mut candidates: Vec<Candidate> = repos
25 .iter()
26 .enumerate()
27 .flat_map(|(repo_index, context)| match layout.open(&context.repo) {
28 Ok(opened) => opened
29 .reflog_updates_since(since)
30 .into_iter()
31 .map(|update| Candidate { repo_index, update })
32 .collect::<Vec<_>>(),
33 Err(_) => Vec::new(),
34 })
35 .collect();
36 candidates.sort_by(|left, right| {
37 let left_repo = repos[left.repo_index].repo.as_str();
38 let right_repo = repos[right.repo_index].repo.as_str();
39 left.update
40 .seconds
41 .cmp(&right.update.seconds)
42 .then_with(|| left_repo.cmp(right_repo))
43 .then_with(|| left.update.name.as_str().cmp(right.update.name.as_str()))
44 .then_with(|| left.update.new.to_hex().cmp(&right.update.new.to_hex()))
45 });
46 let start = candidates.len().saturating_sub(capacity.max(1));
47 candidates
48 .split_off(start)
49 .into_iter()
50 .enumerate()
51 .filter_map(|(order, candidate)| {
52 let context = &repos[candidate.repo_index];
53 let opened = layout.open(&context.repo).ok()?;
54 let pipeline = knot_postreceive::build_pipeline(
55 &opened,
56 &candidate.update.name,
57 candidate.update.old,
58 candidate.update.new,
59 &context.repo,
60 &context.owner,
61 Some(context.rkey.as_str()),
62 knot,
63 )?;
64 Some(Seed::from_event(
65 candidate.update.seconds,
66 order as u64,
67 &pipeline,
68 ))
69 })
70 .collect()
71}
72
#[cfg(test)]
mod tests {
    use knot_events::{EventCursor, EventLog};
    use knot_git::{EntryKind, Identity, Layout, NewCommit, RefUpdate, StagedAction, StagedChange};
    use knot_runtime::{Clock, SystemClock};
    use knot_types::{Oid, OwnerDid, RefName, RepoDid, RepoRkey, UnixSeconds};

    use super::{RepoContext, pipeline_seeds};

    /// Workflow file content committed at `.tangled/workflows/ci.yml` by the fixtures.
    const WORKFLOW: &str = "engine: nixery.dev/x\nwhen:\n - event: push\n branch: [main]\n";
    /// Git's well-known empty tree object id, used as the base for staged tree writes.
    const EMPTY_TREE: &str = "4b825dc642cb6eb9a060e54bf8d69288fbee4904";

    /// Fixed author/committer identity so fixture commits are reproducible.
    fn identity() -> Identity {
        let name = "nel".to_string();
        let email = "nel@oyster.cafe".to_string();
        Identity {
            name,
            email,
            time: UnixSeconds::new(1_700_000_000),
            offset_seconds: 0,
        }
    }

    /// Staged change that writes the CI workflow file.
    fn workflow_change() -> StagedChange {
        StagedChange {
            path: ".tangled/workflows/ci.yml".to_string(),
            action: StagedAction::Put {
                content: WORKFLOW.as_bytes().to_vec(),
                kind: EntryKind::Blob,
            },
        }
    }

    /// The branch ref every fixture pushes to.
    fn main_ref() -> RefName {
        RefName::new("refs/heads/main").unwrap()
    }

    /// Repo context for `did` with the owner and rkey shared by both tests.
    fn context_for(did: &RepoDid) -> RepoContext {
        RepoContext {
            repo: did.clone(),
            owner: OwnerDid::new("did:web:olaren.dev").unwrap(),
            rkey: RepoRkey::new("anemone").unwrap(),
        }
    }

    /// Current wall-clock time in whole seconds.
    fn unix_now() -> i64 {
        (SystemClock.now_unix_micros().get() / 1_000_000) as i64
    }

    #[test]
    fn a_missed_pipeline_is_reconstructed_from_the_reflog_after_restart() {
        let scan = tempfile::tempdir().unwrap();
        let layout = Layout::new(scan.path())
            .with_default_branch("main")
            .unwrap();
        let did = RepoDid::new("did:plc:squid").unwrap();
        let bare = layout.create(&did).unwrap();

        // One commit containing only the workflow file, pushed as the initial `main`.
        let changes = [workflow_change()];
        let tree = bare
            .write_staged_tree(Oid::from_hex(EMPTY_TREE).unwrap(), &changes)
            .unwrap();
        let commit = NewCommit {
            tree,
            parents: Vec::new(),
            author: identity(),
            committer: identity(),
            message: "add ci".to_string(),
            extra_headers: Vec::new(),
        };
        let tip = bare.write_commit(&commit).unwrap();
        bare.update_ref(&RefUpdate::Create {
            name: main_ref(),
            new: tip,
        })
        .unwrap();

        let context = context_for(&did);
        let now_seconds = unix_now();
        let seeds = pipeline_seeds(&layout, "knot.test", &[context], now_seconds, 32);
        assert_eq!(
            seeds.len(),
            1,
            "push that compiled workflow yields exactly one reconstructed pipeline"
        );

        // Seeding an event log and replaying it must surface the reconstructed pipeline event.
        let log = EventLog::new(SystemClock, 32);
        log.seed(seeds);
        let replayed = log.replay(EventCursor::START, 32);
        assert_eq!(replayed.len(), 1);
        let event = &replayed[0];
        assert_eq!(event.nsid, "sh.tangled.pipeline");

        let payload = &event.payload;
        assert_eq!(payload["$type"], "sh.tangled.pipeline");
        assert_eq!(payload["workflows"][0]["name"], "ci.yml");

        let trigger = &payload["triggerMetadata"];
        assert_eq!(trigger["kind"], "push");
        assert_eq!(trigger["repo"]["knot"], "knot.test");
        assert_eq!(trigger["repo"]["repoDid"], did.as_str());
        assert_eq!(trigger["push"]["ref"], "refs/heads/main");
        assert_eq!(trigger["push"]["newSha"], tip.to_hex().to_string());
    }

    #[test]
    fn pipeline_seeds_are_bounded_by_the_ring_capacity() {
        let scan = tempfile::tempdir().unwrap();
        let layout = Layout::new(scan.path())
            .with_default_branch("main")
            .unwrap();
        let did = RepoDid::new("did:plc:limpet").unwrap();
        let bare = layout.create(&did).unwrap();
        let main = main_ref();

        // Four chained commits on `main`; each adds a distinct file next to the workflow.
        let mut parent: Option<Oid> = None;
        for index in 0u8..4 {
            let changes = [
                workflow_change(),
                StagedChange {
                    path: format!("file{index}.txt"),
                    action: StagedAction::Put {
                        content: vec![index],
                        kind: EntryKind::Blob,
                    },
                },
            ];
            let tree = bare
                .write_staged_tree(Oid::from_hex(EMPTY_TREE).unwrap(), &changes)
                .unwrap();
            let commit = NewCommit {
                tree,
                parents: parent.into_iter().collect(),
                author: identity(),
                committer: identity(),
                message: format!("commit {index}"),
                extra_headers: Vec::new(),
            };
            let tip = bare.write_commit(&commit).unwrap();

            // The first push creates the ref; later ones move it from the previous tip.
            let update = match parent {
                Some(old) => RefUpdate::Update {
                    name: main.clone(),
                    old,
                    new: tip,
                },
                None => RefUpdate::Create {
                    name: main.clone(),
                    new: tip,
                },
            };
            bare.update_ref(&update).unwrap();
            parent = Some(tip);
        }

        let context = context_for(&did);
        let now_seconds = unix_now();
        let seeds = pipeline_seeds(&layout, "knot.test", &[context], now_seconds, 2);
        assert_eq!(
            seeds.len(),
            2,
            "four reflogged workflow pushes are truncated to ring capacity"
        );
    }
}