Now let's take a silly one
1

Configure Feed

Select the types of activity you want to include in your feed.

at main 18 kB View raw
1mod reconstruct; 2 3use std::collections::BTreeSet; 4use std::num::{NonZeroU32, NonZeroU64}; 5use std::path::PathBuf; 6use std::sync::Arc; 7use std::time::Duration; 8 9use tokio_util::sync::CancellationToken; 10 11use anyhow::Context; 12use axum::Json; 13use axum::routing::get; 14use base64::Engine; 15use knot_atproto::Atproto; 16use knot_index::{Index, Resolved}; 17use knot_runtime::{Clock, OsEntropy, ReqwestHttp, SystemClock}; 18use knot_secrets::SealedStore; 19use knot_types::ActorId; 20use knot_xrpc::XrpcState; 21use zeroize::Zeroizing; 22 23const MAINTENANCE_SHUTDOWN_DRAIN: Duration = Duration::from_secs(30); 24const EDGE_SHUTDOWN_DRAIN: Duration = Duration::from_secs(40); 25 26struct IndexRepos(Arc<Index>); 27 28impl knot_maintenance::RepoSource for IndexRepos { 29 fn repos(&self) -> Vec<knot_types::RepoDid> { 30 self.0.hosted_repos() 31 } 32} 33 34fn init_tracing() { 35 let filter = tracing_subscriber::EnvFilter::try_from_default_env() 36 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); 37 tracing_subscriber::fmt() 38 .with_env_filter(filter) 39 .with_writer(std::io::stderr) 40 .init(); 41} 42 43#[tokio::main] 44async fn main() -> anyhow::Result<()> { 45 rustix::process::set_dumpable_behavior(rustix::process::DumpableBehavior::NotDumpable) 46 .context("disable core dumps and ptrace attachment")?; 47 48 if std::env::args().nth(1).as_deref() == Some("config-template") { 49 print!("{}", knot_config::template()); 50 return Ok(()); 51 } 52 53 init_tracing(); 54 55 tracing::info!("!"); 56 tracing::info!("!"); 57 tracing::info!("!"); 58 tracing::info!("> If knot1 was so good then why isn't there a... ( ˶°ㅁ°)"); 59 tracing::info!("..."); 60 tracing::info!("Welcome to knot2!"); 61 tracing::info!("This code was made with love."); 62 tracing::info!("Hachapuri is sho tasty, definitely worth a try. Better than pizza tbh."); 63 tracing::info!("!"); 64 tracing::info!("!"); 65 tracing::info!("!"); 66 67 let config_path = std::env::args().nth(1).map(PathBuf::from); 68 let config = knot_config::load(config_path.as_deref()).context("load configuration")?; 69 config 70 .verify_environment() 71 .context("verify runtime environment")?; 72 73 let knot_did = config.knot_did().context("derive knot DID")?; 74 let object_format = config.object_format().context("parse git.object_format")?; 75 let layout = knot_git::Layout::new(&config.repo.scan_path) 76 .with_default_branch(&config.repo.default_branch) 77 .context("configure default branch")? 78 .with_object_format(object_format) 79 .reserving_meta(&knot_did) 80 .context("reserve meta-repo path")?; 81 82 let swept = knot_pack::sweep_incoming(&config.repo.scan_path); 83 if swept > 0 { 84 tracing::info!(swept, "swept abandoned receive staging directories"); 85 } 86 87 layout 88 .bootstrap_meta(&knot_did) 89 .context("bootstrap meta-repo")?; 90 let meta_path = layout 91 .meta_path(&knot_did) 92 .context("resolve meta-repo path")?; 93 94 let index = Arc::new(Index::new(meta_path.clone(), layout.clone())); 95 index.rebuild().context("rebuild index from meta-repo")?; 96 tracing::info!(coverage = ?index.coverage(), "index ready"); 97 98 let warm = Arc::clone(&index); 99 tokio::task::spawn_blocking(move || warm.warm_collaborators()); 100 101 let http = ReqwestHttp::new(config.http_limits()).context("build outbound HTTP client")?; 102 let git_http: Arc<dyn knot_runtime::HttpTransport> = Arc::new( 103 ReqwestHttp::new(config.fork_http_limits()).context("build outbound git fetch client")?, 104 ); 105 let atproto = Arc::new(Atproto::new( 106 http, 107 SystemClock, 108 knot_did.clone(), 109 config.atproto.plc_directory.clone(), 110 )); 111 let admins: BTreeSet<_> = config.server.admins.iter().cloned().collect(); 112 let admission = config.acl.admission; 113 let service_owner = config 114 .server 115 .admins 116 .first() 117 .cloned() 118 .context("at least one admin is configured")?; 119 120 let master_key = Zeroizing::new( 121 base64::engine::general_purpose::STANDARD 122 .decode( 123 std::env::var(&config.secrets.master_key_env) 124 .context("read master key from environment")? 125 .trim(), 126 ) 127 .context("decode master key as base64")?, 128 ); 129 let secrets = Arc::new( 130 SealedStore::open( 131 &config.secrets.sealed_key_file, 132 &master_key, 133 Box::new(OsEntropy), 134 ) 135 .context("open sealed key store")?, 136 ); 137 let knot_signing_key = secrets 138 .ensure(&knot_did) 139 .context("seal knot's own signing key")?; 140 let knot_actor = ActorId::from_secp256k1(knot_signing_key.as_bytes()); 141 let hostname = config.server.hostname.clone(); 142 let appview_endpoint = config 143 .server 144 .appview_endpoint 145 .as_str() 146 .trim_end_matches('/') 147 .to_string(); 148 let knot_service_url = format!("https://{}", config.server.hostname); 149 let did_document = 150 knot_atproto::knot_did_document(&knot_did, &knot_signing_key, &knot_service_url); 151 152 let http_addr = config.server.listen_addr; 153 let listen_limits = knot_edge::ListenLimits::new( 154 NonZeroU64::new(config.server.listen_header_timeout_ms) 155 .context("server.listen_header_timeout_ms must be greater than zero")?, 156 NonZeroU64::new(config.server.listen_idle_timeout_ms) 157 .context("server.listen_idle_timeout_ms must be greater than zero")?, 158 NonZeroU32::new(config.server.listen_max_connections) 159 .context("server.listen_max_connections must be greater than zero")?, 160 ); 161 let tls_setup = build_tls_setup(&config).context("assemble TLS configuration")?; 162 if config.tls.http3 && tls_setup.is_none() { 163 tracing::warn!( 164 "tls.http3 is set without any TLS certificate, so HTTP/3 will not start. Configure a static cert or ACME to serve h3." 165 ); 166 } 167 if config.tls.acme_enabled && http_addr.port() != 443 { 168 tracing::warn!( 169 listen_port = http_addr.port(), 170 "ACME validation over TLS-ALPN-01 needs the certificate authority to reach this host on TCP 443. Map 443 to the listen port if it differs." 171 ); 172 } 173 if config.tls.acme_enabled && config.tls.acme_staging { 174 tracing::warn!( 175 "ACME is using the Let's Encrypt staging directory. Its certificates are not browser-trusted. Unset tls.acme_staging for real certificates." 176 ); 177 } 178 let ssh_addr = config.server.ssh_listen_addr; 179 let ssh_max_pack_bytes = config.server.ssh_max_pack_bytes as usize; 180 let host_key = knot_ssh::load_or_create_host_key(&config.server.ssh_host_key_file) 181 .context("load or create SSH host key")?; 182 183 let xrpc_limits = knot_xrpc::LimitConfig { 184 burst: config.xrpc.preauth_burst, 185 refill_micros: config.xrpc.preauth_refill_ms.saturating_mul(1_000), 186 per_peer_inflight: config.xrpc.per_peer_inflight as usize, 187 global_inflight: config.xrpc.global_inflight as usize, 188 }; 189 let xrpc_max_body_bytes = config.xrpc.max_body_bytes as usize; 190 let xrpc_max_patch_bytes = config.xrpc.max_patch_bytes as usize; 191 let xrpc_max_patch_decompressed_bytes = config.xrpc.max_patch_decompressed_bytes; 192 let xrpc_max_response_bytes = config.xrpc.max_response_bytes as usize; 193 let xrpc_max_archive_bytes = config.xrpc.max_archive_bytes; 194 let xrpc_tree_last_commit_budget = 195 Duration::from_millis(config.xrpc.tree_last_commit_budget_ms); 196 let xrpc_blob_last_commit_budget = 197 Duration::from_millis(config.xrpc.blob_last_commit_budget_ms); 198 let xrpc_languages_budget = Duration::from_millis(config.xrpc.languages_budget_ms); 199 let xrpc_languages_push_budget = Duration::from_millis(config.xrpc.languages_push_budget_ms); 200 let fork_max_pack_bytes = config.xrpc.fork_max_pack_bytes; 201 let committer = knot_xrpc::Committer { 202 name: config.git.user_name.clone(), 203 email: config.git.user_email.clone(), 204 }; 205 let reservations = Arc::new(knot_xrpc::Reservations::new( 206 config.xrpc.reservation_ttl_secs as i64, 207 config.xrpc.per_actor_reservations as usize, 208 config.xrpc.max_pending_reservations as usize, 209 )); 210 let trusted_proxy_header = config.xrpc.trusted_proxy_header.clone(); 211 let events = Arc::new(knot_events::EventLog::new( 212 SystemClock, 213 config.xrpc.events_replay_buffer as usize, 214 )); 215 let subscriber_gate = Arc::new(knot_events::SubscriberGate::new( 216 config.xrpc.events_max_subscribers as usize, 217 config.xrpc.events_max_per_peer as usize, 218 )); 219 220 let maintenance_enabled = config.maintenance.enabled; 221 let maintenance_options = knot_maintenance::Options::from_config(&config.maintenance); 222 let maintenance_interval = Duration::from_secs(config.maintenance.interval_secs); 223 let maintenance_large_push = config.maintenance.large_push_bytes; 224 225 let recon_index = Arc::clone(&index); 226 let recon_layout = layout.clone(); 227 let recon_knot = hostname.clone(); 228 let recon_events = Arc::clone(&events); 229 tokio::task::spawn_blocking(move || { 230 let repos: Vec<reconstruct::RepoContext> = recon_index 231 .hosted_repos() 232 .into_iter() 233 .filter_map( 234 |repo| match (recon_index.owner_of(&repo), recon_index.rkey_of(&repo)) { 235 (Resolved::Ready(Some(owner)), Resolved::Ready(Some(rkey))) => { 236 Some(reconstruct::RepoContext { repo, owner, rkey }) 237 } 238 _ => None, 239 }, 240 ) 241 .collect(); 242 let now_seconds = (SystemClock.now_unix_micros().get() / 1_000_000) as i64; 243 let capacity = recon_events.capacity(); 244 let seeds = 245 reconstruct::pipeline_seeds(&recon_layout, &recon_knot, &repos, now_seconds, capacity); 246 if !seeds.is_empty() { 247 recon_events.seed(seeds); 248 } 249 }); 250 251 knot_config::init(config); 252 253 let (maintenance_handle, maintenance_shutdown, maintenance_task) = if maintenance_enabled { 254 let (scheduler, handle) = knot_maintenance::Scheduler::new( 255 layout.clone(), 256 Arc::new(IndexRepos(Arc::clone(&index))), 257 SystemClock, 258 maintenance_options, 259 maintenance_interval, 260 maintenance_large_push, 261 ); 262 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); 263 let task = tokio::spawn(scheduler.run(shutdown_rx)); 264 tracing::info!( 265 interval_secs = maintenance_interval.as_secs(), 266 "maintenance scheduler running" 267 ); 268 (handle, Some(shutdown_tx), Some(task)) 269 } else { 270 (knot_maintenance::MaintenanceHandle::disabled(), None, None) 271 }; 272 273 let ssh_state = Arc::new( 274 knot_ssh::SshState::new( 275 layout.clone(), 276 Arc::clone(&index), 277 Arc::clone(&atproto), 278 knot_actor, 279 Arc::clone(&events), 280 hostname.clone(), 281 appview_endpoint, 282 admins.clone(), 283 admission, 284 ssh_max_pack_bytes, 285 xrpc_languages_push_budget, 286 ) 287 .with_maintenance(maintenance_handle), 288 ); 289 290 let xrpc_state = Arc::new(XrpcState { 291 layout: layout.clone(), 292 index: Arc::clone(&index), 293 atproto: Arc::clone(&atproto), 294 secrets, 295 entropy: Arc::new(OsEntropy), 296 admins, 297 admission, 298 knot_did, 299 meta_path, 300 knot_service_url, 301 limiter: Arc::new(knot_xrpc::PreAuthLimiter::with_config(xrpc_limits)), 302 cob_locks: Arc::new(knot_xrpc::CobLocks::default()), 303 reservations, 304 trusted_proxy_header, 305 committer, 306 max_body_bytes: xrpc_max_body_bytes, 307 max_patch_bytes: xrpc_max_patch_bytes, 308 max_patch_decompressed_bytes: xrpc_max_patch_decompressed_bytes, 309 max_response_bytes: xrpc_max_response_bytes, 310 max_archive_bytes: xrpc_max_archive_bytes, 311 tree_last_commit_budget: knot_xrpc::ReadBudget::Within(xrpc_tree_last_commit_budget), 312 blob_last_commit_budget: knot_xrpc::ReadBudget::Within(xrpc_blob_last_commit_budget), 313 languages_budget: knot_xrpc::ReadBudget::Within(xrpc_languages_budget), 314 languages_push_budget: xrpc_languages_push_budget, 315 git_http, 316 fork_max_pack_bytes, 317 service_owner, 318 subscriber_gate, 319 events, 320 }); 321 322 let resolver: Arc<dyn knot_pack::RepoResolver> = { 323 let index = Arc::clone(&index); 324 Arc::new(move |target: &knot_pack::RepoTarget| match target { 325 knot_pack::RepoTarget::Did(did) => match index.owner_of(did) { 326 Resolved::Ready(Some(_)) => knot_pack::RepoLookup::Hosted(did.clone()), 327 Resolved::Ready(None) => knot_pack::RepoLookup::Unhosted, 328 Resolved::Warming => knot_pack::RepoLookup::Unavailable, 329 }, 330 knot_pack::RepoTarget::OwnerRkey(owner, rkey) => { 331 match index.resolve_repo(owner, rkey) { 332 Resolved::Ready(Some(found)) => knot_pack::RepoLookup::Hosted(found), 333 Resolved::Ready(None) => knot_pack::RepoLookup::Unhosted, 334 Resolved::Warming => knot_pack::RepoLookup::Unavailable, 335 } 336 } 337 }) 338 }; 339 let (write_routes, early_data_safe) = knot_pack::edge_routes(layout, resolver); 340 let app = knot_edge::RequiresFullHandshake::new( 341 write_routes.merge(knot_xrpc::router(xrpc_state)).route( 342 "/.well-known/did.json", 343 get(move || { 344 let document = did_document.clone(); 345 async move { Json(document) } 346 }), 347 ), 348 ); 349 let scheme = if tls_setup.is_some() { "https" } else { "http" }; 350 let edge_config = knot_edge::EdgeConfig { 351 http_addr, 352 limits: listen_limits, 353 tls: tls_setup, 354 }; 355 tracing::info!("listening on {scheme}://{http_addr} and ssh://{ssh_addr}"); 356 357 let shutdown = CancellationToken::new(); 358 let mut edge_task = tokio::spawn(knot_edge::serve( 359 edge_config, 360 app, 361 early_data_safe, 362 shutdown.clone(), 363 )); 364 let mut ssh_task = { 365 let shutdown = shutdown.clone(); 366 tokio::spawn(async move { knot_ssh::serve(ssh_addr, host_key, ssh_state, shutdown).await }) 367 }; 368 369 tokio::select! { 370 result = &mut edge_task => result.context("edge server task panicked")?.context("serve edge")?, 371 result = &mut ssh_task => result.context("ssh server task panicked")?.context("serve ssh")?, 372 () = shutdown_signal() => tracing::info!("shutdown signal received"), 373 } 374 shutdown.cancel(); 375 if tokio::time::timeout(EDGE_SHUTDOWN_DRAIN, async { 376 let _ = (&mut edge_task).await; 377 let _ = (&mut ssh_task).await; 378 }) 379 .await 380 .is_err() 381 { 382 tracing::warn!( 383 timeout_secs = EDGE_SHUTDOWN_DRAIN.as_secs(), 384 "aborting edge drain after timeout" 385 ); 386 } 387 if let Some(shutdown) = maintenance_shutdown { 388 let _ = shutdown.send(true); 389 } 390 if let Some(task) = maintenance_task 391 && tokio::time::timeout(MAINTENANCE_SHUTDOWN_DRAIN, task) 392 .await 393 .is_err() 394 { 395 tracing::warn!( 396 timeout_secs = MAINTENANCE_SHUTDOWN_DRAIN.as_secs(), 397 "aborting maintenance drain after timeout :3" 398 ); 399 } 400 Ok(()) 401} 402 403fn build_tls_setup( 404 config: &knot_config::KnotConfig, 405) -> anyhow::Result<Option<knot_edge::TlsSetup>> { 406 let tls = &config.tls; 407 let source = if tls.acme_enabled { 408 knot_edge::CertSource::Acme(knot_edge::AcmeParams { 409 domains: vec![config.server.hostname.clone()], 410 contact: tls 411 .acme_contact 412 .clone() 413 .context("tls.acme_contact is required when ACME is enabled")?, 414 cache_dir: tls 415 .acme_cache_dir 416 .clone() 417 .context("tls.acme_cache_dir is required when ACME is enabled")?, 418 production: !tls.acme_staging, 419 }) 420 } else { 421 match (&tls.cert_path, &tls.key_path) { 422 (Some(cert_path), Some(key_path)) => knot_edge::CertSource::Static { 423 cert_path: cert_path.clone(), 424 key_path: key_path.clone(), 425 }, 426 _ => return Ok(None), 427 } 428 }; 429 430 let internal = match tls.mtls_enabled { 431 true => Some(knot_edge::InternalTls { 432 addr: config.server.internal_listen_addr, 433 client_ca_path: tls 434 .mtls_client_ca_path 435 .clone() 436 .context("tls.mtls_client_ca_path is required when mTLS is enabled")?, 437 spki_pin: knot_edge::SpkiPin::from_base64( 438 tls.mtls_admin_spki_pin 439 .as_deref() 440 .context("tls.mtls_admin_spki_pin is required when mTLS is enabled")?, 441 ) 442 .context("parse tls.mtls_admin_spki_pin")?, 443 }), 444 false => None, 445 }; 446 447 Ok(Some(knot_edge::TlsSetup { 448 source, 449 http3: tls.http3, 450 internal, 451 })) 452} 453 454async fn shutdown_signal() { 455 let interrupt = async { 456 let _ = tokio::signal::ctrl_c().await; 457 }; 458 #[cfg(unix)] 459 let terminate = async { 460 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 461 Ok(mut stream) => { 462 stream.recv().await; 463 } 464 Err(_) => std::future::pending::<()>().await, 465 } 466 }; 467 #[cfg(not(unix))] 468 let terminate = std::future::pending::<()>(); 469 470 tokio::select! { 471 () = interrupt => {} 472 () = terminate => {} 473 } 474}