Monorepo for Tangled
tangled.org
1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::Duration;
5
6use bobbin_knot_proxy::{KnotHost, KnotHostError, PrivateAddressFilter, PrivateHostReason};
7use bobbin_runtime::{HttpRequest, HttpResponseHead, HttpTransport, NetworkError, ReqwestHttp};
8use bytes::{Bytes, BytesMut};
9use chrono::{DateTime, Utc};
10use futures::TryStreamExt;
11use http::{HeaderMap, StatusCode};
12use jacquard_common::DefaultStr;
13use jacquard_common::types::did::Did;
14use jacquard_common::types::nsid::Nsid;
15use serde::Deserialize;
16use thiserror::Error;
17use url::Url;
18
/// `User-Agent` value set on the default HTTP client: `bobbin/<crate version>`.
const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION"));
/// Overall request timeout configured on the default HTTP client.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Connect timeout configured on the default HTTP client.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Largest response body (4 MiB) that `read_bounded` will buffer.
const MAX_BODY_BYTES: u64 = 4 * 1024 * 1024;
/// Value sent as the `limit` query parameter on every list request.
const LIST_PAGE_LIMIT: i64 = 1000;
/// Maximum number of pages fetched for one listing before it is reported as truncated.
const MAX_LIST_PAGES: usize = 256;

/// NSID of the XRPC method queried by `KnotClient::capabilities`.
const VERSION_NSID: &str = "sh.tangled.knot.version";
/// NSID of the XRPC method queried by `KnotClient::list_members`.
const LIST_MEMBERS_NSID: &str = "sh.tangled.knot.listMembers";
/// NSID of the XRPC method queried by `KnotClient::list_collaborators`.
const LIST_COLLABORATORS_NSID: &str = "sh.tangled.repo.listCollaborators";
29
/// Client for a knot's XRPC endpoints.
///
/// All requests go through the wrapped [`HttpTransport`]; cloning the client
/// clones the `Arc`, so clones share the same transport.
#[derive(Clone)]
pub struct KnotClient {
    /// Transport used to execute every request.
    http: Arc<dyn HttpTransport>,
}
34
/// One item of a list response, deserialized from camelCase JSON.
///
/// Only the fields declared here are read; other keys in the item (the tests
/// send an `addedBy` key, for example) are ignored by the deserializer.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AclEntry {
    /// DID taken from the item's `subject` key.
    pub subject: Did<DefaultStr>,
    /// Timestamp taken from the item's `createdAt` key.
    pub created_at: DateTime<Utc>,
}
41
/// Whether a paginated listing was read to its end.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Completeness {
    /// The knot stopped returning a (non-empty) cursor, so every page was read.
    Complete,
    /// The page cap was reached while the knot was still returning a cursor.
    Truncated,
}
47
/// Result of draining a paginated list endpoint.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AclListing {
    /// Entries from all fetched pages, in the order the pages returned them.
    pub entries: Vec<AclEntry>,
    /// Whether `entries` covers the whole listing or stopped at the page cap.
    pub completeness: Completeness,
}
53
/// Errors produced while resolving a knot endpoint or talking to it.
#[derive(Debug, Error)]
pub enum KnotClientError {
    /// The host string was rejected by `KnotHost::parse`.
    #[error("knot host: {0}")]
    Host(#[from] KnotHostError),
    /// The host was refused by `knot_endpoint` because private hosts are not allowed.
    #[error("blocked: knot {host} resolves to {reason} address space")]
    PrivateHost {
        /// Host string exactly as passed to `knot_endpoint`.
        host: String,
        /// Reason reported by `KnotHost::private_literal_reason`.
        reason: PrivateHostReason,
    },
    /// The default HTTP client could not be built; carries the builder error's text.
    #[error("http client build: {0}")]
    Build(String),
    /// The transport failed while sending the request or streaming the body.
    #[error("network: {0}")]
    Network(#[from] NetworkError),
    /// The knot answered with HTTP 404.
    #[error("xrpc not found")]
    NotFound,
    /// The knot answered with a status other than 200 or 404 (redirects included).
    #[error("upstream returned status {0}")]
    Upstream(StatusCode),
    /// The declared or actual body size exceeded `limit` bytes.
    #[error("response body exceeded {limit} bytes")]
    BodyTooLarge { limit: u64 },
    /// The response body was not the expected JSON.
    #[error("decode response: {0}")]
    Decode(#[from] serde_json::Error),
}
76
77pub fn knot_endpoint(
78 host: &str,
79 dev: bool,
80 allow_private: bool,
81) -> Result<KnotHost, KnotClientError> {
82 let raw = if dev {
83 format!("http://{host}")
84 } else {
85 host.to_owned()
86 };
87 let knot = KnotHost::parse(&raw)?;
88 if !allow_private && let Some(reason) = knot.private_literal_reason() {
89 return Err(KnotClientError::PrivateHost {
90 host: host.to_owned(),
91 reason,
92 });
93 }
94 Ok(knot)
95}
96
97fn default_http_client(allow_private: bool) -> Result<reqwest::Client, reqwest::Error> {
98 reqwest::Client::builder()
99 .user_agent(USER_AGENT)
100 .timeout(REQUEST_TIMEOUT)
101 .connect_timeout(CONNECT_TIMEOUT)
102 .redirect(reqwest::redirect::Policy::none())
103 .dns_resolver(Arc::new(PrivateAddressFilter::new(allow_private)))
104 .build()
105}
106
107fn nsid(s: &'static str) -> Nsid<DefaultStr> {
108 Nsid::new_static(s).expect("static nsid literal must validate")
109}
110
111pub(crate) fn authority(host: &KnotHost) -> String {
112 let url = host.url();
113 match (url.host_str(), url.port()) {
114 (Some(h), Some(p)) => format!("{h}:{p}"),
115 (Some(h), None) => h.to_owned(),
116 (None, _) => String::new(),
117 }
118}
119
120impl KnotClient {
121 pub fn new(http: Arc<dyn HttpTransport>) -> Self {
122 Self { http }
123 }
124
125 pub fn with_default_http(allow_private: bool) -> Result<Self, KnotClientError> {
126 let client = default_http_client(allow_private)
127 .map_err(|e| KnotClientError::Build(e.to_string()))?;
128 Ok(Self::new(ReqwestHttp::shared(client)))
129 }
130
131 pub async fn capabilities(&self, host: &KnotHost) -> Result<Vec<String>, KnotClientError> {
132 let mut url = host.xrpc_url(&nsid(VERSION_NSID));
133 url.set_query(None);
134 let bytes = self.get_json(url).await?;
135 let resp: VersionWire = serde_json::from_slice(&bytes)?;
136 Ok(resp.capabilities.unwrap_or_default())
137 }
138
139 pub async fn list_members(&self, host: &KnotHost) -> Result<AclListing, KnotClientError> {
140 let subject = authority(host);
141 self.drain(host, LIST_MEMBERS_NSID, subject, None, 0, Vec::new())
142 .await
143 }
144
145 pub async fn list_collaborators(
146 &self,
147 host: &KnotHost,
148 repo: &Did<DefaultStr>,
149 ) -> Result<AclListing, KnotClientError> {
150 self.drain(
151 host,
152 LIST_COLLABORATORS_NSID,
153 repo.as_ref().to_owned(),
154 None,
155 0,
156 Vec::new(),
157 )
158 .await
159 }
160
161 fn drain<'a>(
162 &'a self,
163 host: &'a KnotHost,
164 endpoint: &'static str,
165 subject: String,
166 cursor: Option<String>,
167 page: usize,
168 mut acc: Vec<AclEntry>,
169 ) -> Pin<Box<dyn Future<Output = Result<AclListing, KnotClientError>> + Send + 'a>> {
170 Box::pin(async move {
171 if page >= MAX_LIST_PAGES {
172 tracing::warn!(
173 host = %authority(host),
174 endpoint,
175 pages = page,
176 "knot list truncated at page cap"
177 );
178 return Ok(AclListing {
179 entries: acc,
180 completeness: Completeness::Truncated,
181 });
182 }
183 let resp = self
184 .fetch_page(host, endpoint, &subject, cursor.as_deref())
185 .await?;
186 acc.extend(resp.items);
187 match resp.cursor.filter(|c| !c.is_empty()) {
188 Some(next) => {
189 self.drain(host, endpoint, subject, Some(next), page + 1, acc)
190 .await
191 }
192 None => Ok(AclListing {
193 entries: acc,
194 completeness: Completeness::Complete,
195 }),
196 }
197 })
198 }
199
200 async fn fetch_page(
201 &self,
202 host: &KnotHost,
203 endpoint: &'static str,
204 subject: &str,
205 cursor: Option<&str>,
206 ) -> Result<ListWire, KnotClientError> {
207 let mut url = host.xrpc_url(&nsid(endpoint));
208 {
209 let mut q = url.query_pairs_mut();
210 q.clear();
211 q.append_pair("subject", subject);
212 q.append_pair("limit", &LIST_PAGE_LIMIT.to_string());
213 if let Some(c) = cursor {
214 q.append_pair("cursor", c);
215 }
216 }
217 let bytes = self.get_json(url).await?;
218 Ok(serde_json::from_slice(&bytes)?)
219 }
220
221 async fn get_json(&self, url: Url) -> Result<Bytes, KnotClientError> {
222 let resp = self
223 .http
224 .execute(HttpRequest {
225 url,
226 headers: HeaderMap::new(),
227 })
228 .await?;
229 match resp.status {
230 StatusCode::OK => read_bounded(resp).await,
231 StatusCode::NOT_FOUND => Err(KnotClientError::NotFound),
232 other => Err(KnotClientError::Upstream(other)),
233 }
234 }
235}
236
/// Subset of the version response that this client reads.
#[derive(Deserialize)]
struct VersionWire {
    /// Capability tokens declared by the knot; `None` when the key is missing or `null`.
    #[serde(default)]
    capabilities: Option<Vec<String>>,
}
242
/// One page of a list response as it appears on the wire.
#[derive(Deserialize)]
struct ListWire {
    /// Entries on this page; a missing key deserializes to an empty list.
    #[serde(default)]
    items: Vec<AclEntry>,
    /// Cursor for the next page; a missing key deserializes to `None`.
    #[serde(default)]
    cursor: Option<String>,
}
250
251async fn read_bounded(resp: HttpResponseHead) -> Result<Bytes, KnotClientError> {
252 if resp.content_length.is_some_and(|len| len > MAX_BODY_BYTES) {
253 return Err(KnotClientError::BodyTooLarge {
254 limit: MAX_BODY_BYTES,
255 });
256 }
257 let buf = resp
258 .body
259 .map_err(KnotClientError::Network)
260 .try_fold(BytesMut::new(), |mut acc, chunk| async move {
261 if (acc.len() as u64).saturating_add(chunk.len() as u64) > MAX_BODY_BYTES {
262 return Err(KnotClientError::BodyTooLarge {
263 limit: MAX_BODY_BYTES,
264 });
265 }
266 acc.extend_from_slice(&chunk);
267 Ok(acc)
268 })
269 .await?;
270 Ok(buf.freeze())
271}
272
// Tests for the knot client; HTTP-facing tests run against a local wiremock server.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use wiremock::matchers::{method, path, query_param};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Builds a DID from a string, panicking if it is invalid.
    fn did(s: &str) -> Did<DefaultStr> {
        Did::new_owned(s).unwrap()
    }

    /// Client over the default HTTP configuration with private addresses allowed.
    // NOTE(review): presumably needed because the mock server listens on a local address — confirm.
    fn client() -> KnotClient {
        KnotClient::new(ReqwestHttp::shared(default_http_client(true).unwrap()))
    }

    /// Parses the mock server's URI into a `KnotHost`.
    fn endpoint(server: &MockServer) -> KnotHost {
        KnotHost::parse(&server.uri()).unwrap()
    }

    /// The `capabilities` array of the version response is returned as-is.
    #[tokio::test]
    async fn capabilities_returns_declared_tokens() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/xrpc/sh.tangled.knot.version"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "version": "1.0.0 (deadbeef)",
                "capabilities": ["knot-acl"]
            })))
            .mount(&server)
            .await;

        let caps = client().capabilities(&endpoint(&server)).await.unwrap();
        assert_eq!(caps, vec!["knot-acl".to_owned()]);
    }

    /// A version response without `capabilities` yields an empty list.
    #[tokio::test]
    async fn capabilities_empty_when_field_absent() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/xrpc/sh.tangled.knot.version"))
            .respond_with(
                ResponseTemplate::new(200).set_body_json(json!({ "version": "1.0.0 (cafe)" })),
            )
            .mount(&server)
            .await;

        let caps = client().capabilities(&endpoint(&server)).await.unwrap();
        assert!(caps.is_empty());
    }

    /// A single page without a cursor is returned complete; the request must
    /// carry the knot's authority as `subject`.
    #[tokio::test]
    async fn list_members_drains_single_page() {
        let server = MockServer::start().await;
        let host = endpoint(&server);
        Mock::given(method("GET"))
            .and(path("/xrpc/sh.tangled.knot.listMembers"))
            .and(query_param("subject", authority(&host).as_str()))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "items": [
                    {"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"},
                    {"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T12:00:00Z"}
                ]
            })))
            .mount(&server)
            .await;

        let listing = client().list_members(&host).await.unwrap();
        assert_eq!(listing.completeness, Completeness::Complete);
        assert_eq!(
            listing.entries,
            vec![
                AclEntry {
                    subject: did("did:plc:boltless"),
                    created_at: "2026-06-01T00:00:00Z".parse().unwrap(),
                },
                AclEntry {
                    subject: did("did:plc:akshay"),
                    created_at: "2026-06-02T12:00:00Z".parse().unwrap(),
                },
            ]
        );
    }

    /// The cursor from the first page is sent back, and entries from both
    /// pages are concatenated in page order.
    #[tokio::test]
    async fn list_members_drains_multiple_pages() {
        let server = MockServer::start().await;
        let host = endpoint(&server);
        // The cursor-specific mock is mounted before the catch-all one.
        // NOTE(review): this relies on wiremock preferring the earlier-mounted
        // mock when several match — confirm against the wiremock docs.
        Mock::given(method("GET"))
            .and(path("/xrpc/sh.tangled.knot.listMembers"))
            .and(query_param("cursor", "p2"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "items": [{"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T00:00:00Z"}]
            })))
            .mount(&server)
            .await;
        Mock::given(method("GET"))
            .and(path("/xrpc/sh.tangled.knot.listMembers"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "items": [{"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}],
                "cursor": "p2"
            })))
            .mount(&server)
            .await;

        let listing = client().list_members(&host).await.unwrap();
        assert_eq!(listing.completeness, Completeness::Complete);
        let subjects: Vec<_> = listing.entries.into_iter().map(|m| m.subject).collect();
        assert_eq!(
            subjects,
            vec![did("did:plc:boltless"), did("did:plc:akshay")]
        );
    }

    /// Collaborator listings send the repo DID as `subject`.
    #[tokio::test]
    async fn list_collaborators_uses_repo_did_subject() {
        let server = MockServer::start().await;
        let host = endpoint(&server);
        Mock::given(method("GET"))
            .and(path("/xrpc/sh.tangled.repo.listCollaborators"))
            .and(query_param("subject", "did:plc:scallop"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "items": [{"subject": "did:plc:olaren", "addedBy": "did:plc:boltless", "createdAt": "2026-06-03T00:00:00Z"}]
            })))
            .mount(&server)
            .await;

        let listing = client()
            .list_collaborators(&host, &did("did:plc:scallop"))
            .await
            .unwrap();
        assert_eq!(listing.completeness, Completeness::Complete);
        assert_eq!(listing.entries.len(), 1);
        assert_eq!(listing.entries[0].subject, did("did:plc:olaren"));
    }

    /// A server that always returns a cursor is cut off at the page cap, with
    /// one entry collected per fetched page.
    #[tokio::test]
    async fn drain_reports_truncation_at_page_cap() {
        let server = MockServer::start().await;
        let host = endpoint(&server);
        Mock::given(method("GET"))
            .and(path("/xrpc/sh.tangled.knot.listMembers"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "items": [{"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}],
                "cursor": "more"
            })))
            .mount(&server)
            .await;

        let listing = client().list_members(&host).await.unwrap();
        assert_eq!(
            listing.completeness,
            Completeness::Truncated,
            "a never-terminating cursor must surface as a truncated listing"
        );
        assert_eq!(listing.entries.len(), MAX_LIST_PAGES);
    }

    /// HTTP 404 surfaces as `KnotClientError::NotFound`.
    #[tokio::test]
    async fn maps_404_to_not_found() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .respond_with(ResponseTemplate::new(404))
            .mount(&server)
            .await;

        let err = client()
            .capabilities(&endpoint(&server))
            .await
            .expect_err("404 must surface");
        assert!(matches!(err, KnotClientError::NotFound));
    }

    /// A 5xx status surfaces as `KnotClientError::Upstream` carrying that status.
    #[tokio::test]
    async fn maps_5xx_to_upstream() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .respond_with(ResponseTemplate::new(503))
            .mount(&server)
            .await;

        let err = client()
            .capabilities(&endpoint(&server))
            .await
            .expect_err("5xx must surface");
        match err {
            KnotClientError::Upstream(s) => assert_eq!(s.as_u16(), 503),
            other => panic!("wrong variant: {other:?}"),
        }
    }

    /// A redirect is reported as `Upstream(301)` instead of being followed.
    #[tokio::test]
    async fn does_not_follow_redirects() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/xrpc/sh.tangled.knot.version"))
            .respond_with(ResponseTemplate::new(301).insert_header(
                "location",
                "https://kt.tngl.oyster.cafe/xrpc/sh.tangled.knot.version",
            ))
            .mount(&server)
            .await;

        let err = client()
            .capabilities(&endpoint(&server))
            .await
            .expect_err("a redirect must surface as an error, not be followed to another knot");
        match err {
            KnotClientError::Upstream(s) => assert_eq!(s.as_u16(), 301),
            other => panic!("expected Upstream(301), got {other:?}"),
        }
    }

    /// A loopback literal is refused when private hosts are not allowed.
    #[test]
    fn knot_endpoint_rejects_private_host() {
        let err =
            knot_endpoint("127.0.0.1:9", true, false).expect_err("private host must be refused");
        assert!(matches!(err, KnotClientError::PrivateHost { .. }));
    }

    /// With private hosts allowed, dev mode yields an `http` endpoint.
    #[test]
    fn knot_endpoint_allows_private_when_permitted() {
        let knot = knot_endpoint("127.0.0.1:9", true, true).expect("private host allowed");
        assert_eq!(knot.url().scheme(), "http");
    }
}