Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1use std::sync::Arc; 2 3use axum::body::{Body, to_bytes}; 4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex}; 5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig}; 6use bobbin_record_lru::{CacheCapacity, LruRecordStore}; 7use bobbin_resolver::RepoIdResolver; 8use bobbin_runtime::{RuntimeHasher, SystemClock}; 9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader}; 10use bobbin_slingshot_client::SlingshotClient; 11use bobbin_xrpc::{AppState, router}; 12use http::{Request, StatusCode}; 13use jacquard_common::DefaultStr; 14use jacquard_common::types::did::Did; 15use jacquard_common::types::handle::Handle; 16use jacquard_common::types::nsid::Nsid; 17use jacquard_common::types::recordkey::Rkey; 18use serde_json::{Value, json}; 19use tower::ServiceExt; 20use url::Url; 21use url::form_urlencoded::byte_serialize; 22use wiremock::matchers::{method, path, query_param}; 23use wiremock::{Mock, MockServer, ResponseTemplate}; 24 25const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i"; 26 27fn did(s: &str) -> Did<DefaultStr> { 28 Did::new_owned(s).unwrap() 29} 30 31fn rkey(s: &str) -> Rkey<DefaultStr> { 32 Rkey::new_owned(s).unwrap() 33} 34 35fn nsid(s: &'static str) -> Nsid<DefaultStr> { 36 Nsid::new_static(s).unwrap() 37} 38 39fn handle(s: &str) -> Handle<DefaultStr> { 40 Handle::new_owned(s).unwrap() 41} 42 43struct Harness { 44 server: MockServer, 45 state: AppState, 46} 47 48impl Harness { 49 async fn new() -> Self { 50 let server = MockServer::start().await; 51 let coverage = Arc::new(CoverageWatch::new()); 52 let state = AppState::new( 53 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))), 54 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(), 55 Arc::new(EdgeStore::new(RuntimeHasher::default())), 56 Arc::new(StateIndex::new(RuntimeHasher::default())), 57 Arc::new(StateIndex::new(RuntimeHasher::default())), 58 coverage, 59 Arc::new( 60 KnotProxy::new( 61 KnotProxyConfig::default(), 62 KnotHttpConfig::default(), 63 Arc::new(SystemClock::new()), 64 RuntimeHasher::default(), 65 ) 66 .unwrap(), 67 ), 68 Arc::new( 69 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(), 70 ) as Arc<dyn SearchReader>, 71 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())), 72 ); 73 Self { server, state } 74 } 75 76 async fn mount( 77 &self, 78 did: &Did<DefaultStr>, 79 collection: &Nsid<DefaultStr>, 80 rkey: &Rkey<DefaultStr>, 81 value: Value, 82 ) { 83 let uri = format!( 84 "at://{}/{}/{}", 85 did.as_ref(), 86 collection.as_ref(), 87 rkey.as_ref() 88 ); 89 Mock::given(method("GET")) 90 .and(path("/xrpc/com.atproto.repo.getRecord")) 91 .and(query_param("repo", did.as_ref())) 92 .and(query_param("collection", collection.as_ref())) 93 .and(query_param("rkey", rkey.as_ref())) 94 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 95 "uri": uri, 96 "cid": CID, 97 "value": value, 98 }))) 99 .mount(&self.server) 100 .await; 101 } 102 103 async fn mount_404( 104 &self, 105 did: &Did<DefaultStr>, 106 collection: &Nsid<DefaultStr>, 107 rkey: &Rkey<DefaultStr>, 108 ) { 109 Mock::given(method("GET")) 110 .and(path("/xrpc/com.atproto.repo.getRecord")) 111 .and(query_param("repo", did.as_ref())) 112 .and(query_param("collection", collection.as_ref())) 113 .and(query_param("rkey", rkey.as_ref())) 114 .respond_with( 115 ResponseTemplate::new(404) 116 .set_body_json(json!({"error": "RecordNotFound", "message": "missing"})), 117 ) 118 .mount(&self.server) 119 .await; 120 } 121} 122 123fn enc(s: &str) -> String { 124 byte_serialize(s.as_bytes()).collect() 125} 126 127fn bulk_request(endpoint: &str, key: &str, values: &[&str]) -> Request<Body> { 128 let qs = values 129 .iter() 130 .map(|v| format!("{key}={v}")) 131 .collect::<Vec<_>>() 132 .join("&"); 133 Request::builder() 134 .uri(format!("/xrpc/{endpoint}?{qs}")) 135 .body(Body::empty()) 136 .unwrap() 137} 138 139fn bulk_request_escaped(endpoint: &str, key: &str, values: &[&str]) -> Request<Body> { 140 let qs = values 141 .iter() 142 .map(|v| format!("{key}={}", enc(v))) 143 .collect::<Vec<_>>() 144 .join("&"); 145 Request::builder() 146 .uri(format!("/xrpc/{endpoint}?{qs}")) 147 .body(Body::empty()) 148 .unwrap() 149} 150 151async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) { 152 let status = resp.status(); 153 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap(); 154 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body"); 155 (status, parsed) 156} 157 158fn issue_body(repo_did: &Did<DefaultStr>, title: &str) -> Value { 159 json!({ 160 "$type": "sh.tangled.repo.issue", 161 "repo": repo_did.as_ref(), 162 "title": title, 163 "createdAt": "2026-05-01T00:00:00Z" 164 }) 165} 166 167fn pull_body(target_repo: &Did<DefaultStr>, title: &str) -> Value { 168 json!({ 169 "$type": "sh.tangled.repo.pull", 170 "title": title, 171 "createdAt": "2026-05-01T00:00:00Z", 172 "rounds": [], 173 "target": { 174 "branch": "main", 175 "repo": target_repo.as_ref() 176 } 177 }) 178} 179 180fn repo_body(name: &str) -> Value { 181 json!({ 182 "$type": "sh.tangled.repo", 183 "name": name, 184 "knot": "oyster.cafe", 185 "createdAt": "2026-05-01T00:00:00Z" 186 }) 187} 188 189fn profile_body(handle: &Handle<DefaultStr>) -> Value { 190 json!({ 191 "$type": "sh.tangled.actor.profile", 192 "bluesky": false, 193 "preferredHandle": handle.as_ref() 194 }) 195} 196 197#[tokio::test] 198async fn get_repos_returns_all_resolved_records() { 199 let h = Harness::new().await; 200 h.mount( 201 &did("did:plc:nel"), 202 &nsid("sh.tangled.repo"), 203 &rkey("abalone"), 204 repo_body("abalone"), 205 ) 206 .await; 207 h.mount( 208 &did("did:plc:teq"), 209 &nsid("sh.tangled.repo"), 210 &rkey("limpet"), 211 repo_body("limpet"), 212 ) 213 .await; 214 let app = router(h.state.clone()); 215 let (status, body) = json_response( 216 app.oneshot(bulk_request( 217 "sh.tangled.repo.getRepos", 218 "repos", 219 &[ 220 "at://did:plc:nel/sh.tangled.repo/abalone", 221 "at://did:plc:teq/sh.tangled.repo/limpet", 222 ], 223 )) 224 .await 225 .unwrap(), 226 ) 227 .await; 228 assert_eq!(status, StatusCode::OK); 229 let items = body["items"].as_array().unwrap(); 230 assert_eq!(items.len(), 2); 231 let names: Vec<&str> = items 232 .iter() 233 .map(|v| v["value"]["name"].as_str().unwrap()) 234 .collect(); 235 assert!(names.contains(&"abalone")); 236 assert!(names.contains(&"limpet")); 237} 238 239#[tokio::test] 240async fn get_profiles_returns_all_resolved_profiles() { 241 let h = Harness::new().await; 242 h.mount( 243 &did("did:plc:nel"), 244 &nsid("sh.tangled.actor.profile"), 245 &rkey("self"), 246 profile_body(&handle("witchcraft.systems")), 247 ) 248 .await; 249 h.mount( 250 &did("did:plc:teq"), 251 &nsid("sh.tangled.actor.profile"), 252 &rkey("self"), 253 profile_body(&handle("olaren.dev")), 254 ) 255 .await; 256 let app = router(h.state.clone()); 257 let (status, body) = json_response( 258 app.oneshot(bulk_request( 259 "sh.tangled.actor.getProfiles", 260 "actors", 261 &[ 262 "at://did:plc:nel/sh.tangled.actor.profile/self", 263 "at://did:plc:teq/sh.tangled.actor.profile/self", 264 ], 265 )) 266 .await 267 .unwrap(), 268 ) 269 .await; 270 assert_eq!(status, StatusCode::OK); 271 let items = body["items"].as_array().unwrap(); 272 assert_eq!(items.len(), 2); 273} 274 275#[tokio::test] 276async fn get_profiles_accepts_percent_escaped_at_uris() { 277 let h = Harness::new().await; 278 h.mount( 279 &did("did:plc:nel"), 280 &nsid("sh.tangled.actor.profile"), 281 &rkey("self"), 282 profile_body(&handle("witchcraft.systems")), 283 ) 284 .await; 285 let app = router(h.state.clone()); 286 let (status, body) = json_response( 287 app.oneshot(bulk_request_escaped( 288 "sh.tangled.actor.getProfiles", 289 "actors", 290 &["at://did:plc:nel/sh.tangled.actor.profile/self"], 291 )) 292 .await 293 .unwrap(), 294 ) 295 .await; 296 assert_eq!(status, StatusCode::OK); 297 assert_eq!(body["items"].as_array().unwrap().len(), 1); 298} 299 300#[tokio::test] 301async fn get_issues_returns_all_resolved_issues() { 302 let h = Harness::new().await; 303 let repo = did("did:plc:abalone"); 304 h.mount( 305 &did("did:plc:nel"), 306 &nsid("sh.tangled.repo.issue"), 307 &rkey("i1"), 308 issue_body(&repo, "first"), 309 ) 310 .await; 311 h.mount( 312 &did("did:plc:olaren"), 313 &nsid("sh.tangled.repo.issue"), 314 &rkey("i2"), 315 issue_body(&repo, "second"), 316 ) 317 .await; 318 let app = router(h.state.clone()); 319 let (status, body) = json_response( 320 app.oneshot(bulk_request( 321 "sh.tangled.repo.getIssues", 322 "issues", 323 &[ 324 "at://did:plc:nel/sh.tangled.repo.issue/i1", 325 "at://did:plc:olaren/sh.tangled.repo.issue/i2", 326 ], 327 )) 328 .await 329 .unwrap(), 330 ) 331 .await; 332 assert_eq!(status, StatusCode::OK); 333 let items = body["items"].as_array().unwrap(); 334 assert_eq!(items.len(), 2); 335} 336 337#[tokio::test] 338async fn get_pulls_returns_all_resolved_pulls() { 339 let h = Harness::new().await; 340 let target = did("did:plc:abalone"); 341 h.mount( 342 &did("did:plc:nel"), 343 &nsid("sh.tangled.repo.pull"), 344 &rkey("p1"), 345 pull_body(&target, "patch one"), 346 ) 347 .await; 348 let app = router(h.state.clone()); 349 let (status, body) = json_response( 350 app.oneshot(bulk_request( 351 "sh.tangled.repo.getPulls", 352 "pulls", 353 &["at://did:plc:nel/sh.tangled.repo.pull/p1"], 354 )) 355 .await 356 .unwrap(), 357 ) 358 .await; 359 assert_eq!(status, StatusCode::OK); 360 let items = body["items"].as_array().unwrap(); 361 assert_eq!(items.len(), 1); 362 assert_eq!(items[0]["value"]["title"], json!("patch one")); 363} 364 365#[tokio::test] 366async fn missing_records_are_dropped_silently() { 367 let h = Harness::new().await; 368 h.mount( 369 &did("did:plc:nel"), 370 &nsid("sh.tangled.repo"), 371 &rkey("abalone"), 372 repo_body("abalone"), 373 ) 374 .await; 375 h.mount_404( 376 &did("did:plc:teq"), 377 &nsid("sh.tangled.repo"), 378 &rkey("ghost"), 379 ) 380 .await; 381 let app = router(h.state.clone()); 382 let (status, body) = json_response( 383 app.oneshot(bulk_request( 384 "sh.tangled.repo.getRepos", 385 "repos", 386 &[ 387 "at://did:plc:nel/sh.tangled.repo/abalone", 388 "at://did:plc:teq/sh.tangled.repo/ghost", 389 ], 390 )) 391 .await 392 .unwrap(), 393 ) 394 .await; 395 assert_eq!(status, StatusCode::OK); 396 let items = body["items"].as_array().unwrap(); 397 assert_eq!( 398 items.len(), 399 1, 400 "missing records must be dropped not fail the bulk call" 401 ); 402 assert_eq!(items[0]["value"]["name"], json!("abalone")); 403} 404 405#[tokio::test] 406async fn transient_failure_drops_only_that_record() { 407 let h = Harness::new().await; 408 h.mount( 409 &did("did:plc:nel"), 410 &nsid("sh.tangled.repo"), 411 &rkey("conch"), 412 repo_body("conch"), 413 ) 414 .await; 415 Mock::given(method("GET")) 416 .and(path("/xrpc/com.atproto.repo.getRecord")) 417 .and(query_param("repo", "did:plc:teq")) 418 .and(query_param("collection", "sh.tangled.repo")) 419 .and(query_param("rkey", "flaky")) 420 .respond_with(ResponseTemplate::new(503)) 421 .mount(&h.server) 422 .await; 423 let app = router(h.state.clone()); 424 let (status, body) = json_response( 425 app.oneshot(bulk_request( 426 "sh.tangled.repo.getRepos", 427 "repos", 428 &[ 429 "at://did:plc:nel/sh.tangled.repo/conch", 430 "at://did:plc:teq/sh.tangled.repo/flaky", 431 ], 432 )) 433 .await 434 .unwrap(), 435 ) 436 .await; 437 assert_eq!(status, StatusCode::OK); 438 let items = body["items"].as_array().unwrap(); 439 assert_eq!( 440 items.len(), 441 1, 442 "transient upstream failure drops that record, it must not fail the bulk call" 443 ); 444 assert_eq!(items[0]["value"]["name"], json!("conch")); 445} 446 447#[tokio::test] 448async fn wrong_collection_uri_fails_bulk_request() { 449 let h = Harness::new().await; 450 h.mount( 451 &did("did:plc:nel"), 452 &nsid("sh.tangled.repo"), 453 &rkey("conch"), 454 repo_body("conch"), 455 ) 456 .await; 457 let app = router(h.state.clone()); 458 let (status, body) = json_response( 459 app.oneshot(bulk_request( 460 "sh.tangled.repo.getRepos", 461 "repos", 462 &[ 463 "at://did:plc:nel/sh.tangled.repo/conch", 464 "at://did:plc:teq/sh.tangled.repo.issue/whelk", 465 ], 466 )) 467 .await 468 .unwrap(), 469 ) 470 .await; 471 assert_eq!( 472 status, 473 StatusCode::BAD_REQUEST, 474 "a wrong-collection uri must fail the whole bulk request" 475 ); 476 assert_eq!(body["error"], "InvalidRequest"); 477} 478 479#[tokio::test] 480async fn empty_uri_list_is_rejected() { 481 let h = Harness::new().await; 482 let app = router(h.state.clone()); 483 let resp = app 484 .oneshot( 485 Request::builder() 486 .uri("/xrpc/sh.tangled.repo.getRepos") 487 .body(Body::empty()) 488 .unwrap(), 489 ) 490 .await 491 .unwrap(); 492 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 493} 494 495#[tokio::test] 496async fn over_limit_uri_list_is_rejected() { 497 let h = Harness::new().await; 498 let uris: Vec<String> = (0..51) 499 .map(|i| format!("at://did:plc:nel/sh.tangled.repo/r{i}")) 500 .collect(); 501 let refs: Vec<&str> = uris.iter().map(|s| s.as_str()).collect(); 502 let app = router(h.state.clone()); 503 let resp = app 504 .oneshot(bulk_request("sh.tangled.repo.getRepos", "repos", &refs)) 505 .await 506 .unwrap(); 507 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 508} 509 510#[tokio::test] 511async fn malformed_uri_in_list_returns_400() { 512 let h = Harness::new().await; 513 let app = router(h.state.clone()); 514 let resp = app 515 .oneshot(bulk_request( 516 "sh.tangled.repo.getRepos", 517 "repos", 518 &["not-a-uri"], 519 )) 520 .await 521 .unwrap(); 522 assert_eq!(resp.status(), StatusCode::BAD_REQUEST); 523}