Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

bobbin/knot-ingest: handle knot115 knot-owned data

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 17, 2026, 2:54 PM +0300) commit a0d38138 parent 03855e07 change-id mzrnpppw
+3097 -18
+1
Cargo.toml
··· 34 34 bobbin-slingshot-client = { path = "bobbin/crates/slingshot-client" } 35 35 bobbin-record-lru = { path = "bobbin/crates/record-lru" } 36 36 bobbin-knot-proxy = { path = "bobbin/crates/knot-proxy" } 37 + bobbin-knot-ingest = { path = "bobbin/crates/knot-ingest" } 37 38 bobbin-runtime = { path = "bobbin/crates/runtime" } 38 39 bobbin-search = { path = "bobbin/crates/search" } 39 40 bobbin-sim = { path = "bobbin/crates/bobbin-sim" }
+2
bobbin/crates/bobbin-sim/src/runtime.rs
··· 122 122 disconnects: Some(disconnects.clone()), 123 123 warming_shadow: Some(warming_shadow.clone()), 124 124 warming_buffer: warming_buffer_enabled.then(|| warming_buffer.clone()), 125 + knot_registry: None, 126 + knot_gate: None, 125 127 }; 126 128 let ingest_config = IngestConfig { 127 129 hydrant_base,
+1
bobbin/crates/bobbin/Cargo.toml
··· 12 12 [dependencies] 13 13 bobbin-edge-index = { workspace = true } 14 14 bobbin-ingest = { workspace = true } 15 + bobbin-knot-ingest = { workspace = true } 15 16 bobbin-knot-proxy = { workspace = true } 16 17 bobbin-record-lru = { workspace = true } 17 18 bobbin-runtime = { workspace = true }
+46 -3
bobbin/crates/bobbin/src/main.rs
··· 10 10 use bobbin_ingest::{ 11 11 IngestConfig, IngestRuntime, RepoIdResolver, WarmingBuffer, run as run_ingest, 12 12 }; 13 - use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig}; 13 + use bobbin_knot_ingest::{CapabilityGate, KnotClient, KnotRegistry, Orchestrator}; 14 + use bobbin_knot_proxy::{KnotHttpConfig, KnotProxy, KnotProxyConfig, classify_ip}; 14 15 use bobbin_record_lru::{CacheCapacity, LruRecordStore, RecordStore}; 15 - use bobbin_runtime::{Clock, MemoryBudget, OsEntropy, RuntimeHasher, SystemClock, TungsteniteWs}; 16 + use bobbin_runtime::{ 17 + Clock, GuardedWs, MemoryBudget, NetworkError, OsEntropy, RuntimeHasher, SystemClock, 18 + TungsteniteWs, WsTransport, 19 + }; 16 20 use bobbin_search::{SearchIndex, SearchReader}; 17 21 use bobbin_slingshot_client::SlingshotClient; 18 22 use bobbin_xrpc::{ ··· 186 190 let pull_statuses = Arc::new(StateIndex::new(hasher.clone())); 187 191 let coverage = Arc::new(CoverageWatch::new()); 188 192 let warming_buffer = Arc::new(WarmingBuffer::new(hasher.clone())); 193 + let knot_registry = Arc::new(KnotRegistry::new()); 189 194 let knots = Arc::new(KnotProxy::new( 190 195 KnotProxyConfig { 191 196 allow_private_hosts: cfg.knot.allow_private, ··· 215 220 }; 216 221 let cancel = CancellationToken::new(); 217 222 let ingest_coverage = coverage.clone(); 223 + 224 + let knot_acl_dev = !cfg.knot.require_https; 225 + let knot_allow_private = cfg.knot.allow_private; 226 + let knot_client = KnotClient::with_default_http(knot_allow_private)?; 227 + let knot_gate = Arc::new(CapabilityGate::new( 228 + knot_client.clone(), 229 + clock.clone(), 230 + knot_acl_dev, 231 + knot_allow_private, 232 + )); 233 + let knot_ws: Arc<dyn WsTransport> = if knot_allow_private { 234 + ws.clone() 235 + } else { 236 + GuardedWs::shared(Arc::new(|addrs: &[SocketAddr]| { 237 + match addrs.iter().find_map(|sa| classify_ip(&sa.ip())) { 238 + Some(reason) => Err(NetworkError::Connect(format!( 239 + "knot eventstream resolves to {reason} address space" 240 + ))), 241 + None => Ok(()), 242 + } 243 + })) 244 + }; 245 + 218 246 let ingest_runtime = IngestRuntime { 219 247 store: edges.clone(), 220 248 issue_states: issue_states.clone(), ··· 225 253 resolver: resolver.clone(), 226 254 clock: clock.clone(), 227 255 entropy, 228 - ws, 256 + ws: ws.clone(), 229 257 cancel: cancel.clone(), 230 258 disconnects: None, 231 259 warming_shadow: None, 232 260 warming_buffer: Some(warming_buffer), 261 + knot_registry: Some(knot_registry.clone()), 262 + knot_gate: Some(knot_gate.clone()), 233 263 }; 234 264 let mut ingest_handle = tokio::spawn(run_ingest(ingest_cfg, ingest_runtime)); 265 + 266 + let knot_orchestrator = Orchestrator { 267 + client: Arc::new(knot_client), 268 + gate: knot_gate, 269 + registry: knot_registry, 270 + store: edges.clone(), 271 + ws: knot_ws, 272 + clock: clock.clone(), 273 + dev: knot_acl_dev, 274 + allow_private: knot_allow_private, 275 + cancel: cancel.clone(), 276 + }; 277 + let _knot_acl_handle = tokio::spawn(knot_orchestrator.run()); 235 278 236 279 let _adaptive_watcher = budget.zip(limiter.as_ref()).map(|(b, l)| { 237 280 mem::spawn_adaptive_watcher(
+39 -3
bobbin/crates/edge-index/src/lib.rs
··· 227 227 228 228 #[derive(Debug)] 229 229 pub struct EdgePage { 230 - pub items: Vec<AtUri<DefaultStr>>, 230 + pub items: Vec<EdgeItem>, 231 231 pub next: Option<PageToken>, 232 + } 233 + 234 + #[derive(Clone, Debug, Eq, PartialEq)] 235 + pub struct EdgeItem { 236 + pub uri: AtUri<DefaultStr>, 237 + pub sort_micros: u64, 238 + } 239 + 240 + impl AsRef<str> for EdgeItem { 241 + fn as_ref(&self) -> &str { 242 + self.uri.as_ref() 243 + } 232 244 } 233 245 234 246 #[derive(Clone, Copy, Debug, Default)] ··· 639 651 .unwrap_or(0) 640 652 } 641 653 654 + pub fn sources_for(&self, key: &EdgeKey) -> Vec<AtUri<DefaultStr>> { 655 + self.lookup_key(key) 656 + .and_then(|id| { 657 + self.forward.read_sync(&id, |_, sources| { 658 + sources 659 + .directed(PageCursor::Start, SortDir::Desc) 660 + .filter_map(|bucket| { 661 + let spur = bucket.source.to_spur()?; 662 + let stored = self.source_interner.try_resolve(&spur)?; 663 + AtUri::new_owned(self.decode_source(stored)?).ok() 664 + }) 665 + .collect::<Vec<_>>() 666 + }) 667 + }) 668 + .unwrap_or_default() 669 + } 670 + 642 671 pub fn list( 643 672 &self, 644 673 key: &EdgeKey, ··· 659 688 .filter_map(|&key| { 660 689 let spur = key.source.to_spur()?; 661 690 let stored = self.source_interner.try_resolve(&spur)?; 662 - AtUri::new_owned(self.decode_source(stored)?).ok() 691 + let uri = AtUri::new_owned(self.decode_source(stored)?).ok()?; 692 + Some(EdgeItem { 693 + uri, 694 + sort_micros: key.micros.0, 695 + }) 663 696 }) 664 697 .collect(); 665 698 let next = has_more ··· 732 765 .matched 733 766 .into_iter() 734 767 .take(visible_len) 735 - .map(|(_, uri)| uri) 768 + .map(|(key, uri)| EdgeItem { 769 + uri, 770 + sort_micros: key.micros.0, 771 + }) 736 772 .collect(); 737 773 EdgePage { items, next } 738 774 })
+1
bobbin/crates/ingest/Cargo.toml
··· 8 8 [dependencies] 9 9 bobbin-types = { workspace = true } 10 10 bobbin-edge-index = { workspace = true } 11 + bobbin-knot-ingest = { workspace = true } 11 12 bobbin-record-lru = { workspace = true } 12 13 bobbin-resolver = { workspace = true } 13 14 bobbin-runtime = { workspace = true }
+2
bobbin/crates/ingest/examples/smoke.rs
··· 46 46 disconnects: None, 47 47 warming_shadow: None, 48 48 warming_buffer: None, 49 + knot_registry: None, 50 + knot_gate: None, 49 51 }; 50 52 let task = tokio::spawn(async move { 51 53 let _ = run(cfg, runtime).await;
+171
bobbin/crates/ingest/src/lib.rs
··· 7 7 ApplyOutcome, Coverage, CoverageWatch, EdgeStore, HydrantCursor, IssueStateKind, 8 8 PromotionSignal, PullStatusKind, StateIndex, apply_record_state, 9 9 }; 10 + use bobbin_knot_ingest::{CapabilityGate, KnotRegistry}; 10 11 use bobbin_record_lru::RecordStore; 11 12 use bobbin_resolver::{NormalizeRepoRefs, decode_canon_or_upgrade_bytes, synthesize_created_at}; 12 13 use bobbin_runtime::{ ··· 15 16 }; 16 17 use bobbin_types::edges::{Edge, ExtractError, Record}; 17 18 use bobbin_types::ids::{RepoIdent, SubjectRef}; 19 + use bobbin_types::knot_acl::KnotHostKey; 18 20 use bobbin_types::record::RecordBody; 19 21 use bobbin_types::search::{SearchSink, SearchableRecord}; 20 22 use bytes::Bytes; ··· 215 217 pub disconnects: Option<Arc<DisconnectSink>>, 216 218 pub warming_shadow: Option<Arc<WarmingShadowBuffer>>, 217 219 pub warming_buffer: Option<Arc<WarmingBuffer>>, 220 + pub knot_registry: Option<Arc<KnotRegistry>>, 221 + pub knot_gate: Option<Arc<CapabilityGate>>, 218 222 } 219 223 220 224 impl<S: SearchSink + 'static> Clone for IngestRuntime<S> { ··· 234 238 disconnects: self.disconnects.clone(), 235 239 warming_shadow: self.warming_shadow.clone(), 236 240 warming_buffer: self.warming_buffer.clone(), 241 + knot_registry: self.knot_registry.clone(), 242 + knot_gate: self.knot_gate.clone(), 237 243 } 238 244 } 239 245 } ··· 250 256 search: &self.search, 251 257 shadow: self.warming_shadow.as_deref(), 252 258 buffer: self.warming_buffer.as_deref(), 259 + knot_registry: self.knot_registry.as_deref(), 260 + knot_gate: self.knot_gate.as_deref(), 253 261 } 254 262 } 255 263 } ··· 264 272 search: &'a S, 265 273 shadow: Option<&'a WarmingShadowBuffer>, 266 274 buffer: Option<&'a WarmingBuffer>, 275 + knot_registry: Option<&'a KnotRegistry>, 276 + knot_gate: Option<&'a CapabilityGate>, 267 277 } 268 278 269 279 pub async fn run<S: SearchSink + 'static>( ··· 1062 1072 finalize_drained(ctx, drained).await; 1063 1073 } 1064 1074 } 1075 + if let Some(registry) = ctx.knot_registry { 1076 + let host = KnotHostKey::new(repo.knot.as_ref()); 1077 + match repo.repo_did.clone() { 1078 + Some(repo_did) => registry.observe_repo(&host, repo_did), 1079 + None => registry.observe_host(&host), 1080 + } 1081 + } 1082 + } 1083 + match acl_disposition(&parsed, ctx.knot_gate, ctx.knot_registry) { 1084 + AclDisposition::NativeSkip => { 1085 + if let Some(registry) = ctx.knot_registry { 1086 + registry.forget_legacy_member(&source); 1087 + } 1088 + return PendingOp::Delete { source, nsid }; 1089 + } 1090 + AclDisposition::LegacyMember { host } => { 1091 + if let Some(registry) = ctx.knot_registry { 1092 + registry.observe_host(&host); 1093 + registry.note_legacy_member(source.clone(), &host); 1094 + } 1095 + } 1096 + AclDisposition::Other => {} 1065 1097 } 1066 1098 let edges = match parsed.extract_edges(&source) { 1067 1099 Ok(es) => es, ··· 1085 1117 if nsid.as_ref() == "sh.tangled.repo" { 1086 1118 ctx.resolver.forget(&record.did, &record.rkey).await; 1087 1119 } 1120 + if nsid.as_ref() == "sh.tangled.knot.member" 1121 + && let Some(registry) = ctx.knot_registry 1122 + { 1123 + registry.forget_legacy_member(&source); 1124 + } 1088 1125 PendingOp::Delete { source, nsid } 1089 1126 } 1090 1127 RecordAction::Other => { ··· 1094 1131 } 1095 1132 } 1096 1133 1134 + enum AclDisposition { 1135 + Other, 1136 + NativeSkip, 1137 + LegacyMember { host: KnotHostKey }, 1138 + } 1139 + 1140 + fn acl_disposition( 1141 + parsed: &Record, 1142 + gate: Option<&CapabilityGate>, 1143 + registry: Option<&KnotRegistry>, 1144 + ) -> AclDisposition { 1145 + let Some(gate) = gate else { 1146 + return AclDisposition::Other; 1147 + }; 1148 + match parsed { 1149 + Record::KnotMember(member) => { 1150 + let host = KnotHostKey::new(member.domain.as_ref()); 1151 + if gate.is_native(&host) { 1152 + AclDisposition::NativeSkip 1153 + } else { 1154 + AclDisposition::LegacyMember { host } 1155 + } 1156 + } 1157 + Record::Collaborator(collaborator) => { 1158 + let native = registry 1159 + .and_then(|registry| registry.host_of_repo(&collaborator.repo)) 1160 + .is_some_and(|host| gate.is_native(&host)); 1161 + if native { 1162 + AclDisposition::NativeSkip 1163 + } else { 1164 + AclDisposition::Other 1165 + } 1166 + } 1167 + _ => AclDisposition::Other, 1168 + } 1169 + } 1170 + 1097 1171 fn fallback_rfc3339( 1098 1172 rkey: &Rkey<DefaultStr>, 1099 1173 rev: &jacquard_common::types::tid::Tid, ··· 1375 1449 search, 1376 1450 shadow: None, 1377 1451 buffer: None, 1452 + knot_registry: None, 1453 + knot_gate: None, 1378 1454 }; 1379 1455 let pending = prepare_frame(frame, &ctx, now).await; 1380 1456 let pending = resolve_pending(pending, &ctx).await; ··· 1614 1690 } 1615 1691 1616 1692 #[tokio::test] 1693 + async fn native_knot_member_skipped_legacy_indexed() { 1694 + use bobbin_knot_ingest::{CapabilityGate, KnotClient, KnotRegistry}; 1695 + use wiremock::matchers::{method, path}; 1696 + use wiremock::{Mock, MockServer, ResponseTemplate}; 1697 + 1698 + let server = MockServer::start().await; 1699 + Mock::given(method("GET")) 1700 + .and(path("/xrpc/sh.tangled.knot.version")) 1701 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 1702 + "version": "1.1.0", 1703 + "capabilities": ["knot-acl"] 1704 + }))) 1705 + .mount(&server) 1706 + .await; 1707 + let url = url::Url::parse(&server.uri()).unwrap(); 1708 + let native_host = format!("{}:{}", url.host_str().unwrap(), url.port().unwrap()); 1709 + 1710 + let gate = CapabilityGate::new( 1711 + KnotClient::with_default_http(true).unwrap(), 1712 + Arc::new(SystemClock::new()), 1713 + true, 1714 + true, 1715 + ); 1716 + assert!(gate.has_knot_acl(&KnotHostKey::new(&native_host)).await); 1717 + 1718 + let registry = KnotRegistry::new(); 1719 + let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1720 + let ctx = PipelineCtx { 1721 + resolver: &resolver, 1722 + store: &store, 1723 + issue_states: &issue_states, 1724 + pull_statuses: &pull_statuses, 1725 + coverage: &cov, 1726 + records: &NoopRecordStore, 1727 + search: &NoopSearchSink, 1728 + shadow: None, 1729 + buffer: None, 1730 + knot_registry: Some(&registry), 1731 + knot_gate: Some(&gate), 1732 + }; 1733 + 1734 + let member_frame = |id: u64, rkey: &str, domain: &str| { 1735 + parse_frame(json!({ 1736 + "id": id, 1737 + "type": "record", 1738 + "record": { 1739 + "live": false, 1740 + "did": "did:plc:akshay", 1741 + "rev": fresh_tid().as_str(), 1742 + "collection": "sh.tangled.knot.member", 1743 + "rkey": rkey, 1744 + "action": "create", 1745 + "record": { 1746 + "$type": "sh.tangled.knot.member", 1747 + "subject": "did:plc:boltless", 1748 + "domain": domain, 1749 + "createdAt": "2026-06-01T00:00:00Z" 1750 + } 1751 + } 1752 + })) 1753 + }; 1754 + 1755 + let native = 1756 + prepare_frame(member_frame(1, "aaaaaaaaaaaaz", &native_host), &ctx, now()).await; 1757 + assert!( 1758 + matches!(native.op, PendingOp::Delete { .. }), 1759 + "member record for a native knot must be dropped" 1760 + ); 1761 + 1762 + let legacy = 1763 + prepare_frame(member_frame(2, "bbbbbbbbbbbbz", "legacy.knot"), &ctx, now()).await; 1764 + assert!( 1765 + matches!(legacy.op, PendingOp::Upsert { .. }), 1766 + "member record for a legacy knot must be ingested" 1767 + ); 1768 + assert!( 1769 + registry.hosts().contains(&KnotHostKey::new("legacy.knot")), 1770 + "a member record seeds host discovery even before any repo is seen" 1771 + ); 1772 + assert_eq!( 1773 + registry 1774 + .drain_legacy_members(&KnotHostKey::new("legacy.knot")) 1775 + .len(), 1776 + 1, 1777 + "legacy member edge is indexed for later purge once the knot upgrades" 1778 + ); 1779 + } 1780 + 1781 + #[tokio::test] 1617 1782 async fn create_then_delete_round_trips_a_star() { 1618 1783 let (store, issue_states, pull_statuses, cov, resolver) = fresh(); 1619 1784 let create: HydrantFrame = parse_frame(json!({ ··· 1752 1917 search: &search, 1753 1918 shadow: None, 1754 1919 buffer: None, 1920 + knot_registry: None, 1921 + knot_gate: None, 1755 1922 }; 1756 1923 let mk = |live: bool| -> HydrantFrame { 1757 1924 parse_frame(json!({ ··· 2418 2585 disconnects: None, 2419 2586 warming_shadow: None, 2420 2587 warming_buffer: None, 2588 + knot_registry: None, 2589 + knot_gate: None, 2421 2590 } 2422 2591 } 2423 2592 ··· 3046 3215 disconnects: None, 3047 3216 warming_shadow: None, 3048 3217 warming_buffer: None, 3218 + knot_registry: None, 3219 + knot_gate: None, 3049 3220 }; 3050 3221 3051 3222 let parallelism = 4usize;
+30
bobbin/crates/knot-ingest/Cargo.toml
··· 1 + [package] 2 + name = "bobbin-knot-ingest" 3 + version.workspace = true 4 + edition.workspace = true 5 + license.workspace = true 6 + rust-version.workspace = true 7 + 8 + [dependencies] 9 + bobbin-edge-index = { workspace = true } 10 + bobbin-knot-proxy = { workspace = true } 11 + bobbin-runtime = { workspace = true } 12 + bobbin-types = { workspace = true } 13 + jacquard-common = { workspace = true } 14 + 15 + bytes = { workspace = true } 16 + chrono = { workspace = true } 17 + futures = { workspace = true } 18 + http = { workspace = true } 19 + reqwest = { workspace = true } 20 + serde = { workspace = true } 21 + serde_json = { workspace = true } 22 + thiserror = { workspace = true } 23 + tokio = { workspace = true } 24 + tokio-util = { workspace = true } 25 + tracing = { workspace = true } 26 + url = { workspace = true } 27 + 28 + [dev-dependencies] 29 + tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } 30 + wiremock = { workspace = true }
+497
bobbin/crates/knot-ingest/src/client.rs
··· 1 + use std::future::Future; 2 + use std::pin::Pin; 3 + use std::sync::Arc; 4 + use std::time::Duration; 5 + 6 + use bobbin_knot_proxy::{KnotHost, KnotHostError, PrivateAddressFilter, PrivateHostReason}; 7 + use bobbin_runtime::{HttpRequest, HttpResponseHead, HttpTransport, NetworkError, ReqwestHttp}; 8 + use bytes::{Bytes, BytesMut}; 9 + use chrono::{DateTime, Utc}; 10 + use futures::TryStreamExt; 11 + use http::{HeaderMap, StatusCode}; 12 + use jacquard_common::DefaultStr; 13 + use jacquard_common::types::did::Did; 14 + use jacquard_common::types::nsid::Nsid; 15 + use serde::Deserialize; 16 + use thiserror::Error; 17 + use url::Url; 18 + 19 + const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION")); 20 + const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); 21 + const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); 22 + const MAX_BODY_BYTES: u64 = 4 * 1024 * 1024; 23 + const LIST_PAGE_LIMIT: i64 = 1000; 24 + const MAX_LIST_PAGES: usize = 256; 25 + 26 + const VERSION_NSID: &str = "sh.tangled.knot.version"; 27 + const LIST_MEMBERS_NSID: &str = "sh.tangled.knot.listMembers"; 28 + const LIST_COLLABORATORS_NSID: &str = "sh.tangled.repo.listCollaborators"; 29 + 30 + #[derive(Clone)] 31 + pub struct KnotClient { 32 + http: Arc<dyn HttpTransport>, 33 + } 34 + 35 + #[derive(Clone, Debug, Eq, PartialEq, Deserialize)] 36 + #[serde(rename_all = "camelCase")] 37 + pub struct AclEntry { 38 + pub subject: Did<DefaultStr>, 39 + pub created_at: DateTime<Utc>, 40 + } 41 + 42 + #[derive(Clone, Copy, Debug, Eq, PartialEq)] 43 + pub enum Completeness { 44 + Complete, 45 + Truncated, 46 + } 47 + 48 + #[derive(Clone, Debug, Eq, PartialEq)] 49 + pub struct AclListing { 50 + pub entries: Vec<AclEntry>, 51 + pub completeness: Completeness, 52 + } 53 + 54 + #[derive(Debug, Error)] 55 + pub enum KnotClientError { 56 + #[error("knot host: {0}")] 57 + Host(#[from] KnotHostError), 58 + #[error("blocked: knot {host} resolves to {reason} address space")] 59 + PrivateHost { 60 + host: String, 61 + reason: PrivateHostReason, 62 + }, 63 + #[error("http client build: {0}")] 64 + Build(String), 65 + #[error("network: {0}")] 66 + Network(#[from] NetworkError), 67 + #[error("xrpc not found")] 68 + NotFound, 69 + #[error("upstream returned status {0}")] 70 + Upstream(StatusCode), 71 + #[error("response body exceeded {limit} bytes")] 72 + BodyTooLarge { limit: u64 }, 73 + #[error("decode response: {0}")] 74 + Decode(#[from] serde_json::Error), 75 + } 76 + 77 + pub fn knot_endpoint( 78 + host: &str, 79 + dev: bool, 80 + allow_private: bool, 81 + ) -> Result<KnotHost, KnotClientError> { 82 + let raw = if dev { 83 + format!("http://{host}") 84 + } else { 85 + host.to_owned() 86 + }; 87 + let knot = KnotHost::parse(&raw)?; 88 + if !allow_private && let Some(reason) = knot.private_literal_reason() { 89 + return Err(KnotClientError::PrivateHost { 90 + host: host.to_owned(), 91 + reason, 92 + }); 93 + } 94 + Ok(knot) 95 + } 96 + 97 + fn default_http_client(allow_private: bool) -> Result<reqwest::Client, reqwest::Error> { 98 + reqwest::Client::builder() 99 + .user_agent(USER_AGENT) 100 + .timeout(REQUEST_TIMEOUT) 101 + .connect_timeout(CONNECT_TIMEOUT) 102 + .redirect(reqwest::redirect::Policy::none()) 103 + .dns_resolver(Arc::new(PrivateAddressFilter::new(allow_private))) 104 + .build() 105 + } 106 + 107 + fn nsid(s: &'static str) -> Nsid<DefaultStr> { 108 + Nsid::new_static(s).expect("static nsid literal must validate") 109 + } 110 + 111 + pub(crate) fn authority(host: &KnotHost) -> String { 112 + let url = host.url(); 113 + match (url.host_str(), url.port()) { 114 + (Some(h), Some(p)) => format!("{h}:{p}"), 115 + (Some(h), None) => h.to_owned(), 116 + (None, _) => String::new(), 117 + } 118 + } 119 + 120 + impl KnotClient { 121 + pub fn new(http: Arc<dyn HttpTransport>) -> Self { 122 + Self { http } 123 + } 124 + 125 + pub fn with_default_http(allow_private: bool) -> Result<Self, KnotClientError> { 126 + let client = default_http_client(allow_private) 127 + .map_err(|e| KnotClientError::Build(e.to_string()))?; 128 + Ok(Self::new(ReqwestHttp::shared(client))) 129 + } 130 + 131 + pub async fn capabilities(&self, host: &KnotHost) -> Result<Vec<String>, KnotClientError> { 132 + let mut url = host.xrpc_url(&nsid(VERSION_NSID)); 133 + url.set_query(None); 134 + let bytes = self.get_json(url).await?; 135 + let resp: VersionWire = serde_json::from_slice(&bytes)?; 136 + Ok(resp.capabilities.unwrap_or_default()) 137 + } 138 + 139 + pub async fn list_members(&self, host: &KnotHost) -> Result<AclListing, KnotClientError> { 140 + let subject = authority(host); 141 + self.drain(host, LIST_MEMBERS_NSID, subject, None, 0, Vec::new()) 142 + .await 143 + } 144 + 145 + pub async fn list_collaborators( 146 + &self, 147 + host: &KnotHost, 148 + repo: &Did<DefaultStr>, 149 + ) -> Result<AclListing, KnotClientError> { 150 + self.drain( 151 + host, 152 + LIST_COLLABORATORS_NSID, 153 + repo.as_ref().to_owned(), 154 + None, 155 + 0, 156 + Vec::new(), 157 + ) 158 + .await 159 + } 160 + 161 + fn drain<'a>( 162 + &'a self, 163 + host: &'a KnotHost, 164 + endpoint: &'static str, 165 + subject: String, 166 + cursor: Option<String>, 167 + page: usize, 168 + mut acc: Vec<AclEntry>, 169 + ) -> Pin<Box<dyn Future<Output = Result<AclListing, KnotClientError>> + Send + 'a>> { 170 + Box::pin(async move { 171 + if page >= MAX_LIST_PAGES { 172 + tracing::warn!( 173 + host = %authority(host), 174 + endpoint, 175 + pages = page, 176 + "knot list truncated at page cap" 177 + ); 178 + return Ok(AclListing { 179 + entries: acc, 180 + completeness: Completeness::Truncated, 181 + }); 182 + } 183 + let resp = self 184 + .fetch_page(host, endpoint, &subject, cursor.as_deref()) 185 + .await?; 186 + acc.extend(resp.items); 187 + match resp.cursor.filter(|c| !c.is_empty()) { 188 + Some(next) => { 189 + self.drain(host, endpoint, subject, Some(next), page + 1, acc) 190 + .await 191 + } 192 + None => Ok(AclListing { 193 + entries: acc, 194 + completeness: Completeness::Complete, 195 + }), 196 + } 197 + }) 198 + } 199 + 200 + async fn fetch_page( 201 + &self, 202 + host: &KnotHost, 203 + endpoint: &'static str, 204 + subject: &str, 205 + cursor: Option<&str>, 206 + ) -> Result<ListWire, KnotClientError> { 207 + let mut url = host.xrpc_url(&nsid(endpoint)); 208 + { 209 + let mut q = url.query_pairs_mut(); 210 + q.clear(); 211 + q.append_pair("subject", subject); 212 + q.append_pair("limit", &LIST_PAGE_LIMIT.to_string()); 213 + if let Some(c) = cursor { 214 + q.append_pair("cursor", c); 215 + } 216 + } 217 + let bytes = self.get_json(url).await?; 218 + Ok(serde_json::from_slice(&bytes)?) 219 + } 220 + 221 + async fn get_json(&self, url: Url) -> Result<Bytes, KnotClientError> { 222 + let resp = self 223 + .http 224 + .execute(HttpRequest { 225 + url, 226 + headers: HeaderMap::new(), 227 + }) 228 + .await?; 229 + match resp.status { 230 + StatusCode::OK => read_bounded(resp).await, 231 + StatusCode::NOT_FOUND => Err(KnotClientError::NotFound), 232 + other => Err(KnotClientError::Upstream(other)), 233 + } 234 + } 235 + } 236 + 237 + #[derive(Deserialize)] 238 + struct VersionWire { 239 + #[serde(default)] 240 + capabilities: Option<Vec<String>>, 241 + } 242 + 243 + #[derive(Deserialize)] 244 + struct ListWire { 245 + #[serde(default)] 246 + items: Vec<AclEntry>, 247 + #[serde(default)] 248 + cursor: Option<String>, 249 + } 250 + 251 + async fn read_bounded(resp: HttpResponseHead) -> Result<Bytes, KnotClientError> { 252 + if resp.content_length.is_some_and(|len| len > MAX_BODY_BYTES) { 253 + return Err(KnotClientError::BodyTooLarge { 254 + limit: MAX_BODY_BYTES, 255 + }); 256 + } 257 + let buf = resp 258 + .body 259 + .map_err(KnotClientError::Network) 260 + .try_fold(BytesMut::new(), |mut acc, chunk| async move { 261 + if (acc.len() as u64).saturating_add(chunk.len() as u64) > MAX_BODY_BYTES { 262 + return Err(KnotClientError::BodyTooLarge { 263 + limit: MAX_BODY_BYTES, 264 + }); 265 + } 266 + acc.extend_from_slice(&chunk); 267 + Ok(acc) 268 + }) 269 + .await?; 270 + Ok(buf.freeze()) 271 + } 272 + 273 + #[cfg(test)] 274 + mod tests { 275 + use super::*; 276 + use serde_json::json; 277 + use wiremock::matchers::{method, path, query_param}; 278 + use wiremock::{Mock, MockServer, ResponseTemplate}; 279 + 280 + fn did(s: &str) -> Did<DefaultStr> { 281 + Did::new_owned(s).unwrap() 282 + } 283 + 284 + fn client() -> KnotClient { 285 + KnotClient::new(ReqwestHttp::shared(default_http_client(true).unwrap())) 286 + } 287 + 288 + fn endpoint(server: &MockServer) -> KnotHost { 289 + KnotHost::parse(&server.uri()).unwrap() 290 + } 291 + 292 + #[tokio::test] 293 + async fn capabilities_returns_declared_tokens() { 294 + let server = MockServer::start().await; 295 + Mock::given(method("GET")) 296 + .and(path("/xrpc/sh.tangled.knot.version")) 297 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 298 + "version": "1.0.0 (deadbeef)", 299 + "capabilities": ["knot-acl"] 300 + }))) 301 + .mount(&server) 302 + .await; 303 + 304 + let caps = client().capabilities(&endpoint(&server)).await.unwrap(); 305 + assert_eq!(caps, vec!["knot-acl".to_owned()]); 306 + } 307 + 308 + #[tokio::test] 309 + async fn capabilities_empty_when_field_absent() { 310 + let server = MockServer::start().await; 311 + Mock::given(method("GET")) 312 + .and(path("/xrpc/sh.tangled.knot.version")) 313 + .respond_with( 314 + ResponseTemplate::new(200).set_body_json(json!({ "version": "1.0.0 (cafe)" })), 315 + ) 316 + .mount(&server) 317 + .await; 318 + 319 + let caps = client().capabilities(&endpoint(&server)).await.unwrap(); 320 + assert!(caps.is_empty()); 321 + } 322 + 323 + #[tokio::test] 324 + async fn list_members_drains_single_page() { 325 + let server = MockServer::start().await; 326 + let host = endpoint(&server); 327 + Mock::given(method("GET")) 328 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 329 + .and(query_param("subject", authority(&host).as_str())) 330 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 331 + "items": [ 332 + {"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}, 333 + {"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T12:00:00Z"} 334 + ] 335 + }))) 336 + .mount(&server) 337 + .await; 338 + 339 + let listing = client().list_members(&host).await.unwrap(); 340 + assert_eq!(listing.completeness, Completeness::Complete); 341 + assert_eq!( 342 + listing.entries, 343 + vec![ 344 + AclEntry { 345 + subject: did("did:plc:boltless"), 346 + created_at: "2026-06-01T00:00:00Z".parse().unwrap(), 347 + }, 348 + AclEntry { 349 + subject: did("did:plc:akshay"), 350 + created_at: "2026-06-02T12:00:00Z".parse().unwrap(), 351 + }, 352 + ] 353 + ); 354 + } 355 + 356 + #[tokio::test] 357 + async fn list_members_drains_multiple_pages() { 358 + let server = MockServer::start().await; 359 + let host = endpoint(&server); 360 + Mock::given(method("GET")) 361 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 362 + .and(query_param("cursor", "p2")) 363 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 364 + "items": [{"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T00:00:00Z"}] 365 + }))) 366 + .mount(&server) 367 + .await; 368 + Mock::given(method("GET")) 369 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 370 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 371 + "items": [{"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}], 372 + "cursor": "p2" 373 + }))) 374 + .mount(&server) 375 + .await; 376 + 377 + let listing = client().list_members(&host).await.unwrap(); 378 + assert_eq!(listing.completeness, Completeness::Complete); 379 + let subjects: Vec<_> = listing.entries.into_iter().map(|m| m.subject).collect(); 380 + assert_eq!( 381 + subjects, 382 + vec![did("did:plc:boltless"), did("did:plc:akshay")] 383 + ); 384 + } 385 + 386 + #[tokio::test] 387 + async fn list_collaborators_uses_repo_did_subject() { 388 + let server = MockServer::start().await; 389 + let host = endpoint(&server); 390 + Mock::given(method("GET")) 391 + .and(path("/xrpc/sh.tangled.repo.listCollaborators")) 392 + .and(query_param("subject", "did:plc:scallop")) 393 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 394 + "items": [{"subject": "did:plc:olaren", "addedBy": "did:plc:boltless", "createdAt": "2026-06-03T00:00:00Z"}] 395 + }))) 396 + .mount(&server) 397 + .await; 398 + 399 + let listing = client() 400 + .list_collaborators(&host, &did("did:plc:scallop")) 401 + .await 402 + .unwrap(); 403 + assert_eq!(listing.completeness, Completeness::Complete); 404 + assert_eq!(listing.entries.len(), 1); 405 + assert_eq!(listing.entries[0].subject, did("did:plc:olaren")); 406 + } 407 + 408 + #[tokio::test] 409 + async fn drain_reports_truncation_at_page_cap() { 410 + let server = MockServer::start().await; 411 + let host = endpoint(&server); 412 + Mock::given(method("GET")) 413 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 414 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 415 + "items": [{"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}], 416 + "cursor": "more" 417 + }))) 418 + .mount(&server) 419 + .await; 420 + 421 + let listing = client().list_members(&host).await.unwrap(); 422 + assert_eq!( 423 + listing.completeness, 424 + Completeness::Truncated, 425 + "a never-terminating cursor must surface as a truncated listing" 426 + ); 427 + assert_eq!(listing.entries.len(), MAX_LIST_PAGES); 428 + } 429 + 430 + #[tokio::test] 431 + async fn maps_404_to_not_found() { 432 + let server = MockServer::start().await; 433 + Mock::given(method("GET")) 434 + .respond_with(ResponseTemplate::new(404)) 435 + .mount(&server) 436 + .await; 437 + 438 + let err = client() 439 + .capabilities(&endpoint(&server)) 440 + .await 441 + .expect_err("404 must surface"); 442 + assert!(matches!(err, KnotClientError::NotFound)); 443 + } 444 + 445 + #[tokio::test] 446 + async fn maps_5xx_to_upstream() { 447 + let server = MockServer::start().await; 448 + Mock::given(method("GET")) 449 + .respond_with(ResponseTemplate::new(503)) 450 + .mount(&server) 451 + .await; 452 + 453 + let err = client() 454 + .capabilities(&endpoint(&server)) 455 + .await 456 + .expect_err("5xx must surface"); 457 + match err { 458 + KnotClientError::Upstream(s) => assert_eq!(s.as_u16(), 503), 459 + other => panic!("wrong variant: {other:?}"), 460 + } 461 + } 462 + 463 + #[tokio::test] 464 + async fn does_not_follow_redirects() { 465 + let server = MockServer::start().await; 466 + Mock::given(method("GET")) 467 + .and(path("/xrpc/sh.tangled.knot.version")) 468 + .respond_with(ResponseTemplate::new(301).insert_header( 469 + "location", 470 + "https://kt.tngl.oyster.cafe/xrpc/sh.tangled.knot.version", 471 + )) 472 + .mount(&server) 473 + .await; 474 + 475 + let err = client() 476 + .capabilities(&endpoint(&server)) 477 + .await 478 + .expect_err("a redirect must surface as an error, not be followed to another knot"); 479 + match err { 480 + KnotClientError::Upstream(s) => assert_eq!(s.as_u16(), 301), 481 + other => panic!("expected Upstream(301), got {other:?}"), 482 + } 483 + } 484 + 485 + #[test] 486 + fn knot_endpoint_rejects_private_host() { 487 + let err = 488 + knot_endpoint("127.0.0.1:9", true, false).expect_err("private host must be refused"); 489 + assert!(matches!(err, KnotClientError::PrivateHost { .. })); 490 + } 491 + 492 + #[test] 493 + fn knot_endpoint_allows_private_when_permitted() { 494 + let knot = knot_endpoint("127.0.0.1:9", true, true).expect("private host allowed"); 495 + assert_eq!(knot.url().scheme(), "http"); 496 + } 497 + }
+255
bobbin/crates/knot-ingest/src/gate.rs
··· 1 + use std::collections::{HashMap, HashSet}; 2 + use std::sync::Arc; 3 + use std::sync::Mutex; 4 + use std::time::Duration; 5 + 6 + use bobbin_runtime::Clock; 7 + use bobbin_types::knot_acl::KnotHostKey; 8 + use tokio::time::Instant; 9 + 10 + use crate::client::{KnotClient, knot_endpoint}; 11 + 12 + const KNOT_ACL_CAPABILITY: &str = "knot-acl"; 13 + const LEGACY_REPROBE_INTERVAL: Duration = Duration::from_secs(300); 14 + const ERROR_REPROBE_INTERVAL: Duration = Duration::from_secs(60); 15 + 16 + struct ProbeRecord { 17 + at: Instant, 18 + retry_after: Duration, 19 + } 20 + 21 + pub struct CapabilityGate { 22 + client: KnotClient, 23 + clock: Arc<dyn Clock>, 24 + dev: bool, 25 + allow_private: bool, 26 + native: Mutex<HashSet<KnotHostKey>>, 27 + last_probe: Mutex<HashMap<KnotHostKey, ProbeRecord>>, 28 + } 29 + 30 + impl CapabilityGate { 31 + pub fn new(client: KnotClient, clock: Arc<dyn Clock>, dev: bool, allow_private: bool) -> Self { 32 + Self { 33 + client, 34 + clock, 35 + dev, 36 + allow_private, 37 + native: Mutex::new(HashSet::new()), 38 + last_probe: Mutex::new(HashMap::new()), 39 + } 40 + } 41 + 42 + pub fn is_native(&self, host: &KnotHostKey) -> bool { 43 + self.native.lock().unwrap().contains(host) 44 + } 45 + 46 + pub async fn has_knot_acl(&self, host: &KnotHostKey) -> bool { 47 + if self.is_native(host) { 48 + return true; 49 + } 50 + let now = self.clock.now_instant(); 51 + if self.throttled(host, now) { 52 + return false; 53 + } 54 + match self.probe(host).await { 55 + Ok(true) => { 56 + self.native.lock().unwrap().insert(host.clone()); 57 + true 58 + } 59 + Ok(false) => { 60 + self.mark(host, now, LEGACY_REPROBE_INTERVAL); 61 + false 62 + } 63 + Err(err) => { 64 + tracing::warn!(host = %host, error = %err, "knot capability probe failed"); 65 + self.mark(host, now, ERROR_REPROBE_INTERVAL); 66 + false 67 + } 68 + } 69 + } 70 + 71 + async fn probe(&self, host: &KnotHostKey) -> Result<bool, crate::client::KnotClientError> { 72 + let endpoint = knot_endpoint(host.as_str(), self.dev, self.allow_private)?; 73 + let caps = self.client.capabilities(&endpoint).await?; 74 + Ok(caps.iter().any(|cap| cap == KNOT_ACL_CAPABILITY)) 75 + } 76 + 77 + fn throttled(&self, host: &KnotHostKey, now: Instant) -> bool { 78 + self.last_probe 79 + .lock() 80 + .unwrap() 81 + .get(host) 82 + .is_some_and(|rec| now.saturating_duration_since(rec.at) < rec.retry_after) 83 + } 84 + 85 + fn mark(&self, host: &KnotHostKey, now: Instant, retry_after: Duration) { 86 + self.last_probe.lock().unwrap().insert( 87 + host.clone(), 88 + ProbeRecord { 89 + at: now, 90 + retry_after, 91 + }, 92 + ); 93 + } 94 + } 95 + 96 + #[cfg(test)] 97 + mod tests { 98 + use super::*; 99 + use std::sync::atomic::{AtomicU64, Ordering}; 100 + 101 + use bobbin_runtime::{ReqwestHttp, SleepFuture, UnixMicros}; 102 + use serde_json::json; 103 + use wiremock::matchers::{method, path}; 104 + use wiremock::{Mock, MockServer, ResponseTemplate}; 105 + 106 + struct ManualClock { 107 + base: Instant, 108 + offset_micros: AtomicU64, 109 + } 110 + 111 + impl ManualClock { 112 + fn new() -> Self { 113 + Self { 114 + base: Instant::now(), 115 + offset_micros: AtomicU64::new(0), 116 + } 117 + } 118 + 119 + fn advance(&self, by: Duration) { 120 + self.offset_micros 121 + .fetch_add(by.as_micros() as u64, Ordering::SeqCst); 122 + } 123 + } 124 + 125 + impl Clock for ManualClock { 126 + fn now_unix_micros(&self) -> UnixMicros { 127 + UnixMicros::new(self.offset_micros.load(Ordering::SeqCst)) 128 + } 129 + fn now_instant(&self) -> Instant { 130 + self.base + Duration::from_micros(self.offset_micros.load(Ordering::SeqCst)) 131 + } 132 + fn sleep(&self, _: Duration) -> SleepFuture { 133 + Box::pin(async {}) 134 + } 135 + fn sleep_until(&self, _: Instant) -> SleepFuture { 136 + Box::pin(async {}) 137 + } 138 + } 139 + 140 + fn gate(server: &MockServer, clock: Arc<dyn Clock>) -> (CapabilityGate, KnotHostKey) { 141 + let client = KnotClient::new(ReqwestHttp::shared(reqwest::Client::new())); 142 + let url = url::Url::parse(&server.uri()).unwrap(); 143 + let host = format!("{}:{}", url.host_str().unwrap(), url.port().unwrap()); 144 + ( 145 + CapabilityGate::new(client, clock, true, true), 146 + KnotHostKey::new(&host), 147 + ) 148 + } 149 + 150 + async fn mount_version(server: &MockServer, caps: serde_json::Value, expect: u64) { 151 + Mock::given(method("GET")) 152 + .and(path("/xrpc/sh.tangled.knot.version")) 153 + .respond_with( 154 + ResponseTemplate::new(200) 155 + .set_body_json(json!({ "version": "1.0.0 (cafe)", "capabilities": caps })), 156 + ) 157 + .expect(expect) 158 + .mount(server) 159 + .await; 160 + } 161 + 162 + #[tokio::test] 163 + async fn declares_knot_acl() { 164 + let server = MockServer::start().await; 165 + mount_version(&server, json!(["knot-acl"]), 1).await; 166 + let (gate, host) = gate(&server, Arc::new(ManualClock::new())); 167 + assert!(gate.has_knot_acl(&host).await); 168 + assert!(gate.is_native(&host)); 169 + } 170 + 171 + #[tokio::test] 172 + async fn legacy_knot_without_capability() { 173 + let server = MockServer::start().await; 174 + mount_version(&server, json!([]), 1).await; 175 + let (gate, host) = gate(&server, Arc::new(ManualClock::new())); 176 + assert!(!gate.has_knot_acl(&host).await); 177 + assert!(!gate.is_native(&host)); 178 + } 179 + 180 + #[tokio::test] 181 + async fn native_is_latched_and_survives_probe_error() { 182 + let server = MockServer::start().await; 183 + mount_version(&server, json!(["knot-acl"]), 1).await; 184 + let clock = Arc::new(ManualClock::new()); 185 + let (gate, host) = gate(&server, clock.clone()); 186 + assert!(gate.has_knot_acl(&host).await); 187 + 188 + server.reset().await; 189 + clock.advance(LEGACY_REPROBE_INTERVAL + Duration::from_secs(1)); 190 + assert!( 191 + gate.has_knot_acl(&host).await, 192 + "latched native never re-probes" 193 + ); 194 + assert!(gate.is_native(&host)); 195 + } 196 + 197 + #[tokio::test] 198 + async fn legacy_throttled_then_reprobed_after_interval() { 199 + let server = MockServer::start().await; 200 + mount_version(&server, json!([]), 2).await; 201 + let clock = Arc::new(ManualClock::new()); 202 + let (gate, host) = gate(&server, clock.clone()); 203 + assert!(!gate.has_knot_acl(&host).await); 204 + clock.advance(Duration::from_secs(60)); 205 + assert!(!gate.has_knot_acl(&host).await); 206 + clock.advance(LEGACY_REPROBE_INTERVAL); 207 + assert!(!gate.has_knot_acl(&host).await); 208 + } 209 + 210 + #[tokio::test] 211 + async fn legacy_upgrade_is_detected_on_reprobe() { 212 + let server = MockServer::start().await; 213 + Mock::given(method("GET")) 214 + .and(path("/xrpc/sh.tangled.knot.version")) 215 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "version": "1.0.0" }))) 216 + .up_to_n_times(1) 217 + .mount(&server) 218 + .await; 219 + Mock::given(method("GET")) 220 + .and(path("/xrpc/sh.tangled.knot.version")) 221 + .respond_with( 222 + ResponseTemplate::new(200) 223 + .set_body_json(json!({ "version": "1.1.0", "capabilities": ["knot-acl"] })), 224 + ) 225 + .mount(&server) 226 + .await; 227 + let clock = Arc::new(ManualClock::new()); 228 + let (gate, host) = gate(&server, clock.clone()); 229 + assert!(!gate.has_knot_acl(&host).await); 230 + clock.advance(LEGACY_REPROBE_INTERVAL + Duration::from_secs(1)); 231 + assert!(gate.has_knot_acl(&host).await); 232 + assert!(gate.is_native(&host)); 233 + } 234 + 235 + #[tokio::test] 236 + async fn probe_error_throttled_briefly_then_reprobed() { 237 + let server = MockServer::start().await; 238 + Mock::given(method("GET")) 239 + .and(path("/xrpc/sh.tangled.knot.version")) 240 + .respond_with(ResponseTemplate::new(503)) 241 + .expect(2) 242 + .mount(&server) 243 + .await; 244 + let clock = Arc::new(ManualClock::new()); 245 + let (gate, host) = gate(&server, clock.clone()); 246 + assert!(!gate.has_knot_acl(&host).await); 247 + clock.advance(Duration::from_secs(1)); 248 + assert!( 249 + !gate.has_knot_acl(&host).await, 250 + "error reprobe is throttled" 251 + ); 252 + clock.advance(ERROR_REPROBE_INTERVAL); 253 + assert!(!gate.has_knot_acl(&host).await); 254 + } 255 + }
+13
bobbin/crates/knot-ingest/src/lib.rs
··· 1 + pub mod client; 2 + pub mod gate; 3 + pub mod orchestrator; 4 + pub mod registry; 5 + pub mod roster; 6 + pub mod stream; 7 + 8 + pub use client::{AclEntry, AclListing, Completeness, KnotClient, KnotClientError, knot_endpoint}; 9 + pub use gate::CapabilityGate; 10 + pub use orchestrator::Orchestrator; 11 + pub use registry::KnotRegistry; 12 + pub use roster::{AclOp, Cursor, Roster}; 13 + pub use stream::{StreamConfig, run_stream};
+463
bobbin/crates/knot-ingest/src/orchestrator.rs
··· 1 + use std::collections::{HashMap, HashSet}; 2 + use std::sync::{Arc, Mutex}; 3 + use std::time::Duration; 4 + 5 + use bobbin_edge_index::EdgeStore; 6 + use bobbin_knot_proxy::KnotHost; 7 + use bobbin_runtime::{Clock, WsTransport}; 8 + use bobbin_types::knot_acl::{KnotHostKey, host_to_knot_did}; 9 + use chrono::{DateTime, Utc}; 10 + use futures::StreamExt; 11 + use jacquard_common::DefaultStr; 12 + use jacquard_common::types::did::Did; 13 + use tokio_util::sync::CancellationToken; 14 + 15 + use crate::client::{AclListing, Completeness, KnotClient, knot_endpoint}; 16 + use crate::gate::CapabilityGate; 17 + use crate::registry::KnotRegistry; 18 + use crate::roster::{AclOp, Cursor, Roster}; 19 + use crate::stream::{StreamConfig, run_stream}; 20 + 21 + const POLL_INTERVAL: Duration = Duration::from_secs(30); 22 + const RECONCILE_INTERVAL: Duration = Duration::from_secs(300); 23 + const PROBE_CONCURRENCY: usize = 16; 24 + 25 + pub struct Orchestrator { 26 + pub client: Arc<KnotClient>, 27 + pub gate: Arc<CapabilityGate>, 28 + pub registry: Arc<KnotRegistry>, 29 + pub store: Arc<EdgeStore>, 30 + pub ws: Arc<dyn WsTransport>, 31 + pub clock: Arc<dyn Clock>, 32 + pub dev: bool, 33 + pub allow_private: bool, 34 + pub cancel: CancellationToken, 35 + } 36 + 37 + impl Orchestrator { 38 + pub async fn run(self) { 39 + let mut subscribed: HashMap<KnotHostKey, CancellationToken> = HashMap::new(); 40 + let mut unspawnable: HashSet<KnotHostKey> = HashSet::new(); 41 + loop { 42 + if self.cancel.is_cancelled() { 43 + break; 44 + } 45 + self.discover(&mut subscribed, &mut unspawnable).await; 46 + tokio::select! { 47 + _ = self.cancel.cancelled() => break, 48 + _ = self.clock.sleep(POLL_INTERVAL) => {} 49 + } 50 + } 51 + } 52 + 53 + async fn discover( 54 + &self, 55 + subscribed: &mut HashMap<KnotHostKey, CancellationToken>, 56 + unspawnable: &mut HashSet<KnotHostKey>, 57 + ) { 58 + let candidates: Vec<KnotHostKey> = self 59 + .registry 60 + .hosts() 61 + .into_iter() 62 + .filter(|host| !subscribed.contains_key(host) && !unspawnable.contains(host)) 63 + .collect(); 64 + let approved: Vec<KnotHostKey> = futures::stream::iter(candidates) 65 + .map(|host| async move { self.gate.has_knot_acl(&host).await.then_some(host) }) 66 + .buffer_unordered(PROBE_CONCURRENCY) 67 + .filter_map(|approved| async move { approved }) 68 + .collect() 69 + .await; 70 + approved 71 + .into_iter() 72 + .for_each(|host| match self.spawn(&host) { 73 + Some(token) => { 74 + subscribed.insert(host, token); 75 + } 76 + None => { 77 + tracing::warn!(host = %host, "skipping unspawnable knot endpoint"); 78 + unspawnable.insert(host); 79 + } 80 + }); 81 + } 82 + 83 + fn spawn(&self, host: &KnotHostKey) -> Option<CancellationToken> { 84 + let endpoint = knot_endpoint(host.as_str(), self.dev, self.allow_private).ok()?; 85 + let knot_did = host_to_knot_did(host.as_str())?; 86 + let roster = Arc::new(Mutex::new(Roster::new( 87 + self.store.clone(), 88 + knot_did, 89 + self.registry.clone(), 90 + host.clone(), 91 + ))); 92 + let token = self.cancel.child_token(); 93 + 94 + let stream_cfg = StreamConfig { 95 + ws: self.ws.clone(), 96 + clock: self.clock.clone(), 97 + cancel: token.clone(), 98 + }; 99 + let stream_roster = roster.clone(); 100 + let stream_endpoint = endpoint.clone(); 101 + tokio::spawn(async move { 102 + run_stream(&stream_cfg, &stream_endpoint, &stream_roster, 0).await; 103 + }); 104 + 105 + let client = self.client.clone(); 106 + let registry = self.registry.clone(); 107 + let clock = self.clock.clone(); 108 + let host_owned = host.clone(); 109 + let reconcile_token = token.clone(); 110 + tokio::spawn(async move { 111 + reconcile_loop( 112 + &client, 113 + &endpoint, 114 + &host_owned, 115 + &registry, 116 + &roster, 117 + &*clock, 118 + &reconcile_token, 119 + ) 120 + .await; 121 + }); 122 + 123 + Some(token) 124 + } 125 + } 126 + 127 + async fn reconcile_loop( 128 + client: &KnotClient, 129 + endpoint: &KnotHost, 130 + host: &KnotHostKey, 131 + registry: &KnotRegistry, 132 + roster: &Mutex<Roster>, 133 + clock: &dyn Clock, 134 + cancel: &CancellationToken, 135 + ) { 136 + loop { 137 + if cancel.is_cancelled() { 138 + return; 139 + } 140 + reconcile_once(client, endpoint, host, registry, roster).await; 141 + tokio::select! { 142 + _ = cancel.cancelled() => return, 143 + _ = clock.sleep(RECONCILE_INTERVAL) => {} 144 + } 145 + } 146 + } 147 + 148 + async fn reconcile_once( 149 + client: &KnotClient, 150 + endpoint: &KnotHost, 151 + host: &KnotHostKey, 152 + registry: &KnotRegistry, 153 + roster: &Mutex<Roster>, 154 + ) { 155 + let horizon = roster.lock().unwrap().max_cursor(); 156 + 157 + match client.list_members(endpoint).await { 158 + Ok(AclListing { 159 + entries, 160 + completeness, 161 + }) => { 162 + let present: HashSet<Did<DefaultStr>> = 163 + entries.iter().map(|entry| entry.subject.clone()).collect(); 164 + let mut guard = roster.lock().unwrap(); 165 + entries.into_iter().for_each(|entry| { 166 + guard.apply_member(AclOp::Add, entry.subject, Cursor(nanos(entry.created_at))); 167 + }); 168 + if completeness == Completeness::Complete { 169 + guard.reap_members(&present, horizon); 170 + } 171 + } 172 + Err(err) => tracing::warn!(host = %host, error = %err, "knot member reconcile failed"), 173 + } 174 + 175 + futures::stream::iter(registry.repos(host)) 176 + .for_each(|repo| async move { 177 + match client.list_collaborators(endpoint, &repo).await { 178 + Ok(AclListing { 179 + entries, 180 + completeness, 181 + }) => { 182 + let present: HashSet<Did<DefaultStr>> = 183 + entries.iter().map(|entry| entry.subject.clone()).collect(); 184 + let mut guard = roster.lock().unwrap(); 185 + entries.into_iter().for_each(|entry| { 186 + guard.apply_collaborator( 187 + AclOp::Add, 188 + repo.clone(), 189 + entry.subject, 190 + Cursor(nanos(entry.created_at)), 191 + ); 192 + }); 193 + if completeness == Completeness::Complete { 194 + guard.reap_collaborators(&repo, &present, horizon); 195 + } 196 + } 197 + Err(err) => { 198 + tracing::warn!(host = %host, repo = %repo.as_ref(), error = %err, "knot collaborator reconcile failed") 199 + } 200 + } 201 + }) 202 + .await; 203 + 204 + roster.lock().unwrap().purge_legacy(); 205 + } 206 + 207 + fn nanos(timestamp: DateTime<Utc>) -> i64 { 208 + timestamp.timestamp_nanos_opt().unwrap_or(0) 209 + } 210 + 211 + #[cfg(test)] 212 + mod tests { 213 + use super::*; 214 + use crate::client::authority; 215 + use bobbin_runtime::{ReqwestHttp, RuntimeHasher}; 216 + use bobbin_types::edges::Edge; 217 + use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 218 + use jacquard_common::types::string::AtUri; 219 + use serde_json::json; 220 + use wiremock::matchers::{method, path, query_param}; 221 + use wiremock::{Mock, MockServer, ResponseTemplate}; 222 + 223 + fn did(s: &str) -> Did<DefaultStr> { 224 + Did::new_owned(s).unwrap() 225 + } 226 + 227 + fn member_count(store: &EdgeStore, subject: &str) -> u64 { 228 + store.count(&EdgeKey::new( 229 + nsid_static("sh.tangled.knot.member"), 230 + SubjectRef::Did(did(subject)), 231 + )) 232 + } 233 + 234 + fn collaborator_count(store: &EdgeStore, repo: &str) -> u64 { 235 + store.count(&EdgeKey::new( 236 + nsid_static("sh.tangled.repo.collaborator"), 237 + SubjectRef::Did(did(repo)), 238 + )) 239 + } 240 + 241 + #[tokio::test] 242 + async fn reconcile_backfills_members_and_collaborators() { 243 + let server = MockServer::start().await; 244 + let endpoint = KnotHost::parse(&server.uri()).unwrap(); 245 + let host = KnotHostKey::new(&authority(&endpoint)); 246 + 247 + Mock::given(method("GET")) 248 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 249 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 250 + "items": [ 251 + {"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}, 252 + {"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T00:00:00Z"} 253 + ] 254 + }))) 255 + .mount(&server) 256 + .await; 257 + Mock::given(method("GET")) 258 + .and(path("/xrpc/sh.tangled.repo.listCollaborators")) 259 + .and(query_param("subject", "did:plc:scallop")) 260 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 261 + "items": [ 262 + {"subject": "did:plc:olaren", "addedBy": "did:plc:boltless", "createdAt": "2026-06-03T00:00:00Z"} 263 + ] 264 + }))) 265 + .mount(&server) 266 + .await; 267 + 268 + let registry = Arc::new(KnotRegistry::new()); 269 + registry.observe_repo(&host, did("did:plc:scallop")); 270 + 271 + let store = Arc::new(EdgeStore::new(RuntimeHasher::default())); 272 + let knot_did = host_to_knot_did(host.as_str()).unwrap(); 273 + let roster = Mutex::new(Roster::new( 274 + store.clone(), 275 + knot_did, 276 + registry.clone(), 277 + host.clone(), 278 + )); 279 + let client = KnotClient::new(ReqwestHttp::shared(reqwest::Client::new())); 280 + 281 + reconcile_once(&client, &endpoint, &host, &registry, &roster).await; 282 + 283 + assert_eq!(member_count(&store, "did:plc:boltless"), 1); 284 + assert_eq!(member_count(&store, "did:plc:akshay"), 1); 285 + assert_eq!(collaborator_count(&store, "did:plc:scallop"), 1); 286 + } 287 + 288 + #[tokio::test] 289 + async fn reconcile_reaps_departed_member() { 290 + let server = MockServer::start().await; 291 + let endpoint = KnotHost::parse(&server.uri()).unwrap(); 292 + let host = KnotHostKey::new(&authority(&endpoint)); 293 + 294 + let registry = Arc::new(KnotRegistry::new()); 295 + let store = Arc::new(EdgeStore::new(RuntimeHasher::default())); 296 + let knot_did = host_to_knot_did(host.as_str()).unwrap(); 297 + let roster = Mutex::new(Roster::new( 298 + store.clone(), 299 + knot_did, 300 + registry.clone(), 301 + host.clone(), 302 + )); 303 + let client = KnotClient::new(ReqwestHttp::shared(reqwest::Client::new())); 304 + 305 + Mock::given(method("GET")) 306 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 307 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 308 + "items": [ 309 + {"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}, 310 + {"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T00:00:00Z"} 311 + ] 312 + }))) 313 + .mount(&server) 314 + .await; 315 + reconcile_once(&client, &endpoint, &host, &registry, &roster).await; 316 + assert_eq!(member_count(&store, "did:plc:boltless"), 1); 317 + assert_eq!(member_count(&store, "did:plc:akshay"), 1); 318 + 319 + server.reset().await; 320 + Mock::given(method("GET")) 321 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 322 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 323 + "items": [ 324 + {"subject": "did:plc:akshay", "addedBy": "did:plc:akshay", "createdAt": "2026-06-02T00:00:00Z"} 325 + ] 326 + }))) 327 + .mount(&server) 328 + .await; 329 + reconcile_once(&client, &endpoint, &host, &registry, &roster).await; 330 + 331 + assert_eq!( 332 + member_count(&store, "did:plc:boltless"), 333 + 0, 334 + "a member dropped from the authoritative snapshot is reaped on reconcile" 335 + ); 336 + assert_eq!(member_count(&store, "did:plc:akshay"), 1); 337 + } 338 + 339 + #[tokio::test] 340 + async fn reconcile_skips_reap_when_member_list_truncated() { 341 + let server = MockServer::start().await; 342 + let endpoint = KnotHost::parse(&server.uri()).unwrap(); 343 + let host = KnotHostKey::new(&authority(&endpoint)); 344 + 345 + Mock::given(method("GET")) 346 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 347 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 348 + "items": [{"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"}], 349 + "cursor": "more" 350 + }))) 351 + .mount(&server) 352 + .await; 353 + 354 + let registry = Arc::new(KnotRegistry::new()); 355 + let store = Arc::new(EdgeStore::new(RuntimeHasher::default())); 356 + let knot_did = host_to_knot_did(host.as_str()).unwrap(); 357 + let roster = Mutex::new(Roster::new( 358 + store.clone(), 359 + knot_did, 360 + registry.clone(), 361 + host.clone(), 362 + )); 363 + let client = KnotClient::new(ReqwestHttp::shared(reqwest::Client::new())); 364 + 365 + let stayed = did("did:plc:akshay"); 366 + roster 367 + .lock() 368 + .unwrap() 369 + .apply_member(AclOp::Add, stayed, Cursor(1_000_000)); 370 + assert_eq!(member_count(&store, "did:plc:akshay"), 1); 371 + 372 + reconcile_once(&client, &endpoint, &host, &registry, &roster).await; 373 + 374 + assert_eq!( 375 + member_count(&store, "did:plc:akshay"), 376 + 1, 377 + "a truncated member snapshot must not reap members it could not enumerate" 378 + ); 379 + assert_eq!(member_count(&store, "did:plc:boltless"), 1); 380 + } 381 + 382 + fn seed_legacy_edge(store: &EdgeStore, kind: &'static str, subject: &str, source: &str) { 383 + let source = AtUri::new_owned(source).unwrap(); 384 + store.upsert_source( 385 + &source, 386 + vec![Edge { 387 + kind: nsid_static(kind), 388 + subject: SubjectRef::Did(did(subject)), 389 + source: source.clone(), 390 + sort_micros: 0, 391 + }], 392 + ); 393 + } 394 + 395 + #[tokio::test] 396 + async fn reconcile_purges_seeded_legacy_acl() { 397 + let server = MockServer::start().await; 398 + let endpoint = KnotHost::parse(&server.uri()).unwrap(); 399 + let host = KnotHostKey::new(&authority(&endpoint)); 400 + 401 + Mock::given(method("GET")) 402 + .and(path("/xrpc/sh.tangled.knot.listMembers")) 403 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 404 + "items": [ 405 + {"subject": "did:plc:boltless", "addedBy": "did:plc:akshay", "createdAt": "2026-06-01T00:00:00Z"} 406 + ] 407 + }))) 408 + .mount(&server) 409 + .await; 410 + Mock::given(method("GET")) 411 + .and(path("/xrpc/sh.tangled.repo.listCollaborators")) 412 + .and(query_param("subject", "did:plc:scallop")) 413 + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ 414 + "items": [ 415 + {"subject": "did:plc:olaren", "addedBy": "did:plc:boltless", "createdAt": "2026-06-03T00:00:00Z"} 416 + ] 417 + }))) 418 + .mount(&server) 419 + .await; 420 + 421 + let registry = Arc::new(KnotRegistry::new()); 422 + let repo = did("did:plc:scallop"); 423 + registry.observe_repo(&host, repo.clone()); 424 + 425 + let store = Arc::new(EdgeStore::new(RuntimeHasher::default())); 426 + let member_source = "at://did:plc:akshay/sh.tangled.knot.member/r1"; 427 + seed_legacy_edge( 428 + &store, 429 + "sh.tangled.knot.member", 430 + "did:plc:boltless", 431 + member_source, 432 + ); 433 + registry.note_legacy_member(AtUri::new_owned(member_source).unwrap(), &host); 434 + seed_legacy_edge( 435 + &store, 436 + "sh.tangled.repo.collaborator", 437 + "did:plc:scallop", 438 + "at://did:plc:akshay/sh.tangled.repo.collaborator/r2", 439 + ); 440 + 441 + let knot_did = host_to_knot_did(host.as_str()).unwrap(); 442 + let roster = Mutex::new(Roster::new( 443 + store.clone(), 444 + knot_did, 445 + registry.clone(), 446 + host.clone(), 447 + )); 448 + let client = KnotClient::new(ReqwestHttp::shared(reqwest::Client::new())); 449 + 450 + reconcile_once(&client, &endpoint, &host, &registry, &roster).await; 451 + 452 + assert_eq!( 453 + member_count(&store, "did:plc:boltless"), 454 + 1, 455 + "stale legacy member purged, knot-owned member kept" 456 + ); 457 + assert_eq!( 458 + collaborator_count(&store, "did:plc:scallop"), 459 + 1, 460 + "stale legacy collaborator purged, knot-owned collaborator kept" 461 + ); 462 + } 463 + }
+197
bobbin/crates/knot-ingest/src/registry.rs
··· 1 + use std::collections::{HashMap, HashSet}; 2 + use std::sync::Mutex; 3 + 4 + use bobbin_types::knot_acl::KnotHostKey; 5 + use jacquard_common::DefaultStr; 6 + use jacquard_common::types::did::Did; 7 + use jacquard_common::types::string::AtUri; 8 + 9 + #[derive(Default)] 10 + struct Inner { 11 + hosts: HashSet<KnotHostKey>, 12 + repos: HashMap<KnotHostKey, HashSet<Did<DefaultStr>>>, 13 + repo_host: HashMap<Did<DefaultStr>, KnotHostKey>, 14 + legacy_members: HashMap<AtUri<DefaultStr>, KnotHostKey>, 15 + } 16 + 17 + #[derive(Default)] 18 + pub struct KnotRegistry { 19 + inner: Mutex<Inner>, 20 + } 21 + 22 + impl KnotRegistry { 23 + pub fn new() -> Self { 24 + Self::default() 25 + } 26 + 27 + pub fn observe_host(&self, host: &KnotHostKey) { 28 + self.inner.lock().unwrap().hosts.insert(host.clone()); 29 + } 30 + 31 + pub fn observe_repo(&self, host: &KnotHostKey, repo: Did<DefaultStr>) { 32 + let mut inner = self.inner.lock().unwrap(); 33 + inner.hosts.insert(host.clone()); 34 + inner 35 + .repos 36 + .entry(host.clone()) 37 + .or_default() 38 + .insert(repo.clone()); 39 + inner.repo_host.insert(repo, host.clone()); 40 + } 41 + 42 + pub fn hosts(&self) -> Vec<KnotHostKey> { 43 + self.inner.lock().unwrap().hosts.iter().cloned().collect() 44 + } 45 + 46 + pub fn repos(&self, host: &KnotHostKey) -> Vec<Did<DefaultStr>> { 47 + self.inner 48 + .lock() 49 + .unwrap() 50 + .repos 51 + .get(host) 52 + .map(|set| set.iter().cloned().collect()) 53 + .unwrap_or_default() 54 + } 55 + 56 + pub fn repo_on_host(&self, host: &KnotHostKey, repo: &Did<DefaultStr>) -> bool { 57 + self.inner 58 + .lock() 59 + .unwrap() 60 + .repos 61 + .get(host) 62 + .is_some_and(|set| set.contains(repo)) 63 + } 64 + 65 + pub fn host_of_repo(&self, repo: &Did<DefaultStr>) -> Option<KnotHostKey> { 66 + self.inner.lock().unwrap().repo_host.get(repo).cloned() 67 + } 68 + 69 + pub fn note_legacy_member(&self, source: AtUri<DefaultStr>, host: &KnotHostKey) { 70 + self.inner 71 + .lock() 72 + .unwrap() 73 + .legacy_members 74 + .insert(source, host.clone()); 75 + } 76 + 77 + pub fn forget_legacy_member(&self, source: &AtUri<DefaultStr>) { 78 + self.inner.lock().unwrap().legacy_members.remove(source); 79 + } 80 + 81 + pub fn drain_legacy_members(&self, host: &KnotHostKey) -> Vec<AtUri<DefaultStr>> { 82 + let mut inner = self.inner.lock().unwrap(); 83 + let matched: Vec<AtUri<DefaultStr>> = inner 84 + .legacy_members 85 + .iter() 86 + .filter(|(_, member_host)| member_host.as_str() == host.as_str()) 87 + .map(|(source, _)| source.clone()) 88 + .collect(); 89 + matched.iter().for_each(|source| { 90 + inner.legacy_members.remove(source); 91 + }); 92 + matched 93 + } 94 + } 95 + 96 + #[cfg(test)] 97 + mod tests { 98 + use super::*; 99 + 100 + fn did(s: &str) -> Did<DefaultStr> { 101 + Did::new_owned(s).unwrap() 102 + } 103 + 104 + fn at(s: &str) -> AtUri<DefaultStr> { 105 + AtUri::new_owned(s).unwrap() 106 + } 107 + 108 + fn host(s: &str) -> KnotHostKey { 109 + KnotHostKey::new(s) 110 + } 111 + 112 + #[test] 113 + fn observe_repo_registers_host_and_repo() { 114 + let registry = KnotRegistry::new(); 115 + registry.observe_repo(&host("oyster.cafe"), did("did:plc:scallop")); 116 + registry.observe_repo(&host("oyster.cafe"), did("did:plc:limpet")); 117 + 118 + assert_eq!(registry.hosts(), vec![host("oyster.cafe")]); 119 + let mut repos = registry.repos(&host("oyster.cafe")); 120 + repos.sort_by(|a, b| a.as_ref().cmp(b.as_ref())); 121 + assert_eq!(repos, vec![did("did:plc:limpet"), did("did:plc:scallop")]); 122 + } 123 + 124 + #[test] 125 + fn observe_repo_dedups() { 126 + let registry = KnotRegistry::new(); 127 + registry.observe_repo(&host("nel.pet"), did("did:plc:whelk")); 128 + registry.observe_repo(&host("nel.pet"), did("did:plc:whelk")); 129 + assert_eq!(registry.repos(&host("nel.pet")), vec![did("did:plc:whelk")]); 130 + } 131 + 132 + #[test] 133 + fn observe_host_without_repos() { 134 + let registry = KnotRegistry::new(); 135 + registry.observe_host(&host("oyster.cafe")); 136 + assert_eq!(registry.hosts(), vec![host("oyster.cafe")]); 137 + assert!(registry.repos(&host("oyster.cafe")).is_empty()); 138 + } 139 + 140 + #[test] 141 + fn host_lookups_are_case_insensitive() { 142 + let registry = KnotRegistry::new(); 143 + registry.observe_repo(&host("KT.Oyster.Cafe"), did("did:plc:scallop")); 144 + assert!(registry.repo_on_host(&host("kt.oyster.cafe"), &did("did:plc:scallop"))); 145 + assert_eq!( 146 + registry.host_of_repo(&did("did:plc:scallop")), 147 + Some(host("kt.oyster.cafe")) 148 + ); 149 + } 150 + 151 + #[test] 152 + fn host_of_repo_resolves_owning_knot() { 153 + let registry = KnotRegistry::new(); 154 + registry.observe_repo(&host("oyster.cafe"), did("did:plc:scallop")); 155 + assert_eq!( 156 + registry.host_of_repo(&did("did:plc:scallop")), 157 + Some(host("oyster.cafe")) 158 + ); 159 + assert_eq!(registry.host_of_repo(&did("did:plc:limpet")), None); 160 + } 161 + 162 + #[test] 163 + fn drain_legacy_members_returns_only_matching_host() { 164 + let registry = KnotRegistry::new(); 165 + let here = at("at://did:plc:akshay/sh.tangled.knot.member/r1"); 166 + let elsewhere = at("at://did:plc:akshay/sh.tangled.knot.member/r2"); 167 + registry.note_legacy_member(here.clone(), &host("oyster.cafe")); 168 + registry.note_legacy_member(elsewhere.clone(), &host("nel.pet")); 169 + 170 + assert_eq!( 171 + registry.drain_legacy_members(&host("oyster.cafe")), 172 + vec![here] 173 + ); 174 + assert!( 175 + registry 176 + .drain_legacy_members(&host("oyster.cafe")) 177 + .is_empty() 178 + ); 179 + assert_eq!( 180 + registry.drain_legacy_members(&host("nel.pet")), 181 + vec![elsewhere] 182 + ); 183 + } 184 + 185 + #[test] 186 + fn forget_legacy_member_drops_source() { 187 + let registry = KnotRegistry::new(); 188 + let source = at("at://did:plc:akshay/sh.tangled.knot.member/r1"); 189 + registry.note_legacy_member(source.clone(), &host("oyster.cafe")); 190 + registry.forget_legacy_member(&source); 191 + assert!( 192 + registry 193 + .drain_legacy_members(&host("oyster.cafe")) 194 + .is_empty() 195 + ); 196 + } 197 + }
+445
bobbin/crates/knot-ingest/src/roster.rs
··· 1 + use std::collections::{HashMap, HashSet}; 2 + use std::sync::Arc; 3 + 4 + use bobbin_edge_index::EdgeStore; 5 + use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 6 + use bobbin_types::knot_acl::{self, KnotHostKey}; 7 + use jacquard_common::DefaultStr; 8 + use jacquard_common::types::did::Did; 9 + use serde::Deserialize; 10 + 11 + use crate::registry::KnotRegistry; 12 + 13 + const REPO_COLLABORATOR_KIND: &str = "sh.tangled.repo.collaborator"; 14 + 15 + #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)] 16 + pub struct Cursor(pub i64); 17 + 18 + impl Cursor { 19 + fn micros(self) -> u64 { 20 + self.0.max(0) as u64 / 1000 21 + } 22 + } 23 + 24 + #[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize)] 25 + #[serde(rename_all = "lowercase")] 26 + pub enum AclOp { 27 + Add, 28 + Remove, 29 + } 30 + 31 + #[derive(Clone, Eq, Hash, PartialEq)] 32 + enum DedupKey { 33 + Member(Did<DefaultStr>), 34 + Collaborator(Did<DefaultStr>, Did<DefaultStr>), 35 + } 36 + 37 + struct SeenState { 38 + cursor: Cursor, 39 + present: bool, 40 + } 41 + 42 + pub struct Roster { 43 + store: Arc<EdgeStore>, 44 + knot: Did<DefaultStr>, 45 + registry: Arc<KnotRegistry>, 46 + host: KnotHostKey, 47 + seen: HashMap<DedupKey, SeenState>, 48 + } 49 + 50 + impl Roster { 51 + pub fn new( 52 + store: Arc<EdgeStore>, 53 + knot: Did<DefaultStr>, 54 + registry: Arc<KnotRegistry>, 55 + host: KnotHostKey, 56 + ) -> Self { 57 + Self { 58 + store, 59 + knot, 60 + registry, 61 + host, 62 + seen: HashMap::new(), 63 + } 64 + } 65 + 66 + pub fn apply_member(&mut self, op: AclOp, subject: Did<DefaultStr>, cursor: Cursor) { 67 + if !self.advance( 68 + DedupKey::Member(subject.clone()), 69 + cursor, 70 + matches!(op, AclOp::Add), 71 + ) { 72 + return; 73 + } 74 + match op { 75 + AclOp::Add => { 76 + if let Some((source, edges)) = 77 + knot_acl::member_upsert(&self.knot, &subject, cursor.micros()) 78 + { 79 + self.store.upsert_source(&source, edges); 80 + } 81 + } 82 + AclOp::Remove => { 83 + if let Some(source) = knot_acl::member_source(&self.knot, &subject) { 84 + self.store.remove_source(&source); 85 + } 86 + } 87 + } 88 + } 89 + 90 + pub fn apply_collaborator( 91 + &mut self, 92 + op: AclOp, 93 + repo: Did<DefaultStr>, 94 + subject: Did<DefaultStr>, 95 + cursor: Cursor, 96 + ) { 97 + if !self.registry.repo_on_host(&self.host, &repo) { 98 + return; 99 + } 100 + if !self.advance( 101 + DedupKey::Collaborator(repo.clone(), subject.clone()), 102 + cursor, 103 + matches!(op, AclOp::Add), 104 + ) { 105 + return; 106 + } 107 + match op { 108 + AclOp::Add => { 109 + if let Some((source, edges)) = 110 + knot_acl::collaborator_upsert(&repo, &subject, cursor.micros()) 111 + { 112 + self.store.upsert_source(&source, edges); 113 + } 114 + } 115 + AclOp::Remove => { 116 + if let Some(source) = knot_acl::collaborator_source(&repo, &subject) { 117 + self.store.remove_source(&source); 118 + } 119 + } 120 + } 121 + } 122 + 123 + pub fn max_cursor(&self) -> Cursor { 124 + self.seen 125 + .values() 126 + .map(|state| state.cursor) 127 + .max() 128 + .unwrap_or(Cursor(0)) 129 + } 130 + 131 + pub fn reap_members(&mut self, present: &HashSet<Did<DefaultStr>>, horizon: Cursor) { 132 + let stale: Vec<Did<DefaultStr>> = self 133 + .seen 134 + .iter() 135 + .filter_map(|(key, state)| match key { 136 + DedupKey::Member(subject) 137 + if state.present && state.cursor <= horizon && !present.contains(subject) => 138 + { 139 + Some(subject.clone()) 140 + } 141 + _ => None, 142 + }) 143 + .collect(); 144 + stale 145 + .into_iter() 146 + .for_each(|subject| self.retire_member(subject)); 147 + } 148 + 149 + pub fn reap_collaborators( 150 + &mut self, 151 + repo: &Did<DefaultStr>, 152 + present: &HashSet<Did<DefaultStr>>, 153 + horizon: Cursor, 154 + ) { 155 + let stale: Vec<Did<DefaultStr>> = self 156 + .seen 157 + .iter() 158 + .filter_map(|(key, state)| match key { 159 + DedupKey::Collaborator(edge_repo, subject) 160 + if edge_repo == repo 161 + && state.present 162 + && state.cursor <= horizon 163 + && !present.contains(subject) => 164 + { 165 + Some(subject.clone()) 166 + } 167 + _ => None, 168 + }) 169 + .collect(); 170 + stale 171 + .into_iter() 172 + .for_each(|subject| self.retire_collaborator(repo.clone(), subject)); 173 + } 174 + 175 + pub fn purge_legacy(&self) { 176 + self.registry 177 + .drain_legacy_members(&self.host) 178 + .iter() 179 + .for_each(|source| self.store.remove_source(source)); 180 + 181 + self.registry 182 + .repos(&self.host) 183 + .into_iter() 184 + .for_each(|repo| { 185 + let key = EdgeKey::new(nsid_static(REPO_COLLABORATOR_KIND), SubjectRef::Did(repo)); 186 + self.store 187 + .sources_for(&key) 188 + .into_iter() 189 + .filter(|source| knot_acl::decode_knot_owned_source(source).is_none()) 190 + .for_each(|source| self.store.remove_source(&source)); 191 + }); 192 + } 193 + 194 + fn retire_member(&mut self, subject: Did<DefaultStr>) { 195 + if let Some(source) = knot_acl::member_source(&self.knot, &subject) { 196 + self.store.remove_source(&source); 197 + } 198 + if let Some(state) = self.seen.get_mut(&DedupKey::Member(subject)) { 199 + state.present = false; 200 + } 201 + } 202 + 203 + fn retire_collaborator(&mut self, repo: Did<DefaultStr>, subject: Did<DefaultStr>) { 204 + if let Some(source) = knot_acl::collaborator_source(&repo, &subject) { 205 + self.store.remove_source(&source); 206 + } 207 + if let Some(state) = self.seen.get_mut(&DedupKey::Collaborator(repo, subject)) { 208 + state.present = false; 209 + } 210 + } 211 + 212 + fn advance(&mut self, key: DedupKey, cursor: Cursor, present: bool) -> bool { 213 + match self.seen.get(&key) { 214 + Some(state) if cursor <= state.cursor => false, 215 + _ => { 216 + self.seen.insert(key, SeenState { cursor, present }); 217 + true 218 + } 219 + } 220 + } 221 + } 222 + 223 + #[cfg(test)] 224 + mod tests { 225 + use super::*; 226 + use bobbin_runtime::RuntimeHasher; 227 + use bobbin_types::edges::Edge; 228 + use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 229 + use jacquard_common::types::string::AtUri; 230 + 231 + fn store() -> Arc<EdgeStore> { 232 + Arc::new(EdgeStore::new(RuntimeHasher::default())) 233 + } 234 + 235 + fn did(s: &str) -> Did<DefaultStr> { 236 + Did::new_owned(s).unwrap() 237 + } 238 + 239 + fn at(s: &str) -> AtUri<DefaultStr> { 240 + AtUri::new_owned(s).unwrap() 241 + } 242 + 243 + fn host() -> KnotHostKey { 244 + KnotHostKey::new("oyster.cafe") 245 + } 246 + 247 + fn add_legacy_edge( 248 + store: &EdgeStore, 249 + kind: &'static str, 250 + subject: &Did<DefaultStr>, 251 + source: &AtUri<DefaultStr>, 252 + ) { 253 + store.upsert_source( 254 + source, 255 + vec![Edge { 256 + kind: nsid_static(kind), 257 + subject: SubjectRef::Did(subject.clone()), 258 + source: source.clone(), 259 + sort_micros: 0, 260 + }], 261 + ); 262 + } 263 + 264 + fn knot() -> Did<DefaultStr> { 265 + knot_acl::host_to_knot_did("oyster.cafe").unwrap() 266 + } 267 + 268 + fn member_count(store: &EdgeStore, subject: &Did<DefaultStr>) -> u64 { 269 + store.count(&EdgeKey::new( 270 + nsid_static("sh.tangled.knot.member"), 271 + SubjectRef::Did(subject.clone()), 272 + )) 273 + } 274 + 275 + fn collaborator_count(store: &EdgeStore, repo: &Did<DefaultStr>) -> u64 { 276 + store.count(&EdgeKey::new( 277 + nsid_static("sh.tangled.repo.collaborator"), 278 + SubjectRef::Did(repo.clone()), 279 + )) 280 + } 281 + 282 + fn empty_registry() -> Arc<KnotRegistry> { 283 + Arc::new(KnotRegistry::new()) 284 + } 285 + 286 + fn member_roster(store: Arc<EdgeStore>) -> Roster { 287 + Roster::new(store, knot(), empty_registry(), host()) 288 + } 289 + 290 + fn subjects(items: &[&Did<DefaultStr>]) -> HashSet<Did<DefaultStr>> { 291 + items.iter().map(|d| (*d).clone()).collect() 292 + } 293 + 294 + #[test] 295 + fn member_add_then_remove() { 296 + let store = store(); 297 + let mut roster = member_roster(store.clone()); 298 + let m = did("did:plc:boltless"); 299 + roster.apply_member(AclOp::Add, m.clone(), Cursor(1_000_000)); 300 + assert_eq!(member_count(&store, &m), 1); 301 + roster.apply_member(AclOp::Remove, m.clone(), Cursor(2_000_000)); 302 + assert_eq!(member_count(&store, &m), 0); 303 + } 304 + 305 + #[test] 306 + fn stale_add_cannot_resurrect_removed_member() { 307 + let store = store(); 308 + let mut roster = member_roster(store.clone()); 309 + let m = did("did:plc:boltless"); 310 + roster.apply_member(AclOp::Remove, m.clone(), Cursor(5)); 311 + roster.apply_member(AclOp::Add, m.clone(), Cursor(1)); 312 + assert_eq!(member_count(&store, &m), 0); 313 + } 314 + 315 + #[test] 316 + fn duplicate_cursor_is_idempotent() { 317 + let store = store(); 318 + let mut roster = member_roster(store.clone()); 319 + let m = did("did:plc:akshay"); 320 + roster.apply_member(AclOp::Add, m.clone(), Cursor(10)); 321 + roster.apply_member(AclOp::Add, m.clone(), Cursor(10)); 322 + assert_eq!(member_count(&store, &m), 1); 323 + } 324 + 325 + #[test] 326 + fn collaborator_keyed_on_repo_when_hosted() { 327 + let store = store(); 328 + let repo = did("did:plc:scallop"); 329 + let registry = empty_registry(); 330 + registry.observe_repo(&host(), repo.clone()); 331 + let mut roster = Roster::new(store.clone(), knot(), registry, host()); 332 + let subject = did("did:plc:olaren"); 333 + roster.apply_collaborator(AclOp::Add, repo.clone(), subject.clone(), Cursor(7)); 334 + assert_eq!(collaborator_count(&store, &repo), 1); 335 + roster.apply_collaborator(AclOp::Remove, repo.clone(), subject.clone(), Cursor(8)); 336 + assert_eq!(collaborator_count(&store, &repo), 0); 337 + } 338 + 339 + #[test] 340 + fn reap_removes_departed_member() { 341 + let store = store(); 342 + let mut roster = member_roster(store.clone()); 343 + let stayed = did("did:plc:akshay"); 344 + let left = did("did:plc:boltless"); 345 + roster.apply_member(AclOp::Add, stayed.clone(), Cursor(10)); 346 + roster.apply_member(AclOp::Add, left.clone(), Cursor(20)); 347 + 348 + let horizon = roster.max_cursor(); 349 + roster.reap_members(&subjects(&[&stayed]), horizon); 350 + 351 + assert_eq!(member_count(&store, &stayed), 1); 352 + assert_eq!( 353 + member_count(&store, &left), 354 + 0, 355 + "a member absent from the authoritative snapshot is reaped" 356 + ); 357 + } 358 + 359 + #[test] 360 + fn reap_skips_member_added_after_horizon() { 361 + let store = store(); 362 + let mut roster = member_roster(store.clone()); 363 + let m = did("did:plc:boltless"); 364 + let horizon = roster.max_cursor(); 365 + roster.apply_member(AclOp::Add, m.clone(), Cursor(100)); 366 + 367 + roster.reap_members(&subjects(&[]), horizon); 368 + 369 + assert_eq!( 370 + member_count(&store, &m), 371 + 1, 372 + "a member added after the snapshot horizon must survive the reap" 373 + ); 374 + } 375 + 376 + #[test] 377 + fn reap_removes_departed_collaborator() { 378 + let store = store(); 379 + let repo = did("did:plc:scallop"); 380 + let registry = empty_registry(); 381 + registry.observe_repo(&host(), repo.clone()); 382 + let mut roster = Roster::new(store.clone(), knot(), registry, host()); 383 + let left = did("did:plc:olaren"); 384 + roster.apply_collaborator(AclOp::Add, repo.clone(), left.clone(), Cursor(7)); 385 + 386 + let horizon = roster.max_cursor(); 387 + roster.reap_collaborators(&repo, &subjects(&[]), horizon); 388 + 389 + assert_eq!(collaborator_count(&store, &repo), 0); 390 + } 391 + 392 + #[test] 393 + fn purge_legacy_strips_pds_collaborator_keeps_knot_owned() { 394 + let store = store(); 395 + let repo = did("did:plc:scallop"); 396 + let registry = empty_registry(); 397 + registry.observe_repo(&host(), repo.clone()); 398 + let mut roster = Roster::new(store.clone(), knot(), registry, host()); 399 + 400 + roster.apply_collaborator(AclOp::Add, repo.clone(), did("did:plc:olaren"), Cursor(7)); 401 + add_legacy_edge( 402 + &store, 403 + "sh.tangled.repo.collaborator", 404 + &repo, 405 + &at("at://did:plc:akshay/sh.tangled.repo.collaborator/r1"), 406 + ); 407 + assert_eq!(collaborator_count(&store, &repo), 2); 408 + 409 + roster.purge_legacy(); 410 + assert_eq!( 411 + collaborator_count(&store, &repo), 412 + 1, 413 + "only the knot-owned collaborator survives the purge" 414 + ); 415 + } 416 + 417 + #[test] 418 + fn purge_legacy_removes_indexed_member_edges() { 419 + let store = store(); 420 + let registry = empty_registry(); 421 + let roster = Roster::new(store.clone(), knot(), registry.clone(), host()); 422 + let member = did("did:plc:boltless"); 423 + let source = at("at://did:plc:akshay/sh.tangled.knot.member/r1"); 424 + 425 + add_legacy_edge(&store, "sh.tangled.knot.member", &member, &source); 426 + registry.note_legacy_member(source.clone(), &host()); 427 + assert_eq!(member_count(&store, &member), 1); 428 + 429 + roster.purge_legacy(); 430 + assert_eq!(member_count(&store, &member), 0); 431 + } 432 + 433 + #[test] 434 + fn collaborator_for_unhosted_repo_is_dropped() { 435 + let store = store(); 436 + let repo = did("did:plc:scallop"); 437 + let mut roster = Roster::new(store.clone(), knot(), empty_registry(), host()); 438 + roster.apply_collaborator(AclOp::Add, repo.clone(), did("did:plc:olaren"), Cursor(7)); 439 + assert_eq!( 440 + collaborator_count(&store, &repo), 441 + 0, 442 + "a knot cannot assert collaborators on a repo it does not host" 443 + ); 444 + } 445 + }
+446
bobbin/crates/knot-ingest/src/stream.rs
··· 1 + use std::sync::{Arc, Mutex}; 2 + use std::time::Duration; 3 + 4 + use bobbin_knot_proxy::KnotHost; 5 + use bobbin_runtime::{Clock, WsConn, WsMessage, WsTransport}; 6 + use jacquard_common::DefaultStr; 7 + use jacquard_common::types::did::Did; 8 + use serde::Deserialize; 9 + use serde_json::value::RawValue; 10 + use tokio_util::sync::CancellationToken; 11 + use url::Url; 12 + 13 + use crate::client::authority; 14 + use crate::roster::{AclOp, Cursor, Roster}; 15 + 16 + const KNOT_MEMBER_UPDATE_NSID: &str = "sh.tangled.knot.memberUpdate"; 17 + const REPO_COLLABORATOR_UPDATE_NSID: &str = "sh.tangled.repo.collaboratorUpdate"; 18 + const RECONNECT_INITIAL: Duration = Duration::from_secs(1); 19 + const RECONNECT_MAX: Duration = Duration::from_secs(60); 20 + const HEALTHY_SESSION_MIN: Duration = Duration::from_secs(15); 21 + 22 + #[derive(Clone)] 23 + pub struct StreamConfig { 24 + pub ws: Arc<dyn WsTransport>, 25 + pub clock: Arc<dyn Clock>, 26 + pub cancel: CancellationToken, 27 + } 28 + 29 + enum SessionEnd { 30 + Cancelled, 31 + Closed { progressed: bool }, 32 + ConnectFailed, 33 + } 34 + 35 + pub async fn run_stream( 36 + cfg: &StreamConfig, 37 + host: &KnotHost, 38 + roster: &Mutex<Roster>, 39 + initial_cursor: i64, 40 + ) { 41 + let mut cursor = initial_cursor; 42 + let mut backoff = RECONNECT_INITIAL; 43 + loop { 44 + if cfg.cancel.is_cancelled() { 45 + return; 46 + } 47 + let started = cfg.clock.now_instant(); 48 + let end = run_session(cfg, host, roster, &mut cursor).await; 49 + if matches!(&end, SessionEnd::Cancelled) { 50 + return; 51 + } 52 + let elapsed = cfg.clock.now_instant().saturating_duration_since(started); 53 + if matches!(&end, SessionEnd::Closed { progressed: false }) && cursor != 0 { 54 + cursor = 0; 55 + } 56 + if session_was_healthy(&end, elapsed) { 57 + backoff = RECONNECT_INITIAL; 58 + } 59 + let delay = jitter_delay(backoff, cfg.clock.now_unix_micros().raw()); 60 + tokio::select! { 61 + _ = cfg.cancel.cancelled() => return, 62 + _ = cfg.clock.sleep(delay) => {} 63 + } 64 + backoff = (backoff * 2).min(RECONNECT_MAX); 65 + } 66 + } 67 + 68 + fn session_was_healthy(end: &SessionEnd, elapsed: Duration) -> bool { 69 + matches!(end, SessionEnd::Closed { progressed: true }) && elapsed >= HEALTHY_SESSION_MIN 70 + } 71 + 72 + fn jitter_delay(base: Duration, entropy: u64) -> Duration { 73 + let frac = (entropy % 1024) as f64 / 1024.0; 74 + base.mul_f64(0.5 + 0.5 * frac) 75 + } 76 + 77 + async fn run_session( 78 + cfg: &StreamConfig, 79 + host: &KnotHost, 80 + roster: &Mutex<Roster>, 81 + cursor: &mut i64, 82 + ) -> SessionEnd { 83 + let Some(url) = events_url(host, *cursor) else { 84 + return SessionEnd::ConnectFailed; 85 + }; 86 + let conn = tokio::select! { 87 + _ = cfg.cancel.cancelled() => return SessionEnd::Cancelled, 88 + res = cfg.ws.connect(url) => match res { 89 + Ok(conn) => conn, 90 + Err(err) => { 91 + tracing::warn!(host = %authority(host), error = %err, "knot eventstream connect failed"); 92 + return SessionEnd::ConnectFailed; 93 + } 94 + }, 95 + }; 96 + let WsConn { 97 + mut sink, 98 + mut stream, 99 + } = conn; 100 + let mut progressed = false; 101 + loop { 102 + let message = tokio::select! { 103 + _ = cfg.cancel.cancelled() => return SessionEnd::Cancelled, 104 + message = stream.next() => message, 105 + }; 106 + match message { 107 + None => return SessionEnd::Closed { progressed }, 108 + Some(Ok(WsMessage::Text(text))) => { 109 + progressed = true; 110 + process_frame(&text, roster, cursor); 111 + } 112 + Some(Ok(WsMessage::Ping(payload))) => { 113 + progressed = true; 114 + let _ = sink.send(WsMessage::Pong(payload)).await; 115 + } 116 + Some(Ok(WsMessage::Close { .. })) => return SessionEnd::Closed { progressed }, 117 + Some(Ok(_)) => {} 118 + Some(Err(err)) => { 119 + tracing::warn!(host = %authority(host), error = %err, "knot eventstream read error"); 120 + return SessionEnd::Closed { progressed }; 121 + } 122 + } 123 + } 124 + } 125 + 126 + fn process_frame(text: &str, roster: &Mutex<Roster>, cursor: &mut i64) { 127 + let Ok(frame) = serde_json::from_str::<FrameWire>(text) else { 128 + return; 129 + }; 130 + *cursor = frame.created; 131 + match frame.nsid.as_str() { 132 + KNOT_MEMBER_UPDATE_NSID => { 133 + if let Ok(update) = serde_json::from_str::<MemberUpdate>(frame.event.get()) { 134 + roster.lock().unwrap().apply_member( 135 + update.op, 136 + update.subject, 137 + Cursor(frame.created), 138 + ); 139 + } 140 + } 141 + REPO_COLLABORATOR_UPDATE_NSID => { 142 + if let Ok(update) = serde_json::from_str::<CollaboratorUpdate>(frame.event.get()) { 143 + roster.lock().unwrap().apply_collaborator( 144 + update.op, 145 + update.repo, 146 + update.subject, 147 + Cursor(frame.created), 148 + ); 149 + } 150 + } 151 + _ => {} 152 + } 153 + } 154 + 155 + fn events_url(host: &KnotHost, cursor: i64) -> Option<Url> { 156 + let scheme = if host.url().scheme() == "https" { 157 + "wss" 158 + } else { 159 + "ws" 160 + }; 161 + let base = format!("{scheme}://{}/events", authority(host)); 162 + let full = if cursor != 0 { 163 + format!("{base}?cursor={cursor}") 164 + } else { 165 + base 166 + }; 167 + Url::parse(&full).ok() 168 + } 169 + 170 + #[derive(Deserialize)] 171 + struct FrameWire { 172 + nsid: String, 173 + event: Box<RawValue>, 174 + created: i64, 175 + } 176 + 177 + #[derive(Deserialize)] 178 + struct MemberUpdate { 179 + op: AclOp, 180 + subject: Did<DefaultStr>, 181 + } 182 + 183 + #[derive(Deserialize)] 184 + struct CollaboratorUpdate { 185 + op: AclOp, 186 + subject: Did<DefaultStr>, 187 + repo: Did<DefaultStr>, 188 + } 189 + 190 + #[cfg(test)] 191 + mod tests { 192 + use super::*; 193 + use std::collections::VecDeque; 194 + use std::sync::Mutex; 195 + 196 + use bobbin_edge_index::EdgeStore; 197 + use bobbin_runtime::{ 198 + NetworkError, SystemClock, WsConnectFuture, WsMessageFuture, WsSendFuture, WsSink, WsStream, 199 + }; 200 + use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 201 + use bobbin_types::knot_acl; 202 + use bytes::Bytes; 203 + 204 + use crate::registry::KnotRegistry; 205 + 206 + struct ScriptStream { 207 + msgs: VecDeque<WsMessage>, 208 + } 209 + impl WsStream for ScriptStream { 210 + fn next<'a>(&'a mut self) -> WsMessageFuture<'a> { 211 + Box::pin(async move { self.msgs.pop_front().map(Ok) }) 212 + } 213 + } 214 + 215 + struct RecordSink { 216 + sent: Arc<Mutex<Vec<WsMessage>>>, 217 + } 218 + impl WsSink for RecordSink { 219 + fn send<'a>(&'a mut self, message: WsMessage) -> WsSendFuture<'a> { 220 + let sent = self.sent.clone(); 221 + Box::pin(async move { 222 + sent.lock().unwrap().push(message); 223 + Ok(()) 224 + }) 225 + } 226 + } 227 + 228 + struct ScriptWs { 229 + msgs: Mutex<Option<VecDeque<WsMessage>>>, 230 + sent: Arc<Mutex<Vec<WsMessage>>>, 231 + fail: bool, 232 + } 233 + impl WsTransport for ScriptWs { 234 + fn connect(&self, _url: Url) -> WsConnectFuture { 235 + if self.fail { 236 + return Box::pin(async { 237 + Err(NetworkError::Connect("scripted failure".to_owned())) 238 + }); 239 + } 240 + let msgs = self.msgs.lock().unwrap().take().unwrap_or_default(); 241 + let sent = self.sent.clone(); 242 + Box::pin(async move { 243 + Ok(WsConn { 244 + sink: Box::new(RecordSink { sent }), 245 + stream: Box::new(ScriptStream { msgs }), 246 + }) 247 + }) 248 + } 249 + } 250 + 251 + fn member_frame(op: &str, subject: &str, created: i64) -> String { 252 + format!( 253 + r#"{{"rkey":"r{created}","nsid":"sh.tangled.knot.memberUpdate","event":{{"op":"{op}","subject":"{subject}"}},"created":{created}}}"# 254 + ) 255 + } 256 + 257 + fn collab_frame(op: &str, subject: &str, repo: &str, created: i64) -> String { 258 + format!( 259 + r#"{{"rkey":"r{created}","nsid":"sh.tangled.repo.collaboratorUpdate","event":{{"op":"{op}","subject":"{subject}","repo":"{repo}"}},"created":{created}}}"# 260 + ) 261 + } 262 + 263 + fn did(s: &str) -> Did<DefaultStr> { 264 + Did::new_owned(s).unwrap() 265 + } 266 + 267 + fn member_count(store: &EdgeStore, subject: &str) -> u64 { 268 + store.count(&EdgeKey::new( 269 + nsid_static("sh.tangled.knot.member"), 270 + SubjectRef::Did(did(subject)), 271 + )) 272 + } 273 + 274 + fn collaborator_count(store: &EdgeStore, repo: &str) -> u64 { 275 + store.count(&EdgeKey::new( 276 + nsid_static("sh.tangled.repo.collaborator"), 277 + SubjectRef::Did(did(repo)), 278 + )) 279 + } 280 + 281 + fn cfg(ws: Arc<dyn WsTransport>, cancel: CancellationToken) -> StreamConfig { 282 + StreamConfig { 283 + ws, 284 + clock: Arc::new(SystemClock::new()), 285 + cancel, 286 + } 287 + } 288 + 289 + #[tokio::test] 290 + async fn session_dispatches_deltas_pongs_and_advances_cursor() { 291 + let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default())); 292 + let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap(); 293 + let registry = Arc::new(KnotRegistry::new()); 294 + registry.observe_repo( 295 + &knot_acl::KnotHostKey::new("oyster.cafe"), 296 + Did::new_owned("did:plc:scallop").unwrap(), 297 + ); 298 + let roster = Mutex::new(Roster::new( 299 + store.clone(), 300 + knot, 301 + registry, 302 + knot_acl::KnotHostKey::new("oyster.cafe"), 303 + )); 304 + let frames = VecDeque::from(vec![ 305 + WsMessage::Text(member_frame("add", "did:plc:boltless", 100)), 306 + WsMessage::Text(collab_frame( 307 + "add", 308 + "did:plc:olaren", 309 + "did:plc:scallop", 310 + 200, 311 + )), 312 + WsMessage::Ping(Bytes::from_static(b"ka")), 313 + WsMessage::Text(member_frame("remove", "did:plc:boltless", 300)), 314 + WsMessage::Close { 315 + code: 1000, 316 + reason: String::new(), 317 + }, 318 + ]); 319 + let sent = Arc::new(Mutex::new(Vec::new())); 320 + let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs { 321 + msgs: Mutex::new(Some(frames)), 322 + sent: sent.clone(), 323 + fail: false, 324 + }); 325 + let config = cfg(ws, CancellationToken::new()); 326 + let host = KnotHost::parse("http://oyster.cafe").unwrap(); 327 + let mut cursor = 0i64; 328 + 329 + let end = run_session(&config, &host, &roster, &mut cursor).await; 330 + 331 + assert!(matches!(end, SessionEnd::Closed { progressed: true })); 332 + assert_eq!(cursor, 300); 333 + assert_eq!(member_count(&store, "did:plc:boltless"), 0); 334 + assert_eq!(collaborator_count(&store, "did:plc:scallop"), 1); 335 + let sent = sent.lock().unwrap(); 336 + assert_eq!(sent.len(), 1); 337 + assert!(matches!(&sent[0], WsMessage::Pong(p) if p.as_ref() == b"ka")); 338 + } 339 + 340 + #[tokio::test] 341 + async fn session_reports_no_progress_on_immediate_close() { 342 + let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default())); 343 + let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap(); 344 + let roster = Mutex::new(Roster::new( 345 + store, 346 + knot, 347 + Arc::new(KnotRegistry::new()), 348 + knot_acl::KnotHostKey::new("oyster.cafe"), 349 + )); 350 + let frames = VecDeque::from(vec![WsMessage::Close { 351 + code: 1000, 352 + reason: String::new(), 353 + }]); 354 + let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs { 355 + msgs: Mutex::new(Some(frames)), 356 + sent: Arc::new(Mutex::new(Vec::new())), 357 + fail: false, 358 + }); 359 + let config = cfg(ws, CancellationToken::new()); 360 + let host = KnotHost::parse("http://oyster.cafe").unwrap(); 361 + let mut cursor = 99i64; 362 + 363 + let end = run_session(&config, &host, &roster, &mut cursor).await; 364 + 365 + assert!( 366 + matches!(end, SessionEnd::Closed { progressed: false }), 367 + "a session that delivers no frames before closing reports no progress" 368 + ); 369 + } 370 + 371 + #[tokio::test] 372 + async fn pre_cancelled_stream_returns_without_connecting() { 373 + let store = Arc::new(EdgeStore::new(bobbin_runtime::RuntimeHasher::default())); 374 + let knot = knot_acl::host_to_knot_did("oyster.cafe").unwrap(); 375 + let roster = Mutex::new(Roster::new( 376 + store, 377 + knot, 378 + Arc::new(KnotRegistry::new()), 379 + knot_acl::KnotHostKey::new("oyster.cafe"), 380 + )); 381 + let ws: Arc<dyn WsTransport> = Arc::new(ScriptWs { 382 + msgs: Mutex::new(None), 383 + sent: Arc::new(Mutex::new(Vec::new())), 384 + fail: true, 385 + }); 386 + let cancel = CancellationToken::new(); 387 + cancel.cancel(); 388 + let config = cfg(ws, cancel); 389 + let host = KnotHost::parse("http://oyster.cafe").unwrap(); 390 + 391 + run_stream(&config, &host, &roster, 0).await; 392 + } 393 + 394 + #[test] 395 + fn events_url_carries_scheme_and_cursor() { 396 + let host = KnotHost::parse("http://oyster.cafe").unwrap(); 397 + assert_eq!( 398 + events_url(&host, 0).unwrap().as_str(), 399 + "ws://oyster.cafe/events" 400 + ); 401 + assert_eq!( 402 + events_url(&host, 42).unwrap().as_str(), 403 + "ws://oyster.cafe/events?cursor=42" 404 + ); 405 + let secure = KnotHost::parse("https://nel.pet").unwrap(); 406 + assert_eq!( 407 + events_url(&secure, 7).unwrap().as_str(), 408 + "wss://nel.pet/events?cursor=7" 409 + ); 410 + } 411 + 412 + #[test] 413 + fn only_a_long_lived_session_resets_backoff() { 414 + assert!(session_was_healthy( 415 + &SessionEnd::Closed { progressed: true }, 416 + HEALTHY_SESSION_MIN 417 + )); 418 + assert!( 419 + !session_was_healthy( 420 + &SessionEnd::Closed { progressed: true }, 421 + HEALTHY_SESSION_MIN - Duration::from_millis(1) 422 + ), 423 + "a knot that flaps faster than the healthy floor must keep backing off" 424 + ); 425 + assert!(!session_was_healthy( 426 + &SessionEnd::Closed { progressed: false }, 427 + Duration::from_secs(3600) 428 + )); 429 + assert!(!session_was_healthy( 430 + &SessionEnd::ConnectFailed, 431 + Duration::from_secs(3600) 432 + )); 433 + } 434 + 435 + #[test] 436 + fn jitter_delay_stays_within_half_to_full_window() { 437 + let base = Duration::from_secs(8); 438 + [0u64, 1, 511, 512, 1023, 1024, u64::MAX] 439 + .into_iter() 440 + .for_each(|entropy| { 441 + let delay = jitter_delay(base, entropy); 442 + assert!(delay >= base / 2, "delay {delay:?} below half of {base:?}"); 443 + assert!(delay <= base, "delay {delay:?} above {base:?}"); 444 + }); 445 + } 446 + }
+2 -2
bobbin/crates/knot-proxy/src/dns.rs
··· 7 7 8 8 use crate::host::{PrivateHostReason, classify_ip}; 9 9 10 - pub(crate) struct PrivateAddressFilter { 10 + pub struct PrivateAddressFilter { 11 11 allow_private: bool, 12 12 } 13 13 14 14 impl PrivateAddressFilter { 15 - pub(crate) fn new(allow_private: bool) -> Self { 15 + pub fn new(allow_private: bool) -> Self { 16 16 Self { allow_private } 17 17 } 18 18 }
+2 -1
bobbin/crates/knot-proxy/src/lib.rs
··· 22 22 mod host; 23 23 24 24 pub use breaker::{Breaker, BreakerPermit, CircuitOpen, FailureThreshold, ThresholdError}; 25 - pub use host::{KnotHost, KnotHostError, PrivateHostReason, RepoSlug, RepoSlugError}; 25 + pub use dns::PrivateAddressFilter; 26 + pub use host::{KnotHost, KnotHostError, PrivateHostReason, RepoSlug, RepoSlugError, classify_ip}; 26 27 27 28 const USER_AGENT: &str = concat!("bobbin/", env!("CARGO_PKG_VERSION")); 28 29 const HTTPS_SCHEME: &str = "https";
+3 -3
bobbin/crates/runtime/src/lib.rs
··· 14 14 MemWsResponder, MemWsServerFuture, MemWsTransport, 15 15 }; 16 16 pub use network::{ 17 - BodyStream, HttpRequest, HttpResponseFuture, HttpResponseHead, HttpResult, HttpTransport, 18 - NetworkError, ReqwestHttp, TungsteniteWs, WsConn, WsConnectFuture, WsMessage, WsMessageFuture, 19 - WsSendFuture, WsSink, WsStream, WsTransport, 17 + AddrGuard, BodyStream, GuardedWs, HttpRequest, HttpResponseFuture, HttpResponseHead, 18 + HttpResult, HttpTransport, NetworkError, ReqwestHttp, TungsteniteWs, WsConn, WsConnectFuture, 19 + WsMessage, WsMessageFuture, WsSendFuture, WsSink, WsStream, WsTransport, 20 20 };
+49
bobbin/crates/runtime/src/network.rs
··· 1 1 use std::future::Future; 2 + use std::net::SocketAddr; 2 3 use std::pin::Pin; 3 4 use std::sync::Arc; 4 5 ··· 137 138 fn connect(&self, url: Url) -> WsConnectFuture; 138 139 } 139 140 141 + pub type AddrGuard = Arc<dyn Fn(&[SocketAddr]) -> Result<(), NetworkError> + Send + Sync>; 142 + 140 143 #[derive(Clone, Copy, Debug, Default)] 141 144 pub struct TungsteniteWs; 142 145 ··· 151 154 Box::pin(async move { 152 155 let url_str = url.as_str().to_owned(); 153 156 let (ws, _resp) = tokio_tungstenite::connect_async(&url_str) 157 + .await 158 + .map_err(|e| NetworkError::Connect(e.to_string()))?; 159 + let (sink_inner, stream_inner) = futures::StreamExt::split(ws); 160 + let sink: Box<dyn WsSink> = Box::new(TungsteniteSink { inner: sink_inner }); 161 + let stream: Box<dyn WsStream> = Box::new(TungsteniteStream { 162 + inner: stream_inner, 163 + }); 164 + Ok(WsConn { sink, stream }) 165 + }) 166 + } 167 + } 168 + 169 + pub struct GuardedWs { 170 + guard: AddrGuard, 171 + } 172 + 173 + impl GuardedWs { 174 + pub fn shared(guard: AddrGuard) -> Arc<dyn WsTransport> { 175 + Arc::new(Self { guard }) 176 + } 177 + } 178 + 179 + impl WsTransport for GuardedWs { 180 + fn connect(&self, url: Url) -> WsConnectFuture { 181 + let guard = self.guard.clone(); 182 + Box::pin(async move { 183 + let host = url 184 + .host_str() 185 + .ok_or_else(|| NetworkError::Connect("ws url missing host".to_owned()))? 186 + .to_owned(); 187 + let port = url 188 + .port_or_known_default() 189 + .ok_or_else(|| NetworkError::Connect("ws url missing port".to_owned()))?; 190 + let addrs: Vec<SocketAddr> = tokio::net::lookup_host((host.as_str(), port)) 191 + .await 192 + .map_err(|e| NetworkError::Connect(e.to_string()))? 193 + .collect(); 194 + guard(&addrs)?; 195 + let addr = addrs 196 + .into_iter() 197 + .next() 198 + .ok_or_else(|| NetworkError::Connect(format!("no addresses for {host}")))?; 199 + let tcp = tokio::net::TcpStream::connect(addr) 200 + .await 201 + .map_err(|e| NetworkError::Connect(e.to_string()))?; 202 + let (ws, _resp) = tokio_tungstenite::client_async_tls(url.as_str(), tcp) 154 203 .await 155 204 .map_err(|e| NetworkError::Connect(e.to_string()))?; 156 205 let (sink_inner, stream_inner) = futures::StreamExt::split(ws);
+289
bobbin/crates/types/src/knot_acl.rs
··· 1 + use core::fmt; 2 + 3 + use jacquard_common::DefaultStr; 4 + use jacquard_common::types::did::Did; 5 + use jacquard_common::types::ident::AtIdentifier; 6 + use jacquard_common::types::string::AtUri; 7 + 8 + use crate::edges::Edge; 9 + use crate::ids::{SubjectRef, nsid_static}; 10 + 11 + pub const KNOT_MEMBER_COLLECTION: &str = "sh.tangled.bobbin.knotMember"; 12 + pub const KNOT_COLLABORATOR_COLLECTION: &str = "sh.tangled.bobbin.knotCollaborator"; 13 + 14 + const KNOT_MEMBER_KIND: &str = "sh.tangled.knot.member"; 15 + const REPO_COLLABORATOR_KIND: &str = "sh.tangled.repo.collaborator"; 16 + 17 + const DID_WEB_PREFIX: &str = "did:web:"; 18 + 19 + #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] 20 + pub struct KnotHostKey(String); 21 + 22 + impl KnotHostKey { 23 + pub fn new(host: &str) -> Self { 24 + Self(host.trim_end_matches('.').to_ascii_lowercase()) 25 + } 26 + 27 + pub fn as_str(&self) -> &str { 28 + &self.0 29 + } 30 + } 31 + 32 + impl AsRef<str> for KnotHostKey { 33 + fn as_ref(&self) -> &str { 34 + &self.0 35 + } 36 + } 37 + 38 + impl fmt::Display for KnotHostKey { 39 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 40 + f.write_str(&self.0) 41 + } 42 + } 43 + 44 + #[derive(Clone, Debug, Eq, PartialEq)] 45 + pub enum KnotOwnedSource { 46 + Member { 47 + knot: Did<DefaultStr>, 48 + subject: Did<DefaultStr>, 49 + }, 50 + Collaborator { 51 + repo: Did<DefaultStr>, 52 + subject: Did<DefaultStr>, 53 + }, 54 + } 55 + 56 + pub fn host_to_knot_did(host: &str) -> Option<Did<DefaultStr>> { 57 + if host.contains('/') { 58 + return None; 59 + } 60 + let normalized = KnotHostKey::new(host); 61 + let host = normalized.as_str(); 62 + if host.is_empty() { 63 + return None; 64 + } 65 + Did::new_owned(format!("{DID_WEB_PREFIX}{}", host.replace(':', "%3A"))).ok() 66 + } 67 + 68 + pub fn knot_did_host(knot: &Did<DefaultStr>) -> Option<String> { 69 + let encoded = knot.as_ref().strip_prefix(DID_WEB_PREFIX)?; 70 + if encoded.is_empty() { 71 + return None; 72 + } 73 + Some(encoded.replace("%3A", ":")) 74 + } 75 + 76 + pub fn member_source( 77 + knot: &Did<DefaultStr>, 78 + subject: &Did<DefaultStr>, 79 + ) -> Option<AtUri<DefaultStr>> { 80 + build_source(knot.as_ref(), KNOT_MEMBER_COLLECTION, subject.as_ref()) 81 + } 82 + 83 + pub fn collaborator_source( 84 + repo: &Did<DefaultStr>, 85 + subject: &Did<DefaultStr>, 86 + ) -> Option<AtUri<DefaultStr>> { 87 + build_source( 88 + repo.as_ref(), 89 + KNOT_COLLABORATOR_COLLECTION, 90 + subject.as_ref(), 91 + ) 92 + } 93 + 94 + pub fn member_upsert( 95 + knot: &Did<DefaultStr>, 96 + subject: &Did<DefaultStr>, 97 + created_micros: u64, 98 + ) -> Option<(AtUri<DefaultStr>, Vec<Edge>)> { 99 + let source = member_source(knot, subject)?; 100 + let edge = Edge { 101 + kind: nsid_static(KNOT_MEMBER_KIND), 102 + subject: SubjectRef::Did(subject.clone()), 103 + source: source.clone(), 104 + sort_micros: created_micros, 105 + }; 106 + Some((source, vec![edge])) 107 + } 108 + 109 + pub fn collaborator_upsert( 110 + repo: &Did<DefaultStr>, 111 + subject: &Did<DefaultStr>, 112 + created_micros: u64, 113 + ) -> Option<(AtUri<DefaultStr>, Vec<Edge>)> { 114 + let source = collaborator_source(repo, subject)?; 115 + let edge = Edge { 116 + kind: nsid_static(REPO_COLLABORATOR_KIND), 117 + subject: SubjectRef::Did(repo.clone()), 118 + source: source.clone(), 119 + sort_micros: created_micros, 120 + }; 121 + Some((source, vec![edge])) 122 + } 123 + 124 + pub fn decode_knot_owned_source(source: &AtUri<DefaultStr>) -> Option<KnotOwnedSource> { 125 + let collection = source.collection()?; 126 + let collection = collection.as_ref(); 127 + if collection != KNOT_MEMBER_COLLECTION && collection != KNOT_COLLABORATOR_COLLECTION { 128 + return None; 129 + } 130 + let AtIdentifier::Did(authority) = source.authority() else { 131 + return None; 132 + }; 133 + let subject = Did::new_owned(source.rkey()?.as_ref()).ok()?; 134 + match collection { 135 + KNOT_MEMBER_COLLECTION => Some(KnotOwnedSource::Member { 136 + knot: Did::new_owned(authority.as_ref()).ok()?, 137 + subject, 138 + }), 139 + KNOT_COLLABORATOR_COLLECTION => Some(KnotOwnedSource::Collaborator { 140 + repo: Did::new_owned(authority.as_ref()).ok()?, 141 + subject, 142 + }), 143 + _ => None, 144 + } 145 + } 146 + 147 + fn build_source(authority: &str, collection: &str, rkey: &str) -> Option<AtUri<DefaultStr>> { 148 + AtUri::new_owned(format!("at://{authority}/{collection}/{rkey}")).ok() 149 + } 150 + 151 + #[cfg(test)] 152 + mod tests { 153 + use super::*; 154 + 155 + fn did(s: &str) -> Did<DefaultStr> { 156 + Did::new_owned(s).unwrap() 157 + } 158 + 159 + fn at(s: &str) -> AtUri<DefaultStr> { 160 + AtUri::new_owned(s).unwrap() 161 + } 162 + 163 + #[test] 164 + fn host_did_round_trips() { 165 + let knot = host_to_knot_did("oyster.cafe").unwrap(); 166 + assert_eq!(knot.as_ref(), "did:web:oyster.cafe"); 167 + assert_eq!(knot_did_host(&knot), Some("oyster.cafe".to_owned())); 168 + } 169 + 170 + #[test] 171 + fn host_did_round_trips_with_port() { 172 + let knot = host_to_knot_did("oyster.cafe:3000").unwrap(); 173 + assert_eq!(knot.as_ref(), "did:web:oyster.cafe%3A3000"); 174 + assert_eq!(knot_did_host(&knot), Some("oyster.cafe:3000".to_owned())); 175 + } 176 + 177 + #[test] 178 + fn host_rejects_slash_and_empty() { 179 + assert_eq!(host_to_knot_did("oyster.cafe/evil"), None); 180 + assert_eq!(host_to_knot_did(""), None); 181 + assert_eq!(host_to_knot_did("."), None); 182 + } 183 + 184 + #[test] 185 + fn host_key_normalizes_case_and_trailing_dot() { 186 + assert_eq!( 187 + KnotHostKey::new("Kt.Oyster.Cafe.").as_str(), 188 + "kt.oyster.cafe" 189 + ); 190 + assert_eq!( 191 + KnotHostKey::new("KT.OYSTER.CAFE:3000").as_str(), 192 + "kt.oyster.cafe:3000" 193 + ); 194 + assert_eq!( 195 + KnotHostKey::new("kt.oyster.cafe"), 196 + KnotHostKey::new("KT.OYSTER.CAFE") 197 + ); 198 + } 199 + 200 + #[test] 201 + fn host_to_knot_did_normalizes_before_encoding() { 202 + assert_eq!( 203 + host_to_knot_did("KT.Oyster.Cafe").unwrap().as_ref(), 204 + "did:web:kt.oyster.cafe" 205 + ); 206 + assert_eq!( 207 + host_to_knot_did("KT.Oyster.Cafe:3000").unwrap().as_ref(), 208 + "did:web:kt.oyster.cafe%3A3000" 209 + ); 210 + } 211 + 212 + #[test] 213 + fn member_source_round_trips() { 214 + let knot = host_to_knot_did("oyster.cafe").unwrap(); 215 + let subject = did("did:plc:nel"); 216 + let source = member_source(&knot, &subject).expect("build member source"); 217 + assert_eq!( 218 + source.as_ref(), 219 + "at://did:web:oyster.cafe/sh.tangled.bobbin.knotMember/did:plc:nel" 220 + ); 221 + assert_eq!( 222 + decode_knot_owned_source(&source), 223 + Some(KnotOwnedSource::Member { knot, subject }) 224 + ); 225 + } 226 + 227 + #[test] 228 + fn collaborator_source_round_trips() { 229 + let repo = did("did:plc:scallop"); 230 + let subject = did("did:plc:olaren"); 231 + let source = collaborator_source(&repo, &subject).expect("build collaborator source"); 232 + assert_eq!( 233 + source.as_ref(), 234 + "at://did:plc:scallop/sh.tangled.bobbin.knotCollaborator/did:plc:olaren" 235 + ); 236 + assert_eq!( 237 + decode_knot_owned_source(&source), 238 + Some(KnotOwnedSource::Collaborator { repo, subject }) 239 + ); 240 + } 241 + 242 + #[test] 243 + fn decode_ignores_legacy_pds_member_record() { 244 + let source = at("at://did:plc:nel/sh.tangled.knot.member/abcabcabcabcz"); 245 + assert_eq!(decode_knot_owned_source(&source), None); 246 + } 247 + 248 + #[test] 249 + fn decode_ignores_handle_authority() { 250 + let source = at("at://oyster.cafe/sh.tangled.bobbin.knotMember/did:plc:nel"); 251 + assert_eq!(decode_knot_owned_source(&source), None); 252 + } 253 + 254 + #[test] 255 + fn member_upsert_builds_decodable_primary_edge() { 256 + let knot = host_to_knot_did("oyster.cafe").unwrap(); 257 + let subject = did("did:plc:nel"); 258 + let (source, edges) = 259 + member_upsert(&knot, &subject, 1_700_000_000_000_000).expect("build member upsert"); 260 + assert_eq!(edges.len(), 1); 261 + let edge = &edges[0]; 262 + assert_eq!(edge.kind.as_ref(), "sh.tangled.knot.member"); 263 + assert_eq!(edge.subject, SubjectRef::Did(subject.clone())); 264 + assert_eq!(edge.source, source); 265 + assert_eq!(edge.sort_micros, 1_700_000_000_000_000); 266 + assert_eq!( 267 + decode_knot_owned_source(&source), 268 + Some(KnotOwnedSource::Member { knot, subject }) 269 + ); 270 + } 271 + 272 + #[test] 273 + fn collaborator_upsert_keys_on_repo_did() { 274 + let repo = did("did:plc:scallop"); 275 + let subject = did("did:plc:olaren"); 276 + let (source, edges) = 277 + collaborator_upsert(&repo, &subject, 42).expect("build collaborator upsert"); 278 + assert_eq!(edges.len(), 1); 279 + let edge = &edges[0]; 280 + assert_eq!(edge.kind.as_ref(), "sh.tangled.repo.collaborator"); 281 + assert_eq!(edge.subject, SubjectRef::Did(repo.clone())); 282 + assert_eq!(edge.source, source); 283 + assert_eq!(edge.sort_micros, 42); 284 + assert_eq!( 285 + decode_knot_owned_source(&source), 286 + Some(KnotOwnedSource::Collaborator { repo, subject }) 287 + ); 288 + } 289 + }
+1
bobbin/crates/types/src/lib.rs
··· 19 19 20 20 pub mod edges; 21 21 pub mod ids; 22 + pub mod knot_acl; 22 23 pub mod legacy; 23 24 pub mod record; 24 25 pub mod search;
+66 -6
bobbin/crates/xrpc/src/lib.rs
··· 23 23 routing::get, 24 24 }; 25 25 use bobbin_edge_index::{ 26 - Coverage, CoverageWatch, CursorParseError, EdgePage, EdgeStore, IssueStateKind, PageCursor, 27 - PageLimit, PageToken, PullStatusKind, SortDir, StateIndex, StateKind, 26 + Coverage, CoverageWatch, CursorParseError, EdgeItem, EdgePage, EdgeStore, IssueStateKind, 27 + PageCursor, PageLimit, PageToken, PullStatusKind, SortDir, StateIndex, StateKind, 28 28 }; 29 29 use bobbin_knot_proxy::{KnotHost, KnotProxy, KnotProxyError, ProxyResponse, RepoSlug}; 30 30 use bobbin_record_lru::RecordStore; ··· 34 34 }; 35 35 use bobbin_slingshot_client::{SlingshotClient, SlingshotError}; 36 36 use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 37 + use bobbin_types::knot_acl::{KnotOwnedSource, decode_knot_owned_source, knot_did_host}; 37 38 use bobbin_types::record::RecordBody; 38 39 use bobbin_types::search::SearchableRecord; 39 40 use bobbin_types::sh_tangled::actor::profile::{Profile, ProfileGetRecordOutput, ProfileRecord}; ··· 1434 1435 state: &AppState, 1435 1436 nsid: &Nsid<DefaultStr>, 1436 1437 uri: AtUri<DefaultStr>, 1438 + sort_micros: u64, 1437 1439 ) -> Result<Option<RecordView<V>>, XrpcError> 1438 1440 where 1439 1441 V: serde::de::DeserializeOwned + NormalizeRepoRefs, 1440 1442 { 1443 + if let Some(source) = decode_knot_owned_source(&uri) { 1444 + return synthesize_knot_owned_view::<V>(state, uri, source, sort_micros).await; 1445 + } 1441 1446 let body = resolve_for_view(state, nsid, uri).await?; 1442 1447 let value: V = deserialize_or_upgrade::<V>(state, nsid, &body.value).await?; 1443 1448 let Some(value) = value.normalize(&state.resolver).await else { ··· 1450 1455 })) 1451 1456 } 1452 1457 1458 + async fn synthesize_knot_owned_view<V>( 1459 + state: &AppState, 1460 + uri: AtUri<DefaultStr>, 1461 + source: KnotOwnedSource, 1462 + sort_micros: u64, 1463 + ) -> Result<Option<RecordView<V>>, XrpcError> 1464 + where 1465 + V: serde::de::DeserializeOwned + NormalizeRepoRefs, 1466 + { 1467 + let Some(body) = synth_knot_owned_value(source, sort_micros) else { 1468 + return Ok(None); 1469 + }; 1470 + let Ok(value) = serde_json::from_value::<V>(body) else { 1471 + return Ok(None); 1472 + }; 1473 + let Some(value) = value.normalize(&state.resolver).await else { 1474 + return Ok(None); 1475 + }; 1476 + Ok(Some(RecordView { 1477 + uri, 1478 + cid: None, 1479 + value, 1480 + })) 1481 + } 1482 + 1483 + fn synth_knot_owned_value(source: KnotOwnedSource, sort_micros: u64) -> Option<serde_json::Value> { 1484 + let created_at = micros_to_rfc3339(sort_micros)?; 1485 + match source { 1486 + KnotOwnedSource::Member { knot, subject } => Some(serde_json::json!({ 1487 + "domain": knot_did_host(&knot)?, 1488 + "subject": subject.as_ref(), 1489 + "createdAt": created_at, 1490 + })), 1491 + KnotOwnedSource::Collaborator { repo, subject } => Some(serde_json::json!({ 1492 + "repo": repo.as_ref(), 1493 + "subject": subject.as_ref(), 1494 + "createdAt": created_at, 1495 + })), 1496 + } 1497 + } 1498 + 1499 + fn micros_to_rfc3339(micros: u64) -> Option<String> { 1500 + let micros = i64::try_from(micros).ok()?; 1501 + chrono::DateTime::from_timestamp_micros(micros) 1502 + .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true)) 1503 + } 1504 + 1453 1505 #[derive(Clone, Copy)] 1454 1506 enum HitProvenance { 1455 1507 ClientSupplied, ··· 1533 1585 fn hydrate_record_stream<V>( 1534 1586 state: &AppState, 1535 1587 nsid: Nsid<DefaultStr>, 1536 - uris: Vec<AtUri<DefaultStr>>, 1588 + items: Vec<EdgeItem>, 1537 1589 provenance: HitProvenance, 1538 1590 ) -> impl Stream<Item = Result<RecordView<V>, XrpcError>> + Send + 'static 1539 1591 where 1540 1592 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static, 1541 1593 { 1542 1594 let owned = state.clone(); 1543 - hydrate_stream(uris, move |uri| { 1595 + hydrate_stream(items, move |item| { 1544 1596 let owned = owned.clone(); 1545 1597 let nsid = nsid.clone(); 1546 1598 async move { 1547 - let result = hydrate_record_view::<V>(&owned, &nsid, uri.clone()).await; 1599 + let EdgeItem { uri, sort_micros } = item; 1600 + let result = hydrate_record_view::<V>(&owned, &nsid, uri.clone(), sort_micros).await; 1548 1601 if matches!(provenance, HitProvenance::Indexed) 1549 1602 && let Err(err) = &result 1550 1603 && is_index_evictable(err) ··· 1673 1726 ))); 1674 1727 } 1675 1728 let permit = state.heavy_permit()?; 1676 - let views = hydrate_record_stream::<V>(state, nsid, parsed, HitProvenance::ClientSupplied); 1729 + let items = parsed 1730 + .into_iter() 1731 + .map(|uri| EdgeItem { 1732 + uri, 1733 + sort_micros: 0, 1734 + }) 1735 + .collect(); 1736 + let views = hydrate_record_stream::<V>(state, nsid, items, HitProvenance::ClientSupplied); 1677 1737 Ok(json_stream::<RecordView<V>, _>( 1678 1738 "items", 1679 1739 views,
+76
bobbin/crates/xrpc/tests/aggregation.rs
··· 2061 2061 ); 2062 2062 assert_eq!(items[0]["uri"], json!(closed_uri.as_ref())); 2063 2063 } 2064 + 2065 + #[tokio::test] 2066 + async fn knot_owned_member_is_synthesized_without_slingshot() { 2067 + let harness = Harness::new().await; 2068 + let knot = bobbin_types::knot_acl::host_to_knot_did("kt.oyster.cafe").unwrap(); 2069 + let subject = did("did:plc:boltless"); 2070 + let created = chrono::DateTime::parse_from_rfc3339("2026-06-01T00:00:00Z").unwrap(); 2071 + let micros = created.timestamp_micros() as u64; 2072 + let (source, edges) = bobbin_types::knot_acl::member_upsert(&knot, &subject, micros).unwrap(); 2073 + harness.edges.upsert_source(&source, edges); 2074 + harness.promote_ready(1, 1); 2075 + 2076 + let (status, body) = json_response( 2077 + router(harness.state.clone()) 2078 + .oneshot(list_request( 2079 + "sh.tangled.knot.listMembers", 2080 + subject.as_ref(), 2081 + &[], 2082 + )) 2083 + .await 2084 + .unwrap(), 2085 + ) 2086 + .await; 2087 + 2088 + assert_eq!(status, StatusCode::OK); 2089 + let items = body["items"].as_array().expect("items array"); 2090 + assert_eq!( 2091 + items.len(), 2092 + 1, 2093 + "synthesized member must hydrate with no slingshot mock mounted" 2094 + ); 2095 + assert_eq!(items[0]["uri"], json!(source.as_ref())); 2096 + assert!(items[0]["cid"].is_null()); 2097 + assert_eq!(items[0]["value"]["domain"], json!("kt.oyster.cafe")); 2098 + assert_eq!(items[0]["value"]["subject"], json!("did:plc:boltless")); 2099 + let got = chrono::DateTime::parse_from_rfc3339( 2100 + items[0]["value"]["createdAt"] 2101 + .as_str() 2102 + .expect("createdAt string"), 2103 + ) 2104 + .unwrap(); 2105 + assert_eq!(got.timestamp_micros(), micros as i64); 2106 + } 2107 + 2108 + #[tokio::test] 2109 + async fn knot_owned_collaborator_is_synthesized_without_slingshot() { 2110 + let harness = Harness::new().await; 2111 + let repo = did("did:plc:scallop"); 2112 + let subject = did("did:plc:olaren"); 2113 + let created = chrono::DateTime::parse_from_rfc3339("2026-06-03T12:00:00Z").unwrap(); 2114 + let micros = created.timestamp_micros() as u64; 2115 + let (source, edges) = 2116 + bobbin_types::knot_acl::collaborator_upsert(&repo, &subject, micros).unwrap(); 2117 + harness.edges.upsert_source(&source, edges); 2118 + harness.promote_ready(1, 1); 2119 + 2120 + let (status, body) = json_response( 2121 + router(harness.state.clone()) 2122 + .oneshot(list_request( 2123 + "sh.tangled.repo.listCollaborators", 2124 + repo.as_ref(), 2125 + &[], 2126 + )) 2127 + .await 2128 + .unwrap(), 2129 + ) 2130 + .await; 2131 + 2132 + assert_eq!(status, StatusCode::OK); 2133 + let items = body["items"].as_array().expect("items array"); 2134 + assert_eq!(items.len(), 1); 2135 + assert_eq!(items[0]["uri"], json!(source.as_ref())); 2136 + assert!(items[0]["cid"].is_null()); 2137 + assert_eq!(items[0]["value"]["repo"], json!("did:plc:scallop")); 2138 + assert_eq!(items[0]["value"]["subject"], json!("did:plc:olaren")); 2139 + }