Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at master 16 kB View raw
1use std::future::Future; 2use std::pin::Pin; 3use std::sync::Arc; 4use std::time::Duration; 5 6use bobbin_knot_proxy::{KnotHost, KnotHostError, PrivateAddressFilter, PrivateHostReason}; 7use bobbin_runtime::{HttpRequest, HttpResponseHead, HttpTransport, NetworkError, ReqwestHttp}; 8use bytes::{Bytes, BytesMut}; 9use chrono::{DateTime, Utc}; 10use futures::TryStreamExt; 11use http::{HeaderMap, StatusCode}; 12use jacquard_common::DefaultStr; 13use jacquard_common::types::did::Did; 14use jacquard_common::types::nsid::Nsid; 15use serde::Deserialize; 16use thiserror::Error; 17use url::Url; 18 19const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION")); 20const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); 21const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); 22const MAX_BODY_BYTES: u64 = 4 * 1024 * 1024; 23const LIST_PAGE_LIMIT: i64 = 1000; 24const MAX_LIST_PAGES: usize = 256; 25 26const VERSION_NSID: &str = "sh.tangled.knot.version"; 27const LIST_MEMBERS_NSID: &str = "sh.tangled.knot.listMembers"; 28const LIST_COLLABORATORS_NSID: &str = "sh.tangled.repo.listCollaborators"; 29 30#[derive(Clone)] 31pub struct KnotClient { 32 http: Arc<dyn HttpTransport>, 33} 34 35#[derive(Clone, Debug, Eq, PartialEq, Deserialize)] 36#[serde(rename_all = "camelCase")] 37pub struct AclEntry { 38 pub subject: Did<DefaultStr>, 39 pub created_at: DateTime<Utc>, 40} 41 42#[derive(Clone, Copy, Debug, Eq, PartialEq)] 43pub enum Completeness { 44 Complete, 45 Truncated, 46} 47 48#[derive(Clone, Debug, Eq, PartialEq)] 49pub struct AclListing { 50 pub entries: Vec<AclEntry>, 51 pub completeness: Completeness, 52} 53 54#[derive(Debug, Error)] 55pub enum KnotClientError { 56 #[error("knot host: {0}")] 57 Host(#[from] KnotHostError), 58 #[error("blocked: knot {host} resolves to {reason} address space")] 59 PrivateHost { 60 host: String, 61 reason: PrivateHostReason, 62 }, 63 #[error("http client build: {0}")] 64 Build(String), 65 #[error("network: {0}")] 66 Network(#[from] NetworkError), 67 #[error("xrpc not found")] 68 NotFound, 69 #[error("upstream returned status {0}")] 70 Upstream(StatusCode), 71 #[error("response body exceeded {limit} bytes")] 72 BodyTooLarge { limit: u64 }, 73 #[error("decode response: {0}")] 74 Decode(#[from] serde_json::Error), 75} 76 77pub fn knot_endpoint( 78 host: &str, 79 dev: bool, 80 allow_private: bool, 81) -> Result<KnotHost, KnotClientError> { 82 let raw = if dev { 83 format!("http://{host}") 84 } else { 85 host.to_owned() 86 }; 87 let knot = KnotHost::parse(&raw)?; 88 if !allow_private && let Some(reason) = knot.private_literal_reason() { 89 return Err(KnotClientError::PrivateHost { 90 host: host.to_owned(), 91 reason, 92 }); 93 } 94 Ok(knot) 95} 96 97fn default_http_client(allow_private: bool) -> Result<reqwest::Client, reqwest::Error> { 98 reqwest::Client::builder() 99 .user_agent(USER_AGENT) 100 .timeout(REQUEST_TIMEOUT) 101 .connect_timeout(CONNECT_TIMEOUT) 102 .redirect(reqwest::redirect::Policy::none()) 103 .dns_resolver(Arc::new(PrivateAddressFilter::new(allow_private))) 104 .build() 105} 106 107fn nsid(s: &'static str) -> Nsid<DefaultStr> { 108 Nsid::new_static(s).expect("static nsid literal must validate") 109} 110 111pub(crate) fn authority(host: &KnotHost) -> String { 112 let url = host.url(); 113 match (url.host_str(), url.port()) { 114 (Some(h), Some(p)) => format!("{h}:{p}"), 115 (Some(h), None) => h.to_owned(), 116 (None, _) => String::new(), 117 } 118} 119 120impl KnotClient { 121 pub fn new(http: Arc<dyn HttpTransport>) -> Self { 122 Self { http } 123 } 124 125 pub fn with_default_http(allow_private: bool) -> Result<Self, KnotClientError> { 126 let client = default_http_client(allow_private) 127 .map_err(|e| KnotClientError::Build(e.to_string()))?; 128 Ok(Self::new(ReqwestHttp::shared(client))) 129 } 130 131 pub async fn capabilities(&self, host: &KnotHost) -> Result<Vec<String>, KnotClientError> { 132 let mut url = host.xrpc_url(&nsid(VERSION_NSID)); 133 url.set_query(None); 134 let bytes = self.get_json(url).await?; 135 let resp: VersionWire = serde_json::from_slice(&bytes)?; 136 Ok(resp.capabilities.unwrap_or_default()) 137 } 138 139 pub async fn list_members(&self, host: &KnotHost) -> Result<AclListing, KnotClientError> { 140 let subject = authority(host); 141 self.drain(host, LIST_MEMBERS_NSID, subject, None, 0, Vec::new()) 142 .await 143 } 144 145 pub async fn list_collaborators( 146 &self, 147 host: &KnotHost, 148 repo: &Did<DefaultStr>, 149 ) -> Result<AclListing, KnotClientError> { 150 self.drain( 151 host, 152 LIST_COLLABORATORS_NSID, 153 repo.as_ref().to_owned(), 154 None, 155 0, 156 Vec::new(), 157 ) 158 .await 159 } 160 161 fn drain<'a>( 162 &'a self, 163 host: &'a KnotHost, 164 endpoint: &'static str, 165 subject: String, 166 cursor: Option<String>, 167 page: usize, 168 mut acc: Vec<AclEntry>, 169 ) -> Pin<Box<dyn Future<Output = Result<AclListing, KnotClientError>> + Send + 'a>> { 170 Box::pin(async move { 171 if page >= MAX_LIST_PAGES { 172 tracing::warn!( 173 host = %authority(host), 174 endpoint, 175 pages = page, 176 "knot list truncated at page cap" 177 ); 178 return Ok(AclListing { 179 entries: acc, 180 completeness: Completeness::Truncated, 181 }); 182 } 183 let resp = self 184 .fetch_page(host, endpoint, &subject, cursor.as_deref()) 185 .await?; 186 acc.extend(resp.items); 187 match resp.cursor.filter(|c| !c.is_empty()) { 188 Some(next) => { 189 self.drain(host, endpoint, subject, Some(next), page + 1, acc) 190 .await 191 } 192 None => Ok(AclListing { 193 entries: acc, 194 completeness: Completeness::Complete, 195 }), 196 } 197 }) 198 } 199 200 async fn fetch_page( 201 &self, 202 host: &KnotHost, 203 endpoint: &'static str, 204 subject: &str, 205 cursor: Option<&str>, 206 ) -> Result<ListWire, KnotClientError> { 207 let mut url = host.xrpc_url(&nsid(endpoint)); 208 { 209 let mut q = url.query_pairs_mut(); 210 q.clear(); 211 q.append_pair("subject", subject); 212 q.append_pair("limit", &LIST_PAGE_LIMIT.to_string()); 213 if let Some(c) = cursor { 214 q.append_pair("cursor", c); 215 } 216 } 217 let bytes = self.get_json(url).await?; 218 Ok(serde_json::from_slice(&bytes)?) 219 } 220 221 async fn get_json(&self, url: Url) -> Result<Bytes, KnotClientError> { 222 let resp = self 223 .http 224 .execute(HttpRequest { 225 url, 226 headers: HeaderMap::new(), 227 }) 228 .await?; 229 match resp.status { 230 StatusCode::OK => read_bounded(resp).await, 231 StatusCode::NOT_FOUND => Err(KnotClientError::NotFound), 232 other => Err(KnotClientError::Upstream(other)), 233 } 234 } 235} 236 237#[derive(Deserialize)] 238struct VersionWire { 239 #[serde(default)] 240 capabilities: Option<Vec<String>>, 241} 242 243#[derive(Deserialize)] 244struct ListWire { 245 #[serde(default)] 246 items: Vec<AclEntry>, 247 #[serde(default)] 248 cursor: Option<String>, 249} 250 251async fn read_bounded(resp: HttpResponseHead) -> Result<Bytes, KnotClientError> { 252 if resp.content_length.is_some_and(|len| len > MAX_BODY_BYTES) { 253 return Err(KnotClientError::BodyTooLarge { 254 limit: MAX_BODY_BYTES, 255 }); 256 } 257 let buf = resp 258 .body 259 .map_err(KnotClientError::Network) 260 .try_fold(BytesMut::new(), |mut acc, chunk| async move { 261 if (acc.len() as u64).saturating_add(chunk.len() as u64) > MAX_BODY_BYTES { 262 return Err(KnotClientError::BodyTooLarge { 263 limit: MAX_BODY_BYTES, 264 }); 265 } 266 acc.extend_from_slice(&chunk); 267 Ok(acc) 268 }) 269 .await?; 270 Ok(buf.freeze()) 271} 272 273#[cfg(test)] 274mod tests { 275 use super::*; 276 use serde_json::json; 277 use wiremock::matchers::{method, path, query_param}; 278 use wiremock::{Mock, MockServer, ResponseTemplate}; 279 280 fn did(s: &str) -> Did<DefaultStr> { 281 Did::new_owned(s).unwrap() 282 } 283 284 fn client() -> KnotClient { 285 KnotClient::new(ReqwestHttp::shared(default_http_client(true).unwrap())) 286 } 287 288 fn endpoint(server: &MockServer) -> KnotHost { 289 KnotHost::parse(&server.uri()).unwrap() 290 } 291 292 #[tokio::test] 293 async fn capabilities_returns_declared_tokens() { 294 let server = MockServer::start().await; 295 Mock::given(method("GET")) 296 .and(path("/xrpc/sh.tangled.knot.version")) 297 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 298 "version": "1.0.0 (deadbeef)", 299 "capabilities": ["knot-acl"] 300 }))) 301 .mount(&server) 302 .await; 303 304 let caps = client().capabilities(&endpoint(&server)).await.unwrap(); 305 assert_eq!(caps, vec!["knot-acl".to_owned()]); 306 } 307 308 #[tokio::test] 309 async fn capabilities_empty_when_field_absent() { 310 let server = MockServer::start().await; 311 Mock::given(method("GET")) 312 .and(path("/xrpc/sh.tangled.knot.version")) 313 .respond_with( 314 ResponseTemplate::new(200).set_body_json(json!({ "version": "1.0.0 (cafe)" })), 315 ) 316 .mount(&server) 317 .await; 318 319 let caps = client().capabilities(&endpoint(&server)).await.unwrap(); 320 assert!(caps.is_empty()); 321 } 322 323 #[tokio::test] 324 async fn list_members_drains_single_page() { 325 let server = MockServer::start().await; 326 let host = endpoint(&server); 327 Mock::given(method("GET")) 328 .and(path("/xrpc/sh.tangled.knot.listMembers")) 329 .and(query_param("subject", authority(&host).as_str())) 330 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 331 "items": [ 332 {"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}, 333 {"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T12:00:00Z"} 334 ] 335 }))) 336 .mount(&server) 337 .await; 338 339 let listing = client().list_members(&host).await.unwrap(); 340 assert_eq!(listing.completeness, Completeness::Complete); 341 assert_eq!( 342 listing.entries, 343 vec![ 344 AclEntry { 345 subject: did("did:plc:boltless"), 346 created_at: "2026-06-01T00:00:00Z".parse().unwrap(), 347 }, 348 AclEntry { 349 subject: did("did:plc:akshay"), 350 created_at: "2026-06-02T12:00:00Z".parse().unwrap(), 351 }, 352 ] 353 ); 354 } 355 356 #[tokio::test] 357 async fn list_members_drains_multiple_pages() { 358 let server = MockServer::start().await; 359 let host = endpoint(&server); 360 Mock::given(method("GET")) 361 .and(path("/xrpc/sh.tangled.knot.listMembers")) 362 .and(query_param("cursor", "p2")) 363 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 364 "items": [{"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T00:00:00Z"}] 365 }))) 366 .mount(&server) 367 .await; 368 Mock::given(method("GET")) 369 .and(path("/xrpc/sh.tangled.knot.listMembers")) 370 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 371 "items": [{"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}], 372 "cursor": "p2" 373 }))) 374 .mount(&server) 375 .await; 376 377 let listing = client().list_members(&host).await.unwrap(); 378 assert_eq!(listing.completeness, Completeness::Complete); 379 let subjects: Vec<_> = listing.entries.into_iter().map(|m| m.subject).collect(); 380 assert_eq!( 381 subjects, 382 vec![did("did:plc:boltless"), did("did:plc:akshay")] 383 ); 384 } 385 386 #[tokio::test] 387 async fn list_collaborators_uses_repo_did_subject() { 388 let server = MockServer::start().await; 389 let host = endpoint(&server); 390 Mock::given(method("GET")) 391 .and(path("/xrpc/sh.tangled.repo.listCollaborators")) 392 .and(query_param("subject", "did:plc:scallop")) 393 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 394 "items": [{"subject": "did:plc:olaren", "addedBy": "did:plc:boltless", "createdAt": "2026-06-03T00:00:00Z"}] 395 }))) 396 .mount(&server) 397 .await; 398 399 let listing = client() 400 .list_collaborators(&host, &did("did:plc:scallop")) 401 .await 402 .unwrap(); 403 assert_eq!(listing.completeness, Completeness::Complete); 404 assert_eq!(listing.entries.len(), 1); 405 assert_eq!(listing.entries[0].subject, did("did:plc:olaren")); 406 } 407 408 #[tokio::test] 409 async fn drain_reports_truncation_at_page_cap() { 410 let server = MockServer::start().await; 411 let host = endpoint(&server); 412 Mock::given(method("GET")) 413 .and(path("/xrpc/sh.tangled.knot.listMembers")) 414 .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 415 "items": [{"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}], 416 "cursor": "more" 417 }))) 418 .mount(&server) 419 .await; 420 421 let listing = client().list_members(&host).await.unwrap(); 422 assert_eq!( 423 listing.completeness, 424 Completeness::Truncated, 425 "a never-terminating cursor must surface as a truncated listing" 426 ); 427 assert_eq!(listing.entries.len(), MAX_LIST_PAGES); 428 } 429 430 #[tokio::test] 431 async fn maps_404_to_not_found() { 432 let server = MockServer::start().await; 433 Mock::given(method("GET")) 434 .respond_with(ResponseTemplate::new(404)) 435 .mount(&server) 436 .await; 437 438 let err = client() 439 .capabilities(&endpoint(&server)) 440 .await 441 .expect_err("404 must surface"); 442 assert!(matches!(err, KnotClientError::NotFound)); 443 } 444 445 #[tokio::test] 446 async fn maps_5xx_to_upstream() { 447 let server = MockServer::start().await; 448 Mock::given(method("GET")) 449 .respond_with(ResponseTemplate::new(503)) 450 .mount(&server) 451 .await; 452 453 let err = client() 454 .capabilities(&endpoint(&server)) 455 .await 456 .expect_err("5xx must surface"); 457 match err { 458 KnotClientError::Upstream(s) => assert_eq!(s.as_u16(), 503), 459 other => panic!("wrong variant: {other:?}"), 460 } 461 } 462 463 #[tokio::test] 464 async fn does_not_follow_redirects() { 465 let server = MockServer::start().await; 466 Mock::given(method("GET")) 467 .and(path("/xrpc/sh.tangled.knot.version")) 468 .respond_with(ResponseTemplate::new(301).insert_header( 469 "location", 470 "https://kt.tngl.oyster.cafe/xrpc/sh.tangled.knot.version", 471 )) 472 .mount(&server) 473 .await; 474 475 let err = client() 476 .capabilities(&endpoint(&server)) 477 .await 478 .expect_err("a redirect must surface as an error, not be followed to another knot"); 479 match err { 480 KnotClientError::Upstream(s) => assert_eq!(s.as_u16(), 301), 481 other => panic!("expected Upstream(301), got {other:?}"), 482 } 483 } 484 485 #[test] 486 fn knot_endpoint_rejects_private_host() { 487 let err = 488 knot_endpoint("127.0.0.1:9", true, false).expect_err("private host must be refused"); 489 assert!(matches!(err, KnotClientError::PrivateHost { .. })); 490 } 491 492 #[test] 493 fn knot_endpoint_allows_private_when_permitted() { 494 let knot = knot_endpoint("127.0.0.1:9", true, true).expect("private host allowed"); 495 assert_eq!(knot.url().scheme(), "http"); 496 } 497}