Now let's take a silly one
1

Configure Feed

Select the types of activity you want to include in your feed.

at main 10 kB View raw
1use std::path::Path; 2use std::process::Command; 3use std::sync::Arc; 4 5use futures::stream::StreamExt; 6use knot_atproto::Atproto; 7use knot_cob::{CobHome, CobStore}; 8use knot_cobs::{Registration, RegistryChange}; 9use knot_git::{Layout, Repo}; 10use knot_runtime::{ 11 FakeHttp, HttpResponse, K256Signer, ManualClock, SeededEntropy, Signer, UnixMicros, 12}; 13use knot_types::{KnotId, OwnerDid, RepoDid, RepoName, RepoRkey, UnixSeconds}; 14use tempfile::TempDir; 15use tokio::net::TcpListener; 16use url::Url; 17 18const REPO_DID: &str = "did:plc:squid"; 19const REPO_NAME: &str = "anemone"; 20const OWNER_DID: &str = "did:plc:nel"; 21const PDS_HOST: &str = "pds.oyster.cafe"; 22const KNOT_DID: &str = "did:web:nel.pet"; 23const PINNED_DATE: &str = "2026-06-20T12:00:00+00:00"; 24 25fn git(cwd: &Path, env: &[(&str, &str)], args: &[&str]) -> (bool, String) { 26 let mut command = Command::new("git"); 27 command 28 .args(args) 29 .current_dir(cwd) 30 .env("GIT_CONFIG_GLOBAL", "/dev/null") 31 .env("GIT_CONFIG_SYSTEM", "/dev/null") 32 .env("GIT_TERMINAL_PROMPT", "0") 33 .env("GIT_AUTHOR_NAME", "nel") 34 .env("GIT_AUTHOR_EMAIL", "nel@oyster.cafe") 35 .env("GIT_COMMITTER_NAME", "nel") 36 .env("GIT_COMMITTER_EMAIL", "nel@oyster.cafe") 37 .env("GIT_AUTHOR_DATE", PINNED_DATE) 38 .env("GIT_COMMITTER_DATE", PINNED_DATE); 39 env.iter().for_each(|(key, value)| { 40 command.env(key, value); 41 }); 42 let out = command.output().expect("git runs"); 43 ( 44 out.status.success(), 45 format!( 46 "{}{}", 47 String::from_utf8_lossy(&out.stdout), 48 String::from_utf8_lossy(&out.stderr) 49 ), 50 ) 51} 52 53fn keygen(dir: &Path) -> (String, String) { 54 let path = dir.join("client"); 55 let out = Command::new("ssh-keygen") 56 .args([ 57 "-t", 58 "ed25519", 59 "-N", 60 "", 61 "-C", 62 "nel@oyster.cafe", 63 "-f", 64 path.to_str().unwrap(), 65 ]) 66 .output() 67 .expect("ssh-keygen runs"); 68 assert!(out.status.success()); 69 let public_line = std::fs::read_to_string(dir.join("client.pub")) 70 .unwrap() 71 .trim() 72 .to_string(); 73 (path.to_str().unwrap().to_string(), public_line) 74} 75 76fn did_document(signer: &K256Signer, did: &str) -> Vec<u8> { 77 let multikey = knot_types::crypto::multikey(0xe7, signer.public_key().as_bytes()); 78 serde_json::to_vec(&serde_json::json!({ 79 "id": did, 80 "alsoKnownAs": ["at://nel.pet"], 81 "verificationMethod": [{ 82 "id": format!("{did}#atproto"), 83 "type": "Multikey", 84 "controller": did, 85 "publicKeyMultibase": multikey 86 }], 87 "service": [{ 88 "id": "#atproto_pds", 89 "type": "AtprotoPersonalDataServer", 90 "serviceEndpoint": format!("https://{PDS_HOST}") 91 }] 92 })) 93 .unwrap() 94} 95 96fn list_records_body(public_line: &str) -> Vec<u8> { 97 serde_json::to_vec(&serde_json::json!({ 98 "records": [{ 99 "uri": format!("at://{OWNER_DID}/sh.tangled.publicKey/1"), 100 "value": { 101 "$type": "sh.tangled.publicKey", 102 "key": public_line, 103 "name": "laptop", 104 "createdAt": "2026-06-08T00:00:00Z" 105 } 106 }] 107 })) 108 .unwrap() 109} 110 111fn fake_http(published_line: String) -> impl knot_runtime::HttpTransport { 112 let signer = K256Signer::generate(&SeededEntropy::new(1)); 113 FakeHttp::new(move |request| { 114 let host = request.url.host_str().unwrap_or_default().to_string(); 115 let path = request.url.path().to_string(); 116 let body = if host == PDS_HOST { 117 list_records_body(&published_line) 118 } else if path.ends_with(OWNER_DID) { 119 did_document(&signer, OWNER_DID) 120 } else if path.ends_with(REPO_DID) { 121 did_document(&signer, REPO_DID) 122 } else { 123 return Ok(HttpResponse { 124 status: http::StatusCode::NOT_FOUND, 125 headers: http::HeaderMap::new(), 126 body: bytes::Bytes::new(), 127 }); 128 }; 129 Ok(HttpResponse { 130 status: http::StatusCode::OK, 131 headers: http::HeaderMap::new(), 132 body: bytes::Bytes::from(body), 133 }) 134 }) 135} 136 137fn actor_for_seed(seed: u64) -> knot_types::ActorId { 138 knot_types::ActorId::from_secp256k1( 139 K256Signer::generate(&SeededEntropy::new(seed)) 140 .public_key() 141 .as_bytes(), 142 ) 143} 144 145struct Server { 146 _scan: TempDir, 147 layout: Layout, 148 repo_did: RepoDid, 149 port: u16, 150 events: Arc<knot_events::EventLog<ManualClock>>, 151} 152 153async fn spawn(published_line: String) -> Server { 154 let scan = tempfile::tempdir().unwrap(); 155 let meta_path = scan.path().join("meta"); 156 Repo::create(&meta_path).unwrap(); 157 let layout = Layout::new(scan.path().join("repos")); 158 let repo_did = RepoDid::new(REPO_DID).unwrap(); 159 layout.create(&repo_did).unwrap(); 160 161 let signer = K256Signer::generate(&SeededEntropy::new(2)); 162 let meta = Repo::open(&meta_path).unwrap(); 163 CobStore::new(&meta) 164 .create( 165 &CobHome::from(&KnotId::new(KNOT_DID).unwrap()), 166 &RegistryChange::Register(Registration { 167 owner: OwnerDid::new(OWNER_DID).unwrap(), 168 rkey: RepoRkey::new(REPO_NAME).unwrap(), 169 name: RepoName::new(REPO_NAME).unwrap(), 170 repo: repo_did.clone(), 171 created_at: UnixSeconds::new(1), 172 }), 173 &signer, 174 UnixSeconds::new(1), 175 ) 176 .unwrap(); 177 178 let index = Arc::new(knot_index::Index::new(meta_path, layout.clone())); 179 index.rebuild().unwrap(); 180 181 let atproto = Arc::new(Atproto::new( 182 fake_http(published_line), 183 ManualClock::new(UnixMicros::new(1_000_000_000)), 184 KnotId::new(KNOT_DID).unwrap(), 185 Url::parse("https://plc.directory/").unwrap(), 186 )); 187 let key_dir = scan.path().join("hostkey"); 188 std::fs::create_dir_all(&key_dir).unwrap(); 189 let host_key = knot_ssh::load_or_create_host_key(&key_dir.join("host")).unwrap(); 190 let events = Arc::new(knot_events::EventLog::new( 191 ManualClock::new(UnixMicros::new(1_000_000_000)), 192 64, 193 )); 194 let state = Arc::new(knot_ssh::SshState::new( 195 layout.clone(), 196 index, 197 atproto, 198 actor_for_seed(1), 199 Arc::clone(&events), 200 "knot.test".to_string(), 201 "https://tangled.test".to_string(), 202 std::collections::BTreeSet::new(), 203 knot_types::AdmissionPolicy::Closed, 204 1 << 30, 205 std::time::Duration::from_secs(2), 206 )); 207 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 208 let port = listener.local_addr().unwrap().port(); 209 tokio::spawn(async move { 210 let _ = knot_ssh::serve_on_socket(listener, host_key, state).await; 211 }); 212 Server { 213 _scan: scan, 214 layout, 215 repo_did, 216 port, 217 events, 218 } 219} 220 221fn ssh_command(key_path: &str) -> String { 222 format!( 223 "ssh -i {key_path} -o IdentitiesOnly=yes -o StrictHostKeyChecking=no \ 224 -o UserKnownHostsFile=/dev/null -o PreferredAuthentications=publickey -o BatchMode=yes" 225 ) 226} 227 228fn seed_work(work: &Path) -> String { 229 std::fs::create_dir_all(work).unwrap(); 230 git(work, &[], &["init", "-q", "-b", "main"]); 231 std::fs::write(work.join("README.md"), "hello over the simulated ssh\n").unwrap(); 232 git(work, &[], &["add", "-A"]); 233 git(work, &[], &["commit", "-q", "-m", "initial"]); 234 let (ok, head) = git(work, &[], &["rev-parse", "HEAD"]); 235 assert!(ok); 236 head.trim().to_string() 237} 238 239async fn push_once(scratch: &Path) -> (String, Option<knot_types::Oid>, serde_json::Value) { 240 let (key_path, public_line) = keygen(scratch); 241 let server = spawn(public_line).await; 242 let url = format!( 243 "ssh://git@127.0.0.1:{}/{OWNER_DID}/{REPO_NAME}", 244 server.port 245 ); 246 let ssh = ssh_command(&key_path); 247 let work = scratch.join("work"); 248 let head = seed_work(&work); 249 250 let (ok, out) = tokio::task::spawn_blocking(move || { 251 git( 252 &work, 253 &[("GIT_SSH_COMMAND", &ssh)], 254 &["push", "-q", &url, "main"], 255 ) 256 }) 257 .await 258 .unwrap(); 259 assert!(ok, "simulated ssh server must accept push:\n{out}"); 260 261 let stored = server 262 .layout 263 .open(&server.repo_did) 264 .unwrap() 265 .find_ref(&knot_types::RefName::new("refs/heads/main").unwrap()) 266 .unwrap(); 267 let event = poll_for_event(&server.events).await; 268 drop(server); 269 (head, stored, event) 270} 271 272async fn poll_for_event(events: &knot_events::EventLog<ManualClock>) -> serde_json::Value { 273 futures::stream::iter(0..100) 274 .then(|_| async { 275 let hit = events 276 .replay(knot_events::EventCursor::START, 32) 277 .into_iter() 278 .find(|event| event.nsid == "sh.tangled.git.refUpdate") 279 .map(|event| serde_json::to_value(&event).unwrap()["event"].clone()); 280 if hit.is_none() { 281 tokio::time::sleep(std::time::Duration::from_millis(20)).await; 282 } 283 hit 284 }) 285 .filter_map(|hit| async move { hit }) 286 .boxed() 287 .next() 288 .await 289 .expect("refUpdate event must be published within polling window") 290} 291 292#[tokio::test(flavor = "multi_thread", worker_threads = 4)] 293async fn the_simulated_ssh_write_path_lands_a_seed_deterministic_tip() { 294 let first_dir = tempfile::tempdir().unwrap(); 295 let (first_head, first_stored, first_event) = push_once(first_dir.path()).await; 296 297 let second_dir = tempfile::tempdir().unwrap(); 298 let (second_head, second_stored, second_event) = push_once(second_dir.path()).await; 299 300 let tip = knot_types::Oid::from_hex(&first_head).unwrap(); 301 assert_eq!( 302 first_stored, 303 Some(tip), 304 "pushed commit must be the repository's main tip" 305 ); 306 assert_eq!( 307 first_head, second_head, 308 "two independent runs of the simulated ssh push must produce same commit oid" 309 ); 310 assert_eq!( 311 first_stored, second_stored, 312 "assembled-against-doubles ssh write path is logically reproducible" 313 ); 314 315 assert_eq!(first_event["ref"], "refs/heads/main"); 316 assert_eq!(first_event["newSha"], first_head); 317 assert_eq!( 318 first_event["newSha"], second_event["newSha"], 319 "ref-update event the push emits is seed-stable too" 320 ); 321}