This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 14 kB View raw
1use std::sync::Arc; 2 3use axum::body::{Body, to_bytes}; 4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex}; 5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig}; 6use bobbin_record_lru::{CacheCapacity, LruRecordStore}; 7use bobbin_resolver::RepoIdResolver; 8use bobbin_runtime::{RuntimeHasher, SystemClock}; 9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader}; 10use bobbin_slingshot_client::SlingshotClient; 11use bobbin_xrpc::{AppState, router}; 12use http::{Request, StatusCode}; 13use jacquard_common::DefaultStr; 14use jacquard_common::types::did::Did; 15use jacquard_common::types::handle::Handle; 16use jacquard_common::types::nsid::Nsid; 17use jacquard_common::types::recordkey::Rkey; 18use serde_json::{Value, json}; 19use tower::ServiceExt; 20use url::Url; 21use url::form_urlencoded::byte_serialize; 22use wiremock::matchers::{method, path, query_param}; 23use wiremock::{Mock, MockServer, ResponseTemplate}; 24 25const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 26 27fn did(s: &str) -> Did<DefaultStr> { 28 Did::new_owned(s).unwrap() 29} 30 31fn rkey(s: &str) -> Rkey<DefaultStr> { 32 Rkey::new_owned(s).unwrap() 33} 34 35fn nsid(s: &'static str) -> Nsid<DefaultStr> { 36 Nsid::new_static(s).unwrap() 37} 38 39fn handle(s: &str) -> Handle<DefaultStr> { 40 Handle::new_owned(s).unwrap() 41} 42 43struct Harness { 44 server: MockServer, 45 state: AppState, 46} 47 48impl Harness { 49 async fn new() -> Self { 50 let server = MockServer::start().await; 51 let coverage = Arc::new(CoverageWatch::new()); 52 let state = AppState::new( 53 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))), 54 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(), 55 Arc::new(EdgeStore::new(RuntimeHasher::default())), 56 Arc::new(StateIndex::new(RuntimeHasher::default())), 57 Arc::new(StateIndex::new(RuntimeHasher::default())), 58 coverage, 59 Arc::new( 60 KnotProxy::new( 61 KnotProxyConfig::default(), 62 KnotHttpConfig::default(), 63 Arc::new(SystemClock::new()), 64 RuntimeHasher::default(), 65 ) 66 .unwrap(), 67 ), 68 Arc::new( 69 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(), 70 ) as Arc<dyn SearchReader>, 71 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 72 ); 73 Self { server, state } 74 } 75 76 async fn mount( 77 &self, 78 did: &Did<DefaultStr>, 79 collection: &Nsid<DefaultStr>, 80 rkey: &Rkey<DefaultStr>, 81 value: Value, 82 ) { 83 let uri = format!( 84 "at://{}/{}/{}", 85 did.as_ref(), 86 collection.as_ref(), 87 rkey.as_ref() 88 ); 89 Mock::given(method("GET")) 90 .and(path("/xrpc/com.atproto.repo.getRecord")) 91 .and(query_param("repo", did.as_ref())) 92 .and(query_param("collection", collection.as_ref())) 93 .and(query_param("rkey", rkey.as_ref())) 94 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 95 "uri": uri, 96 "cid": CID, 97 "value": value, 98 }))) 99 .mount(&self.server) 100 .await; 101 } 102 103 async fn mount_404( 104 &self, 105 did: &Did<DefaultStr>, 106 collection: &Nsid<DefaultStr>, 107 rkey: &Rkey<DefaultStr>, 108 ) { 109 Mock::given(method("GET")) 110 .and(path("/xrpc/com.atproto.repo.getRecord")) 111 .and(query_param("repo", did.as_ref())) 112 .and(query_param("collection", collection.as_ref())) 113 .and(query_param("rkey", rkey.as_ref())) 114 .respond_with( 115 ResponseTemplate::new(404) 116 .set_body_json(json!({"error": "RecordNotFound", "message": "missing"})), 117 ) 118 .mount(&self.server) 119 .await; 120 } 121} 122 123fn enc(s: &str) -> String { 124 byte_serialize(s.as_bytes()).collect() 125} 126 127fn bulk_request(endpoint: &str, key: &str, values: &[&str]) -> Request<Body> { 128 let qs = values 129 .iter() 130 .map(|v| format!("{key}={}", enc(v))) 131 .collect::<Vec<_>>() 132 .join("&"); 133 Request::builder() 134 .uri(format!("/xrpc/{endpoint}?{qs}")) 135 .body(Body::empty()) 136 .unwrap() 137} 138 139async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) { 140 let status = resp.status(); 141 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap(); 142 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body"); 143 (status, parsed) 144} 145 146fn issue_body(repo_did: &Did<DefaultStr>, title: &str) -> Value { 147 json!({ 148 "$type": "sh.tangled.repo.issue", 149 "repo": repo_did.as_ref(), 150 "title": title, 151 "createdAt": "2026-05-01T00:00:00Z" 152 }) 153} 154 155fn pull_body(target_repo: &Did<DefaultStr>, title: &str) -> Value { 156 json!({ 157 "$type": "sh.tangled.repo.pull", 158 "title": title, 159 "createdAt": "2026-05-01T00:00:00Z", 160 "rounds": [], 161 "target": { 162 "branch": "main", 163 "repo": target_repo.as_ref() 164 } 165 }) 166} 167 168fn repo_body(name: &str) -> Value { 169 json!({ 170 "$type": "sh.tangled.repo", 171 "name": name, 172 "knot": "oyster.cafe", 173 "createdAt": "2026-05-01T00:00:00Z" 174 }) 175} 176 177fn profile_body(handle: &Handle<DefaultStr>) -> Value { 178 json!({ 179 "$type": "sh.tangled.actor.profile", 180 "bluesky": false, 181 "preferredHandle": handle.as_ref() 182 }) 183} 184 185#[tokio::test] 186async fn get_repos_returns_all_resolved_records() { 187 let h = Harness::new().await; 188 h.mount( 189 &did("did:plc:nel"), 190 &nsid("sh.tangled.repo"), 191 &rkey("abalone"), 192 repo_body("abalone"), 193 ) 194 .await; 195 h.mount( 196 &did("did:plc:teq"), 197 &nsid("sh.tangled.repo"), 198 &rkey("limpet"), 199 repo_body("limpet"), 200 ) 201 .await; 202 let app = router(h.state.clone()); 203 let (status, body) = json_response( 204 app.oneshot(bulk_request( 205 "sh.tangled.repo.getRepos", 206 "repos", 207 &[ 208 "at://did:plc:nel/sh.tangled.repo/abalone", 209 "at://did:plc:teq/sh.tangled.repo/limpet", 210 ], 211 )) 212 .await 213 .unwrap(), 214 ) 215 .await; 216 assert_eq!(status, StatusCode::OK); 217 let items = body["items"].as_array().unwrap(); 218 assert_eq!(items.len(), 2); 219 let names: Vec<&str> = items 220 .iter() 221 .map(|v| v["value"]["name"].as_str().unwrap()) 222 .collect(); 223 assert!(names.contains(&"abalone")); 224 assert!(names.contains(&"limpet")); 225} 226 227#[tokio::test] 228async fn get_profiles_returns_all_resolved_profiles() { 229 let h = Harness::new().await; 230 h.mount( 231 &did("did:plc:nel"), 232 &nsid("sh.tangled.actor.profile"), 233 &rkey("self"), 234 profile_body(&handle("witchcraft.systems")), 235 ) 236 .await; 237 h.mount( 238 &did("did:plc:teq"), 239 &nsid("sh.tangled.actor.profile"), 240 &rkey("self"), 241 profile_body(&handle("olaren.dev")), 242 ) 243 .await; 244 let app = router(h.state.clone()); 245 let (status, body) = json_response( 246 app.oneshot(bulk_request( 247 "sh.tangled.actor.getProfiles", 248 "actors", 249 &[ 250 "at://did:plc:nel/sh.tangled.actor.profile/self", 251 "at://did:plc:teq/sh.tangled.actor.profile/self", 252 ], 253 )) 254 .await 255 .unwrap(), 256 ) 257 .await; 258 assert_eq!(status, StatusCode::OK); 259 let items = body["items"].as_array().unwrap(); 260 assert_eq!(items.len(), 2); 261} 262 263#[tokio::test] 264async fn get_issues_returns_all_resolved_issues() { 265 let h = Harness::new().await; 266 let repo = did("did:plc:abalone"); 267 h.mount( 268 &did("did:plc:nel"), 269 &nsid("sh.tangled.repo.issue"), 270 &rkey("i1"), 271 issue_body(&repo, "first"), 272 ) 273 .await; 274 h.mount( 275 &did("did:plc:olaren"), 276 &nsid("sh.tangled.repo.issue"), 277 &rkey("i2"), 278 issue_body(&repo, "second"), 279 ) 280 .await; 281 let app = router(h.state.clone()); 282 let (status, body) = json_response( 283 app.oneshot(bulk_request( 284 "sh.tangled.repo.getIssues", 285 "issues", 286 &[ 287 "at://did:plc:nel/sh.tangled.repo.issue/i1", 288 "at://did:plc:olaren/sh.tangled.repo.issue/i2", 289 ], 290 )) 291 .await 292 .unwrap(), 293 ) 294 .await; 295 assert_eq!(status, StatusCode::OK); 296 let items = body["items"].as_array().unwrap(); 297 assert_eq!(items.len(), 2); 298} 299 300#[tokio::test] 301async fn get_pulls_returns_all_resolved_pulls() { 302 let h = Harness::new().await; 303 let target = did("did:plc:abalone"); 304 h.mount( 305 &did("did:plc:nel"), 306 &nsid("sh.tangled.repo.pull"), 307 &rkey("p1"), 308 pull_body(&target, "patch one"), 309 ) 310 .await; 311 let app = router(h.state.clone()); 312 let (status, body) = json_response( 313 app.oneshot(bulk_request( 314 "sh.tangled.repo.getPulls", 315 "pulls", 316 &["at://did:plc:nel/sh.tangled.repo.pull/p1"], 317 )) 318 .await 319 .unwrap(), 320 ) 321 .await; 322 assert_eq!(status, StatusCode::OK); 323 let items = body["items"].as_array().unwrap(); 324 assert_eq!(items.len(), 1); 325 assert_eq!(items[0]["value"]["title"], json!("patch one")); 326} 327 328#[tokio::test] 329async fn missing_records_are_dropped_silently() { 330 let h = Harness::new().await; 331 h.mount( 332 &did("did:plc:nel"), 333 &nsid("sh.tangled.repo"), 334 &rkey("abalone"), 335 repo_body("abalone"), 336 ) 337 .await; 338 h.mount_404( 339 &did("did:plc:teq"), 340 &nsid("sh.tangled.repo"), 341 &rkey("ghost"), 342 ) 343 .await; 344 let app = router(h.state.clone()); 345 let (status, body) = json_response( 346 app.oneshot(bulk_request( 347 "sh.tangled.repo.getRepos", 348 "repos", 349 &[ 350 "at://did:plc:nel/sh.tangled.repo/abalone", 351 "at://did:plc:teq/sh.tangled.repo/ghost", 352 ], 353 )) 354 .await 355 .unwrap(), 356 ) 357 .await; 358 assert_eq!(status, StatusCode::OK); 359 let items = body["items"].as_array().unwrap(); 360 assert_eq!( 361 items.len(), 362 1, 363 "missing records must be dropped not fail the bulk call" 364 ); 365 assert_eq!(items[0]["value"]["name"], json!("abalone")); 366} 367 368#[tokio::test] 369async fn transient_failure_drops_only_that_record() { 370 let h = Harness::new().await; 371 h.mount( 372 &did("did:plc:nel"), 373 &nsid("sh.tangled.repo"), 374 &rkey("conch"), 375 repo_body("conch"), 376 ) 377 .await; 378 Mock::given(method("GET")) 379 .and(path("/xrpc/com.atproto.repo.getRecord")) 380 .and(query_param("repo", "did:plc:teq")) 381 .and(query_param("collection", "sh.tangled.repo")) 382 .and(query_param("rkey", "flaky")) 383 .respond_with(ResponseTemplate::new(503)) 384 .mount(&h.server) 385 .await; 386 let app = router(h.state.clone()); 387 let (status, body) = json_response( 388 app.oneshot(bulk_request( 389 "sh.tangled.repo.getRepos", 390 "repos", 391 &[ 392 "at://did:plc:nel/sh.tangled.repo/conch", 393 "at://did:plc:teq/sh.tangled.repo/flaky", 394 ], 395 )) 396 .await 397 .unwrap(), 398 ) 399 .await; 400 assert_eq!(status, StatusCode::OK); 401 let items = body["items"].as_array().unwrap(); 402 assert_eq!( 403 items.len(), 404 1, 405 "transient upstream failure drops that record, it must not fail the bulk call" 406 ); 407 assert_eq!(items[0]["value"]["name"], json!("conch")); 408} 409 410#[tokio::test] 411async fn wrong_collection_uri_fails_bulk_request() { 412 let h = Harness::new().await; 413 h.mount( 414 &did("did:plc:nel"), 415 &nsid("sh.tangled.repo"), 416 &rkey("conch"), 417 repo_body("conch"), 418 ) 419 .await; 420 let app = router(h.state.clone()); 421 let (status, body) = json_response( 422 app.oneshot(bulk_request( 423 "sh.tangled.repo.getRepos", 424 "repos", 425 &[ 426 "at://did:plc:nel/sh.tangled.repo/conch", 427 "at://did:plc:teq/sh.tangled.repo.issue/whelk", 428 ], 429 )) 430 .await 431 .unwrap(), 432 ) 433 .await; 434 assert_eq!( 435 status, 436 StatusCode::BAD_REQUEST, 437 "a wrong-collection uri must fail the whole bulk request" 438 ); 439 assert_eq!(body["error"], "InvalidRequest"); 440} 441 442#[tokio::test] 443async fn empty_uri_list_is_rejected() { 444 let h = Harness::new().await; 445 let app = router(h.state.clone()); 446 let resp = app 447 .oneshot( 448 Request::builder() 449 .uri("/xrpc/sh.tangled.repo.getRepos") 450 .body(Body::empty()) 451 .unwrap(), 452 ) 453 .await 454 .unwrap(); 455 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 456} 457 458#[tokio::test] 459async fn over_limit_uri_list_is_rejected() { 460 let h = Harness::new().await; 461 let uris: Vec<String> = (0..51) 462 .map(|i| format!("at://did:plc:nel/sh.tangled.repo/r{i}")) 463 .collect(); 464 let refs: Vec<&str> = uris.iter().map(|s| s.as_str()).collect(); 465 let app = router(h.state.clone()); 466 let resp = app 467 .oneshot(bulk_request("sh.tangled.repo.getRepos", "repos", &refs)) 468 .await 469 .unwrap(); 470 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 471} 472 473#[tokio::test] 474async fn malformed_uri_in_list_returns_400() { 475 let h = Harness::new().await; 476 let app = router(h.state.clone()); 477 let resp = app 478 .oneshot(bulk_request( 479 "sh.tangled.repo.getRepos", 480 "repos", 481 &["not-a-uri"], 482 )) 483 .await 484 .unwrap(); 485 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 486}