This repository has no description
1use std::sync::Arc;
2
3use axum::body::{Body, to_bytes};
4use bobbin_edge_index::{CoverageWatch, EdgeStore, StateIndex};
5use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig};
6use bobbin_record_lru::{CacheCapacity, LruRecordStore};
7use bobbin_resolver::RepoIdResolver;
8use bobbin_runtime::{RuntimeHasher, SystemClock};
9use bobbin_search::{DEFAULT_WRITER_HEAP_BYTES, SearchIndex, SearchReader};
10use bobbin_slingshot_client::SlingshotClient;
11use bobbin_xrpc::{AppState, router};
12use http::{Request, StatusCode};
13use jacquard_common::DefaultStr;
14use jacquard_common::types::did::Did;
15use jacquard_common::types::handle::Handle;
16use jacquard_common::types::nsid::Nsid;
17use jacquard_common::types::recordkey::Rkey;
18use serde_json::{Value, json};
19use tower::ServiceExt;
20use url::Url;
21use url::form_urlencoded::byte_serialize;
22use wiremock::matchers::{method, path, query_param};
23use wiremock::{Mock, MockServer, ResponseTemplate};
24
/// CID string placed in the `cid` field of every successful mocked `getRecord` response.
const CID: &str = "bafyreieqygohnz2zqyvtvktbjpvhutphobcmbsnt4q5lc36ri7vpcmoz4i";
26
27fn did(s: &str) -> Did<DefaultStr> {
28 Did::new_owned(s).unwrap()
29}
30
31fn rkey(s: &str) -> Rkey<DefaultStr> {
32 Rkey::new_owned(s).unwrap()
33}
34
35fn nsid(s: &'static str) -> Nsid<DefaultStr> {
36 Nsid::new_static(s).unwrap()
37}
38
39fn handle(s: &str) -> Handle<DefaultStr> {
40 Handle::new_owned(s).unwrap()
41}
42
43struct Harness {
44 server: MockServer,
45 state: AppState,
46}
47
48impl Harness {
49 async fn new() -> Self {
50 let server = MockServer::start().await;
51 let coverage = Arc::new(CoverageWatch::new());
52 let state = AppState::new(
53 Arc::new(LruRecordStore::new(CacheCapacity::from_bytes(64 * 1024))),
54 SlingshotClient::with_default_http(Url::parse(&server.uri()).unwrap()).unwrap(),
55 Arc::new(EdgeStore::new(RuntimeHasher::default())),
56 Arc::new(StateIndex::new(RuntimeHasher::default())),
57 Arc::new(StateIndex::new(RuntimeHasher::default())),
58 coverage,
59 Arc::new(
60 KnotProxy::new(
61 KnotProxyConfig::default(),
62 KnotHttpConfig::default(),
63 Arc::new(SystemClock::new()),
64 RuntimeHasher::default(),
65 )
66 .unwrap(),
67 ),
68 Arc::new(
69 SearchIndex::new(DEFAULT_WRITER_HEAP_BYTES, Arc::new(SystemClock::new())).unwrap(),
70 ) as Arc<dyn SearchReader>,
71 Arc::new(RepoIdResolver::detached(RuntimeHasher::default())),
72 );
73 Self { server, state }
74 }
75
76 async fn mount(
77 &self,
78 did: &Did<DefaultStr>,
79 collection: &Nsid<DefaultStr>,
80 rkey: &Rkey<DefaultStr>,
81 value: Value,
82 ) {
83 let uri = format!(
84 "at://{}/{}/{}",
85 did.as_ref(),
86 collection.as_ref(),
87 rkey.as_ref()
88 );
89 Mock::given(method("GET"))
90 .and(path("/xrpc/com.atproto.repo.getRecord"))
91 .and(query_param("repo", did.as_ref()))
92 .and(query_param("collection", collection.as_ref()))
93 .and(query_param("rkey", rkey.as_ref()))
94 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
95 "uri": uri,
96 "cid": CID,
97 "value": value,
98 })))
99 .mount(&self.server)
100 .await;
101 }
102
103 async fn mount_404(
104 &self,
105 did: &Did<DefaultStr>,
106 collection: &Nsid<DefaultStr>,
107 rkey: &Rkey<DefaultStr>,
108 ) {
109 Mock::given(method("GET"))
110 .and(path("/xrpc/com.atproto.repo.getRecord"))
111 .and(query_param("repo", did.as_ref()))
112 .and(query_param("collection", collection.as_ref()))
113 .and(query_param("rkey", rkey.as_ref()))
114 .respond_with(
115 ResponseTemplate::new(404)
116 .set_body_json(json!({"error": "RecordNotFound", "message": "missing"})),
117 )
118 .mount(&self.server)
119 .await;
120 }
121}
122
123fn enc(s: &str) -> String {
124 byte_serialize(s.as_bytes()).collect()
125}
126
127fn bulk_request(endpoint: &str, key: &str, values: &[&str]) -> Request<Body> {
128 let qs = values
129 .iter()
130 .map(|v| format!("{key}={}", enc(v)))
131 .collect::<Vec<_>>()
132 .join("&");
133 Request::builder()
134 .uri(format!("/xrpc/{endpoint}?{qs}"))
135 .body(Body::empty())
136 .unwrap()
137}
138
139async fn json_response(resp: axum::response::Response) -> (StatusCode, Value) {
140 let status = resp.status();
141 let bytes = to_bytes(resp.into_body(), 1 << 20).await.unwrap();
142 let parsed: Value = serde_json::from_slice(&bytes).expect("JSON body");
143 (status, parsed)
144}
145
146fn issue_body(repo_did: &Did<DefaultStr>, title: &str) -> Value {
147 json!({
148 "$type": "sh.tangled.repo.issue",
149 "repo": repo_did.as_ref(),
150 "title": title,
151 "createdAt": "2026-05-01T00:00:00Z"
152 })
153}
154
155fn pull_body(target_repo: &Did<DefaultStr>, title: &str) -> Value {
156 json!({
157 "$type": "sh.tangled.repo.pull",
158 "title": title,
159 "createdAt": "2026-05-01T00:00:00Z",
160 "rounds": [],
161 "target": {
162 "branch": "main",
163 "repo": target_repo.as_ref()
164 }
165 })
166}
167
168fn repo_body(name: &str) -> Value {
169 json!({
170 "$type": "sh.tangled.repo",
171 "name": name,
172 "knot": "oyster.cafe",
173 "createdAt": "2026-05-01T00:00:00Z"
174 })
175}
176
177fn profile_body(handle: &Handle<DefaultStr>) -> Value {
178 json!({
179 "$type": "sh.tangled.actor.profile",
180 "bluesky": false,
181 "preferredHandle": handle.as_ref()
182 })
183}
184
185#[tokio::test]
186async fn get_repos_returns_all_resolved_records() {
187 let h = Harness::new().await;
188 h.mount(
189 &did("did:plc:nel"),
190 &nsid("sh.tangled.repo"),
191 &rkey("abalone"),
192 repo_body("abalone"),
193 )
194 .await;
195 h.mount(
196 &did("did:plc:teq"),
197 &nsid("sh.tangled.repo"),
198 &rkey("limpet"),
199 repo_body("limpet"),
200 )
201 .await;
202 let app = router(h.state.clone());
203 let (status, body) = json_response(
204 app.oneshot(bulk_request(
205 "sh.tangled.repo.getRepos",
206 "repos",
207 &[
208 "at://did:plc:nel/sh.tangled.repo/abalone",
209 "at://did:plc:teq/sh.tangled.repo/limpet",
210 ],
211 ))
212 .await
213 .unwrap(),
214 )
215 .await;
216 assert_eq!(status, StatusCode::OK);
217 let items = body["items"].as_array().unwrap();
218 assert_eq!(items.len(), 2);
219 let names: Vec<&str> = items
220 .iter()
221 .map(|v| v["value"]["name"].as_str().unwrap())
222 .collect();
223 assert!(names.contains(&"abalone"));
224 assert!(names.contains(&"limpet"));
225}
226
227#[tokio::test]
228async fn get_profiles_returns_all_resolved_profiles() {
229 let h = Harness::new().await;
230 h.mount(
231 &did("did:plc:nel"),
232 &nsid("sh.tangled.actor.profile"),
233 &rkey("self"),
234 profile_body(&handle("witchcraft.systems")),
235 )
236 .await;
237 h.mount(
238 &did("did:plc:teq"),
239 &nsid("sh.tangled.actor.profile"),
240 &rkey("self"),
241 profile_body(&handle("olaren.dev")),
242 )
243 .await;
244 let app = router(h.state.clone());
245 let (status, body) = json_response(
246 app.oneshot(bulk_request(
247 "sh.tangled.actor.getProfiles",
248 "actors",
249 &[
250 "at://did:plc:nel/sh.tangled.actor.profile/self",
251 "at://did:plc:teq/sh.tangled.actor.profile/self",
252 ],
253 ))
254 .await
255 .unwrap(),
256 )
257 .await;
258 assert_eq!(status, StatusCode::OK);
259 let items = body["items"].as_array().unwrap();
260 assert_eq!(items.len(), 2);
261}
262
263#[tokio::test]
264async fn get_issues_returns_all_resolved_issues() {
265 let h = Harness::new().await;
266 let repo = did("did:plc:abalone");
267 h.mount(
268 &did("did:plc:nel"),
269 &nsid("sh.tangled.repo.issue"),
270 &rkey("i1"),
271 issue_body(&repo, "first"),
272 )
273 .await;
274 h.mount(
275 &did("did:plc:olaren"),
276 &nsid("sh.tangled.repo.issue"),
277 &rkey("i2"),
278 issue_body(&repo, "second"),
279 )
280 .await;
281 let app = router(h.state.clone());
282 let (status, body) = json_response(
283 app.oneshot(bulk_request(
284 "sh.tangled.repo.getIssues",
285 "issues",
286 &[
287 "at://did:plc:nel/sh.tangled.repo.issue/i1",
288 "at://did:plc:olaren/sh.tangled.repo.issue/i2",
289 ],
290 ))
291 .await
292 .unwrap(),
293 )
294 .await;
295 assert_eq!(status, StatusCode::OK);
296 let items = body["items"].as_array().unwrap();
297 assert_eq!(items.len(), 2);
298}
299
300#[tokio::test]
301async fn get_pulls_returns_all_resolved_pulls() {
302 let h = Harness::new().await;
303 let target = did("did:plc:abalone");
304 h.mount(
305 &did("did:plc:nel"),
306 &nsid("sh.tangled.repo.pull"),
307 &rkey("p1"),
308 pull_body(&target, "patch one"),
309 )
310 .await;
311 let app = router(h.state.clone());
312 let (status, body) = json_response(
313 app.oneshot(bulk_request(
314 "sh.tangled.repo.getPulls",
315 "pulls",
316 &["at://did:plc:nel/sh.tangled.repo.pull/p1"],
317 ))
318 .await
319 .unwrap(),
320 )
321 .await;
322 assert_eq!(status, StatusCode::OK);
323 let items = body["items"].as_array().unwrap();
324 assert_eq!(items.len(), 1);
325 assert_eq!(items[0]["value"]["title"], json!("patch one"));
326}
327
328#[tokio::test]
329async fn missing_records_are_dropped_silently() {
330 let h = Harness::new().await;
331 h.mount(
332 &did("did:plc:nel"),
333 &nsid("sh.tangled.repo"),
334 &rkey("abalone"),
335 repo_body("abalone"),
336 )
337 .await;
338 h.mount_404(
339 &did("did:plc:teq"),
340 &nsid("sh.tangled.repo"),
341 &rkey("ghost"),
342 )
343 .await;
344 let app = router(h.state.clone());
345 let (status, body) = json_response(
346 app.oneshot(bulk_request(
347 "sh.tangled.repo.getRepos",
348 "repos",
349 &[
350 "at://did:plc:nel/sh.tangled.repo/abalone",
351 "at://did:plc:teq/sh.tangled.repo/ghost",
352 ],
353 ))
354 .await
355 .unwrap(),
356 )
357 .await;
358 assert_eq!(status, StatusCode::OK);
359 let items = body["items"].as_array().unwrap();
360 assert_eq!(
361 items.len(),
362 1,
363 "missing records must be dropped not fail the bulk call"
364 );
365 assert_eq!(items[0]["value"]["name"], json!("abalone"));
366}
367
368#[tokio::test]
369async fn transient_failure_drops_only_that_record() {
370 let h = Harness::new().await;
371 h.mount(
372 &did("did:plc:nel"),
373 &nsid("sh.tangled.repo"),
374 &rkey("conch"),
375 repo_body("conch"),
376 )
377 .await;
378 Mock::given(method("GET"))
379 .and(path("/xrpc/com.atproto.repo.getRecord"))
380 .and(query_param("repo", "did:plc:teq"))
381 .and(query_param("collection", "sh.tangled.repo"))
382 .and(query_param("rkey", "flaky"))
383 .respond_with(ResponseTemplate::new(503))
384 .mount(&h.server)
385 .await;
386 let app = router(h.state.clone());
387 let (status, body) = json_response(
388 app.oneshot(bulk_request(
389 "sh.tangled.repo.getRepos",
390 "repos",
391 &[
392 "at://did:plc:nel/sh.tangled.repo/conch",
393 "at://did:plc:teq/sh.tangled.repo/flaky",
394 ],
395 ))
396 .await
397 .unwrap(),
398 )
399 .await;
400 assert_eq!(status, StatusCode::OK);
401 let items = body["items"].as_array().unwrap();
402 assert_eq!(
403 items.len(),
404 1,
405 "transient upstream failure drops that record, it must not fail the bulk call"
406 );
407 assert_eq!(items[0]["value"]["name"], json!("conch"));
408}
409
410#[tokio::test]
411async fn wrong_collection_uri_fails_bulk_request() {
412 let h = Harness::new().await;
413 h.mount(
414 &did("did:plc:nel"),
415 &nsid("sh.tangled.repo"),
416 &rkey("conch"),
417 repo_body("conch"),
418 )
419 .await;
420 let app = router(h.state.clone());
421 let (status, body) = json_response(
422 app.oneshot(bulk_request(
423 "sh.tangled.repo.getRepos",
424 "repos",
425 &[
426 "at://did:plc:nel/sh.tangled.repo/conch",
427 "at://did:plc:teq/sh.tangled.repo.issue/whelk",
428 ],
429 ))
430 .await
431 .unwrap(),
432 )
433 .await;
434 assert_eq!(
435 status,
436 StatusCode::BAD_REQUEST,
437 "a wrong-collection uri must fail the whole bulk request"
438 );
439 assert_eq!(body["error"], "InvalidRequest");
440}
441
442#[tokio::test]
443async fn empty_uri_list_is_rejected() {
444 let h = Harness::new().await;
445 let app = router(h.state.clone());
446 let resp = app
447 .oneshot(
448 Request::builder()
449 .uri("/xrpc/sh.tangled.repo.getRepos")
450 .body(Body::empty())
451 .unwrap(),
452 )
453 .await
454 .unwrap();
455 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
456}
457
458#[tokio::test]
459async fn over_limit_uri_list_is_rejected() {
460 let h = Harness::new().await;
461 let uris: Vec<String> = (0..51)
462 .map(|i| format!("at://did:plc:nel/sh.tangled.repo/r{i}"))
463 .collect();
464 let refs: Vec<&str> = uris.iter().map(|s| s.as_str()).collect();
465 let app = router(h.state.clone());
466 let resp = app
467 .oneshot(bulk_request("sh.tangled.repo.getRepos", "repos", &refs))
468 .await
469 .unwrap();
470 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
471}
472
473#[tokio::test]
474async fn malformed_uri_in_list_returns_400() {
475 let h = Harness::new().await;
476 let app = router(h.state.clone());
477 let resp = app
478 .oneshot(bulk_request(
479 "sh.tangled.repo.getRepos",
480 "repos",
481 &["not-a-uri"],
482 ))
483 .await
484 .unwrap();
485 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
486}