// Monorepo for Tangled — tangled.org
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex};
5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
6use bobbin_record_lru::{CacheCapacity, LruRecordStore};
7use bobbin_resolver::RepoIdResolver;
8use bobbin_runtime::{RuntimeHasher, SystemClock};
9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
10use bobbin_slingshot_client::SlingshotClient;
11use bobbin_xrpc::{AppState, router};
12use http::{Request, StatusCode};
13use jacquard_common::DefaultStr;
14use jacquard_common::types::did::Did;
15use jacquard_common::types::handle::Handle;
16use jacquard_common::types::nsid::Nsid;
17use jacquard_common::types::recordkey::Rkey;
18use serde_json::{Value, json};
19use tower::ServiceExt;
20use url::Url;
21use url::form_urlencoded::byte_serialize;
22use wiremock::matchers::{method, path, query_param};
23use wiremock::{Mock, MockServer, ResponseTemplate};
24
/// CID string placed in the `cid` field of every mocked `getRecord` success
/// response built by `Harness::mount`.
const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
26
27fn did(s: &str) -> Did<DefaultStr> {
28 Did::new_owned(s).unwrap()
29}
30
31fn rkey(s: &str) -> Rkey<DefaultStr> {
32 Rkey::new_owned(s).unwrap()
33}
34
35fn nsid(s: &'static str) -> Nsid<DefaultStr> {
36 Nsid::new_static(s).unwrap()
37}
38
39fn handle(s: &str) -> Handle<DefaultStr> {
40 Handle::new_owned(s).unwrap()
41}
42
43struct Harness {
44 server: MockServer,
45 state: AppState,
46}
47
48impl Harness {
49 async fn new() -> Self {
50 let server = MockServer::start().await;
51 let coverage = Arc::new(CoverageWatch::new());
52 let state = AppState::new(
53 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
54 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
55 Arc::new(EdgeStore::new(RuntimeHasher::default())),
56 Arc::new(StateIndex::new(RuntimeHasher::default())),
57 Arc::new(StateIndex::new(RuntimeHasher::default())),
58 coverage,
59 Arc::new(
60 KnotProxy::new(
61 KnotProxyConfig::default(),
62 KnotHttpConfig::default(),
63 Arc::new(SystemClock::new()),
64 RuntimeHasher::default(),
65 )
66 .unwrap(),
67 ),
68 Arc::new(
69 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
70 ) as Arc<dyn SearchReader>,
71 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
72 );
73 Self { server, state }
74 }
75
76 async fn mount(
77 &self,
78 did: &Did<DefaultStr>,
79 collection: &Nsid<DefaultStr>,
80 rkey: &Rkey<DefaultStr>,
81 value: Value,
82 ) {
83 let uri = format!(
84 "at://{}/{}/{}",
85 did.as_ref(),
86 collection.as_ref(),
87 rkey.as_ref()
88 );
89 Mock::given(method("GET"))
90 .and(path("/xrpc/com.atproto.repo.getRecord"))
91 .and(query_param("repo", did.as_ref()))
92 .and(query_param("collection", collection.as_ref()))
93 .and(query_param("rkey", rkey.as_ref()))
94 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
95 "uri": uri,
96 "cid": CID,
97 "value": value,
98 })))
99 .mount(&self.server)
100 .await;
101 }
102
103 async fn mount_404(
104 &self,
105 did: &Did<DefaultStr>,
106 collection: &Nsid<DefaultStr>,
107 rkey: &Rkey<DefaultStr>,
108 ) {
109 Mock::given(method("GET"))
110 .and(path("/xrpc/com.atproto.repo.getRecord"))
111 .and(query_param("repo", did.as_ref()))
112 .and(query_param("collection", collection.as_ref()))
113 .and(query_param("rkey", rkey.as_ref()))
114 .respond_with(
115 ResponseTemplate::new(404)
116 .set_body_json(json!({"error": "RecordNotFound", "message": "missing"})),
117 )
118 .mount(&self.server)
119 .await;
120 }
121}
122
123fn enc(s: &str) -> String {
124 byte_serialize(s.as_bytes()).collect()
125}
126
127fn bulk_request(endpoint: &str, key: &str, values: &[&str]) -> Request<Body> {
128 let qs = values
129 .iter()
130 .map(|v| format!("{key}={v}"))
131 .collect::<Vec<_>>()
132 .join("&");
133 Request::builder()
134 .uri(format!("/xrpc/{endpoint}?{qs}"))
135 .body(Body::empty())
136 .unwrap()
137}
138
139fn bulk_request_escaped(endpoint: &str, key: &str, values: &[&str]) -> Request<Body> {
140 let qs = values
141 .iter()
142 .map(|v| format!("{key}={}", enc(v)))
143 .collect::<Vec<_>>()
144 .join("&");
145 Request::builder()
146 .uri(format!("/xrpc/{endpoint}?{qs}"))
147 .body(Body::empty())
148 .unwrap()
149}
150
151async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
152 let status = resp.status();
153 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
154 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
155 (status, parsed)
156}
157
158fn issue_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
159 json!({
160 "$type": "sh.tangled.repo.issue",
161 "repo": repo_did.as_ref(),
162 "title": title,
163 "createdAt": "2026-05-01T00:00:00Z"
164 })
165}
166
167fn pull_body(target_repo: &Did<DefaultStr>, title: &str) -> Value {
168 json!({
169 "$type": "sh.tangled.repo.pull",
170 "title": title,
171 "createdAt": "2026-05-01T00:00:00Z",
172 "rounds": [],
173 "target": {
174 "branch": "main",
175 "repo": target_repo.as_ref()
176 }
177 })
178}
179
180fn repo_body(name: &str) -> Value {
181 json!({
182 "$type": "sh.tangled.repo",
183 "name": name,
184 "knot": "oyster.cafe",
185 "createdAt": "2026-05-01T00:00:00Z"
186 })
187}
188
189fn profile_body(handle: &Handle<DefaultStr>) -> Value {
190 json!({
191 "$type": "sh.tangled.actor.profile",
192 "bluesky": false,
193 "preferredHandle": handle.as_ref()
194 })
195}
196
197#[tokio::test]
198async fn get_repos_returns_all_resolved_records() {
199 let h = Harness::new().await;
200 h.mount(
201 &did("did:plc:nel"),
202 &nsid("sh.tangled.repo"),
203 &rkey("abalone"),
204 repo_body("abalone"),
205 )
206 .await;
207 h.mount(
208 &did("did:plc:teq"),
209 &nsid("sh.tangled.repo"),
210 &rkey("limpet"),
211 repo_body("limpet"),
212 )
213 .await;
214 let app = router(h.state.clone());
215 let (status, body) = json_response(
216 app.oneshot(bulk_request(
217 "sh.tangled.repo.getRepos",
218 "repos",
219 &[
220 "at://did:plc:nel/sh.tangled.repo/abalone",
221 "at://did:plc:teq/sh.tangled.repo/limpet",
222 ],
223 ))
224 .await
225 .unwrap(),
226 )
227 .await;
228 assert_eq!(status, StatusCode::OK);
229 let items = body["items"].as_array().unwrap();
230 assert_eq!(items.len(), 2);
231 let names: Vec<&str> = items
232 .iter()
233 .map(|v| v["value"]["name"].as_str().unwrap())
234 .collect();
235 assert!(names.contains(&"abalone"));
236 assert!(names.contains(&"limpet"));
237}
238
239#[tokio::test]
240async fn get_profiles_returns_all_resolved_profiles() {
241 let h = Harness::new().await;
242 h.mount(
243 &did("did:plc:nel"),
244 &nsid("sh.tangled.actor.profile"),
245 &rkey("self"),
246 profile_body(&handle("witchcraft.systems")),
247 )
248 .await;
249 h.mount(
250 &did("did:plc:teq"),
251 &nsid("sh.tangled.actor.profile"),
252 &rkey("self"),
253 profile_body(&handle("olaren.dev")),
254 )
255 .await;
256 let app = router(h.state.clone());
257 let (status, body) = json_response(
258 app.oneshot(bulk_request(
259 "sh.tangled.actor.getProfiles",
260 "actors",
261 &[
262 "at://did:plc:nel/sh.tangled.actor.profile/self",
263 "at://did:plc:teq/sh.tangled.actor.profile/self",
264 ],
265 ))
266 .await
267 .unwrap(),
268 )
269 .await;
270 assert_eq!(status, StatusCode::OK);
271 let items = body["items"].as_array().unwrap();
272 assert_eq!(items.len(), 2);
273}
274
275#[tokio::test]
276async fn get_profiles_accepts_percent_escaped_at_uris() {
277 let h = Harness::new().await;
278 h.mount(
279 &did("did:plc:nel"),
280 &nsid("sh.tangled.actor.profile"),
281 &rkey("self"),
282 profile_body(&handle("witchcraft.systems")),
283 )
284 .await;
285 let app = router(h.state.clone());
286 let (status, body) = json_response(
287 app.oneshot(bulk_request_escaped(
288 "sh.tangled.actor.getProfiles",
289 "actors",
290 &["at://did:plc:nel/sh.tangled.actor.profile/self"],
291 ))
292 .await
293 .unwrap(),
294 )
295 .await;
296 assert_eq!(status, StatusCode::OK);
297 assert_eq!(body["items"].as_array().unwrap().len(), 1);
298}
299
300#[tokio::test]
301async fn get_issues_returns_all_resolved_issues() {
302 let h = Harness::new().await;
303 let repo = did("did:plc:abalone");
304 h.mount(
305 &did("did:plc:nel"),
306 &nsid("sh.tangled.repo.issue"),
307 &rkey("i1"),
308 issue_body(&repo, "first"),
309 )
310 .await;
311 h.mount(
312 &did("did:plc:olaren"),
313 &nsid("sh.tangled.repo.issue"),
314 &rkey("i2"),
315 issue_body(&repo, "second"),
316 )
317 .await;
318 let app = router(h.state.clone());
319 let (status, body) = json_response(
320 app.oneshot(bulk_request(
321 "sh.tangled.repo.getIssues",
322 "issues",
323 &[
324 "at://did:plc:nel/sh.tangled.repo.issue/i1",
325 "at://did:plc:olaren/sh.tangled.repo.issue/i2",
326 ],
327 ))
328 .await
329 .unwrap(),
330 )
331 .await;
332 assert_eq!(status, StatusCode::OK);
333 let items = body["items"].as_array().unwrap();
334 assert_eq!(items.len(), 2);
335}
336
337#[tokio::test]
338async fn get_pulls_returns_all_resolved_pulls() {
339 let h = Harness::new().await;
340 let target = did("did:plc:abalone");
341 h.mount(
342 &did("did:plc:nel"),
343 &nsid("sh.tangled.repo.pull"),
344 &rkey("p1"),
345 pull_body(&target, "patch one"),
346 )
347 .await;
348 let app = router(h.state.clone());
349 let (status, body) = json_response(
350 app.oneshot(bulk_request(
351 "sh.tangled.repo.getPulls",
352 "pulls",
353 &["at://did:plc:nel/sh.tangled.repo.pull/p1"],
354 ))
355 .await
356 .unwrap(),
357 )
358 .await;
359 assert_eq!(status, StatusCode::OK);
360 let items = body["items"].as_array().unwrap();
361 assert_eq!(items.len(), 1);
362 assert_eq!(items[0]["value"]["title"], json!("patch one"));
363}
364
365#[tokio::test]
366async fn missing_records_are_dropped_silently() {
367 let h = Harness::new().await;
368 h.mount(
369 &did("did:plc:nel"),
370 &nsid("sh.tangled.repo"),
371 &rkey("abalone"),
372 repo_body("abalone"),
373 )
374 .await;
375 h.mount_404(
376 &did("did:plc:teq"),
377 &nsid("sh.tangled.repo"),
378 &rkey("ghost"),
379 )
380 .await;
381 let app = router(h.state.clone());
382 let (status, body) = json_response(
383 app.oneshot(bulk_request(
384 "sh.tangled.repo.getRepos",
385 "repos",
386 &[
387 "at://did:plc:nel/sh.tangled.repo/abalone",
388 "at://did:plc:teq/sh.tangled.repo/ghost",
389 ],
390 ))
391 .await
392 .unwrap(),
393 )
394 .await;
395 assert_eq!(status, StatusCode::OK);
396 let items = body["items"].as_array().unwrap();
397 assert_eq!(
398 items.len(),
399 1,
400 "missing records must be dropped not fail the bulk call"
401 );
402 assert_eq!(items[0]["value"]["name"], json!("abalone"));
403}
404
405#[tokio::test]
406async fn transient_failure_drops_only_that_record() {
407 let h = Harness::new().await;
408 h.mount(
409 &did("did:plc:nel"),
410 &nsid("sh.tangled.repo"),
411 &rkey("conch"),
412 repo_body("conch"),
413 )
414 .await;
415 Mock::given(method("GET"))
416 .and(path("/xrpc/com.atproto.repo.getRecord"))
417 .and(query_param("repo", "did:plc:teq"))
418 .and(query_param("collection", "sh.tangled.repo"))
419 .and(query_param("rkey", "flaky"))
420 .respond_with(ResponseTemplate::new(503))
421 .mount(&h.server)
422 .await;
423 let app = router(h.state.clone());
424 let (status, body) = json_response(
425 app.oneshot(bulk_request(
426 "sh.tangled.repo.getRepos",
427 "repos",
428 &[
429 "at://did:plc:nel/sh.tangled.repo/conch",
430 "at://did:plc:teq/sh.tangled.repo/flaky",
431 ],
432 ))
433 .await
434 .unwrap(),
435 )
436 .await;
437 assert_eq!(status, StatusCode::OK);
438 let items = body["items"].as_array().unwrap();
439 assert_eq!(
440 items.len(),
441 1,
442 "transient upstream failure drops that record, it must not fail the bulk call"
443 );
444 assert_eq!(items[0]["value"]["name"], json!("conch"));
445}
446
447#[tokio::test]
448async fn wrong_collection_uri_fails_bulk_request() {
449 let h = Harness::new().await;
450 h.mount(
451 &did("did:plc:nel"),
452 &nsid("sh.tangled.repo"),
453 &rkey("conch"),
454 repo_body("conch"),
455 )
456 .await;
457 let app = router(h.state.clone());
458 let (status, body) = json_response(
459 app.oneshot(bulk_request(
460 "sh.tangled.repo.getRepos",
461 "repos",
462 &[
463 "at://did:plc:nel/sh.tangled.repo/conch",
464 "at://did:plc:teq/sh.tangled.repo.issue/whelk",
465 ],
466 ))
467 .await
468 .unwrap(),
469 )
470 .await;
471 assert_eq!(
472 status,
473 StatusCode::BAD_REQUEST,
474 "a wrong-collection uri must fail the whole bulk request"
475 );
476 assert_eq!(body["error"], "InvalidRequest");
477}
478
479#[tokio::test]
480async fn empty_uri_list_is_rejected() {
481 let h = Harness::new().await;
482 let app = router(h.state.clone());
483 let resp = app
484 .oneshot(
485 Request::builder()
486 .uri("/xrpc/sh.tangled.repo.getRepos")
487 .body(Body::empty())
488 .unwrap(),
489 )
490 .await
491 .unwrap();
492 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
493}
494
495#[tokio::test]
496async fn over_limit_uri_list_is_rejected() {
497 let h = Harness::new().await;
498 let uris: Vec<String> = (0..51)
499 .map(|i| format!("at://did:plc:nel/sh.tangled.repo/r{i}"))
500 .collect();
501 let refs: Vec<&str> = uris.iter().map(|s| s.as_str()).collect();
502 let app = router(h.state.clone());
503 let resp = app
504 .oneshot(bulk_request("sh.tangled.repo.getRepos", "repos", &refs))
505 .await
506 .unwrap();
507 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
508}
509
510#[tokio::test]
511async fn malformed_uri_in_list_returns_400() {
512 let h = Harness::new().await;
513 let app = router(h.state.clone());
514 let resp = app
515 .oneshot(bulk_request(
516 "sh.tangled.repo.getRepos",
517 "repos",
518 &["not-a-uri"],
519 ))
520 .await
521 .unwrap();
522 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
523}