Now let's take a silly one
1mod reconstruct;
2
3use std::collections::BTreeSet;
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Duration;
8
9use tokio_util::sync::CancellationToken;
10
11use anyhow::Context;
12use axum::Json;
13use axum::routing::get;
14use base64::Engine;
15use knot_atproto::Atproto;
16use knot_index::{Index, Resolved};
17use knot_runtime::{Clock, OsEntropy, ReqwestHttp, SystemClock};
18use knot_secrets::SealedStore;
19use knot_types::ActorId;
20use knot_xrpc::XrpcState;
21use zeroize::Zeroizing;
22
// Upper bound `main` waits for the maintenance scheduler task to finish after
// it has been told to shut down.
const MAINTENANCE_SHUTDOWN_DRAIN: Duration = Duration::from_secs(30);
// Upper bound `main` waits for the edge and SSH server tasks to finish after
// the shared cancellation token has been cancelled.
const EDGE_SHUTDOWN_DRAIN: Duration = Duration::from_secs(40);
25
26struct IndexRepos(Arc<Index>);
27
28impl knot_maintenance::RepoSource for IndexRepos {
29 fn repos(&self) -> Vec<knot_types::RepoDid> {
30 self.0.hosted_repos()
31 }
32}
33
34fn init_tracing() {
35 let filter = tracing_subscriber::EnvFilter::try_from_default_env()
36 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
37 tracing_subscriber::fmt()
38 .with_env_filter(filter)
39 .with_writer(std::io::stderr)
40 .init();
41}
42
43#[tokio::main]
44async fn main() -> anyhow::Result<()> {
45 rustix::process::set_dumpable_behavior(rustix::process::DumpableBehavior::NotDumpable)
46 .context("disable core dumps and ptrace attachment")?;
47
48 if std::env::args().nth(1).as_deref() == Some("config-template") {
49 print!("{}", knot_config::template());
50 return Ok(());
51 }
52
53 init_tracing();
54
55 tracing::info!("!");
56 tracing::info!("!");
57 tracing::info!("!");
58 tracing::info!("> If knot1 was so good then why isn't there a... ( ˶°ㅁ°)");
59 tracing::info!("...");
60 tracing::info!("Welcome to knot2!");
61 tracing::info!("This code was made with love.");
62 tracing::info!("Hachapuri is sho tasty, definitely worth a try. Better than pizza tbh.");
63 tracing::info!("!");
64 tracing::info!("!");
65 tracing::info!("!");
66
67 let config_path = std::env::args().nth(1).map(PathBuf::from);
68 let config = knot_config::load(config_path.as_deref()).context("load configuration")?;
69 config
70 .verify_environment()
71 .context("verify runtime environment")?;
72
73 let knot_did = config.knot_did().context("derive knot DID")?;
74 let object_format = config.object_format().context("parse git.object_format")?;
75 let layout = knot_git::Layout::new(&config.repo.scan_path)
76 .with_default_branch(&config.repo.default_branch)
77 .context("configure default branch")?
78 .with_object_format(object_format)
79 .reserving_meta(&knot_did)
80 .context("reserve meta-repo path")?;
81
82 let swept = knot_pack::sweep_incoming(&config.repo.scan_path);
83 if swept > 0 {
84 tracing::info!(swept, "swept abandoned receive staging directories");
85 }
86
87 layout
88 .bootstrap_meta(&knot_did)
89 .context("bootstrap meta-repo")?;
90 let meta_path = layout
91 .meta_path(&knot_did)
92 .context("resolve meta-repo path")?;
93
94 let index = Arc::new(Index::new(meta_path.clone(), layout.clone()));
95 index.rebuild().context("rebuild index from meta-repo")?;
96 tracing::info!(coverage = ?index.coverage(), "index ready");
97
98 let warm = Arc::clone(&index);
99 tokio::task::spawn_blocking(move || warm.warm_collaborators());
100
101 let http = ReqwestHttp::new(config.http_limits()).context("build outbound HTTP client")?;
102 let git_http: Arc<dyn knot_runtime::HttpTransport> = Arc::new(
103 ReqwestHttp::new(config.fork_http_limits()).context("build outbound git fetch client")?,
104 );
105 let atproto = Arc::new(Atproto::new(
106 http,
107 SystemClock,
108 knot_did.clone(),
109 config.atproto.plc_directory.clone(),
110 ));
111 let admins: BTreeSet<_> = config.server.admins.iter().cloned().collect();
112 let admission = config.acl.admission;
113 let service_owner = config
114 .server
115 .admins
116 .first()
117 .cloned()
118 .context("at least one admin is configured")?;
119
120 let master_key = Zeroizing::new(
121 base64::engine::general_purpose::STANDARD
122 .decode(
123 std::env::var(&config.secrets.master_key_env)
124 .context("read master key from environment")?
125 .trim(),
126 )
127 .context("decode master key as base64")?,
128 );
129 let secrets = Arc::new(
130 SealedStore::open(
131 &config.secrets.sealed_key_file,
132 &master_key,
133 Box::new(OsEntropy),
134 )
135 .context("open sealed key store")?,
136 );
137 let knot_signing_key = secrets
138 .ensure(&knot_did)
139 .context("seal knot's own signing key")?;
140 let knot_actor = ActorId::from_secp256k1(knot_signing_key.as_bytes());
141 let hostname = config.server.hostname.clone();
142 let appview_endpoint = config
143 .server
144 .appview_endpoint
145 .as_str()
146 .trim_end_matches('/')
147 .to_string();
148 let knot_service_url = format!("https://{}", config.server.hostname);
149 let did_document =
150 knot_atproto::knot_did_document(&knot_did, &knot_signing_key, &knot_service_url);
151
152 let http_addr = config.server.listen_addr;
153 let listen_limits = knot_edge::ListenLimits::new(
154 NonZeroU64::new(config.server.listen_header_timeout_ms)
155 .context("server.listen_header_timeout_ms must be greater than zero")?,
156 NonZeroU64::new(config.server.listen_idle_timeout_ms)
157 .context("server.listen_idle_timeout_ms must be greater than zero")?,
158 NonZeroU32::new(config.server.listen_max_connections)
159 .context("server.listen_max_connections must be greater than zero")?,
160 );
161 let tls_setup = build_tls_setup(&config).context("assemble TLS configuration")?;
162 if config.tls.http3 && tls_setup.is_none() {
163 tracing::warn!(
164 "tls.http3 is set without any TLS certificate, so HTTP/3 will not start. Configure a static cert or ACME to serve h3."
165 );
166 }
167 if config.tls.acme_enabled && http_addr.port() != 443 {
168 tracing::warn!(
169 listen_port = http_addr.port(),
170 "ACME validation over TLS-ALPN-01 needs the certificate authority to reach this host on TCP 443. Map 443 to the listen port if it differs."
171 );
172 }
173 if config.tls.acme_enabled && config.tls.acme_staging {
174 tracing::warn!(
175 "ACME is using the Let's Encrypt staging directory. Its certificates are not browser-trusted. Unset tls.acme_staging for real certificates."
176 );
177 }
178 let ssh_addr = config.server.ssh_listen_addr;
179 let ssh_max_pack_bytes = config.server.ssh_max_pack_bytes as usize;
180 let host_key = knot_ssh::load_or_create_host_key(&config.server.ssh_host_key_file)
181 .context("load or create SSH host key")?;
182
183 let xrpc_limits = knot_xrpc::LimitConfig {
184 burst: config.xrpc.preauth_burst,
185 refill_micros: config.xrpc.preauth_refill_ms.saturating_mul(1_000),
186 per_peer_inflight: config.xrpc.per_peer_inflight as usize,
187 global_inflight: config.xrpc.global_inflight as usize,
188 };
189 let xrpc_max_body_bytes = config.xrpc.max_body_bytes as usize;
190 let xrpc_max_patch_bytes = config.xrpc.max_patch_bytes as usize;
191 let xrpc_max_patch_decompressed_bytes = config.xrpc.max_patch_decompressed_bytes;
192 let xrpc_max_response_bytes = config.xrpc.max_response_bytes as usize;
193 let xrpc_max_archive_bytes = config.xrpc.max_archive_bytes;
194 let xrpc_tree_last_commit_budget =
195 Duration::from_millis(config.xrpc.tree_last_commit_budget_ms);
196 let xrpc_blob_last_commit_budget =
197 Duration::from_millis(config.xrpc.blob_last_commit_budget_ms);
198 let xrpc_languages_budget = Duration::from_millis(config.xrpc.languages_budget_ms);
199 let xrpc_languages_push_budget = Duration::from_millis(config.xrpc.languages_push_budget_ms);
200 let fork_max_pack_bytes = config.xrpc.fork_max_pack_bytes;
201 let committer = knot_xrpc::Committer {
202 name: config.git.user_name.clone(),
203 email: config.git.user_email.clone(),
204 };
205 let reservations = Arc::new(knot_xrpc::Reservations::new(
206 config.xrpc.reservation_ttl_secs as i64,
207 config.xrpc.per_actor_reservations as usize,
208 config.xrpc.max_pending_reservations as usize,
209 ));
210 let trusted_proxy_header = config.xrpc.trusted_proxy_header.clone();
211 let events = Arc::new(knot_events::EventLog::new(
212 SystemClock,
213 config.xrpc.events_replay_buffer as usize,
214 ));
215 let subscriber_gate = Arc::new(knot_events::SubscriberGate::new(
216 config.xrpc.events_max_subscribers as usize,
217 config.xrpc.events_max_per_peer as usize,
218 ));
219
220 let maintenance_enabled = config.maintenance.enabled;
221 let maintenance_options = knot_maintenance::Options::from_config(&config.maintenance);
222 let maintenance_interval = Duration::from_secs(config.maintenance.interval_secs);
223 let maintenance_large_push = config.maintenance.large_push_bytes;
224
225 let recon_index = Arc::clone(&index);
226 let recon_layout = layout.clone();
227 let recon_knot = hostname.clone();
228 let recon_events = Arc::clone(&events);
229 tokio::task::spawn_blocking(move || {
230 let repos: Vec<reconstruct::RepoContext> = recon_index
231 .hosted_repos()
232 .into_iter()
233 .filter_map(
234 |repo| match (recon_index.owner_of(&repo), recon_index.rkey_of(&repo)) {
235 (Resolved::Ready(Some(owner)), Resolved::Ready(Some(rkey))) => {
236 Some(reconstruct::RepoContext { repo, owner, rkey })
237 }
238 _ => None,
239 },
240 )
241 .collect();
242 let now_seconds = (SystemClock.now_unix_micros().get() / 1_000_000) as i64;
243 let capacity = recon_events.capacity();
244 let seeds =
245 reconstruct::pipeline_seeds(&recon_layout, &recon_knot, &repos, now_seconds, capacity);
246 if !seeds.is_empty() {
247 recon_events.seed(seeds);
248 }
249 });
250
251 knot_config::init(config);
252
253 let (maintenance_handle, maintenance_shutdown, maintenance_task) = if maintenance_enabled {
254 let (scheduler, handle) = knot_maintenance::Scheduler::new(
255 layout.clone(),
256 Arc::new(IndexRepos(Arc::clone(&index))),
257 SystemClock,
258 maintenance_options,
259 maintenance_interval,
260 maintenance_large_push,
261 );
262 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
263 let task = tokio::spawn(scheduler.run(shutdown_rx));
264 tracing::info!(
265 interval_secs = maintenance_interval.as_secs(),
266 "maintenance scheduler running"
267 );
268 (handle, Some(shutdown_tx), Some(task))
269 } else {
270 (knot_maintenance::MaintenanceHandle::disabled(), None, None)
271 };
272
273 let ssh_state = Arc::new(
274 knot_ssh::SshState::new(
275 layout.clone(),
276 Arc::clone(&index),
277 Arc::clone(&atproto),
278 knot_actor,
279 Arc::clone(&events),
280 hostname.clone(),
281 appview_endpoint,
282 admins.clone(),
283 admission,
284 ssh_max_pack_bytes,
285 xrpc_languages_push_budget,
286 )
287 .with_maintenance(maintenance_handle),
288 );
289
290 let xrpc_state = Arc::new(XrpcState {
291 layout: layout.clone(),
292 index: Arc::clone(&index),
293 atproto: Arc::clone(&atproto),
294 secrets,
295 entropy: Arc::new(OsEntropy),
296 admins,
297 admission,
298 knot_did,
299 meta_path,
300 knot_service_url,
301 limiter: Arc::new(knot_xrpc::PreAuthLimiter::with_config(xrpc_limits)),
302 cob_locks: Arc::new(knot_xrpc::CobLocks::default()),
303 reservations,
304 trusted_proxy_header,
305 committer,
306 max_body_bytes: xrpc_max_body_bytes,
307 max_patch_bytes: xrpc_max_patch_bytes,
308 max_patch_decompressed_bytes: xrpc_max_patch_decompressed_bytes,
309 max_response_bytes: xrpc_max_response_bytes,
310 max_archive_bytes: xrpc_max_archive_bytes,
311 tree_last_commit_budget: knot_xrpc::ReadBudget::Within(xrpc_tree_last_commit_budget),
312 blob_last_commit_budget: knot_xrpc::ReadBudget::Within(xrpc_blob_last_commit_budget),
313 languages_budget: knot_xrpc::ReadBudget::Within(xrpc_languages_budget),
314 languages_push_budget: xrpc_languages_push_budget,
315 git_http,
316 fork_max_pack_bytes,
317 service_owner,
318 subscriber_gate,
319 events,
320 });
321
322 let resolver: Arc<dyn knot_pack::RepoResolver> = {
323 let index = Arc::clone(&index);
324 Arc::new(move |target: &knot_pack::RepoTarget| match target {
325 knot_pack::RepoTarget::Did(did) => match index.owner_of(did) {
326 Resolved::Ready(Some(_)) => knot_pack::RepoLookup::Hosted(did.clone()),
327 Resolved::Ready(None) => knot_pack::RepoLookup::Unhosted,
328 Resolved::Warming => knot_pack::RepoLookup::Unavailable,
329 },
330 knot_pack::RepoTarget::OwnerRkey(owner, rkey) => {
331 match index.resolve_repo(owner, rkey) {
332 Resolved::Ready(Some(found)) => knot_pack::RepoLookup::Hosted(found),
333 Resolved::Ready(None) => knot_pack::RepoLookup::Unhosted,
334 Resolved::Warming => knot_pack::RepoLookup::Unavailable,
335 }
336 }
337 })
338 };
339 let (write_routes, early_data_safe) = knot_pack::edge_routes(layout, resolver);
340 let app = knot_edge::RequiresFullHandshake::new(
341 write_routes.merge(knot_xrpc::router(xrpc_state)).route(
342 "/.well-known/did.json",
343 get(move || {
344 let document = did_document.clone();
345 async move { Json(document) }
346 }),
347 ),
348 );
349 let scheme = if tls_setup.is_some() { "https" } else { "http" };
350 let edge_config = knot_edge::EdgeConfig {
351 http_addr,
352 limits: listen_limits,
353 tls: tls_setup,
354 };
355 tracing::info!("listening on {scheme}://{http_addr} and ssh://{ssh_addr}");
356
357 let shutdown = CancellationToken::new();
358 let mut edge_task = tokio::spawn(knot_edge::serve(
359 edge_config,
360 app,
361 early_data_safe,
362 shutdown.clone(),
363 ));
364 let mut ssh_task = {
365 let shutdown = shutdown.clone();
366 tokio::spawn(async move { knot_ssh::serve(ssh_addr, host_key, ssh_state, shutdown).await })
367 };
368
369 tokio::select! {
370 result = &mut edge_task => result.context("edge server task panicked")?.context("serve edge")?,
371 result = &mut ssh_task => result.context("ssh server task panicked")?.context("serve ssh")?,
372 () = shutdown_signal() => tracing::info!("shutdown signal received"),
373 }
374 shutdown.cancel();
375 if tokio::time::timeout(EDGE_SHUTDOWN_DRAIN, async {
376 let _ = (&mut edge_task).await;
377 let _ = (&mut ssh_task).await;
378 })
379 .await
380 .is_err()
381 {
382 tracing::warn!(
383 timeout_secs = EDGE_SHUTDOWN_DRAIN.as_secs(),
384 "aborting edge drain after timeout"
385 );
386 }
387 if let Some(shutdown) = maintenance_shutdown {
388 let _ = shutdown.send(true);
389 }
390 if let Some(task) = maintenance_task
391 && tokio::time::timeout(MAINTENANCE_SHUTDOWN_DRAIN, task)
392 .await
393 .is_err()
394 {
395 tracing::warn!(
396 timeout_secs = MAINTENANCE_SHUTDOWN_DRAIN.as_secs(),
397 "aborting maintenance drain after timeout :3"
398 );
399 }
400 Ok(())
401}
402
403fn build_tls_setup(
404 config: &knot_config::KnotConfig,
405) -> anyhow::Result<Option<knot_edge::TlsSetup>> {
406 let tls = &config.tls;
407 let source = if tls.acme_enabled {
408 knot_edge::CertSource::Acme(knot_edge::AcmeParams {
409 domains: vec![config.server.hostname.clone()],
410 contact: tls
411 .acme_contact
412 .clone()
413 .context("tls.acme_contact is required when ACME is enabled")?,
414 cache_dir: tls
415 .acme_cache_dir
416 .clone()
417 .context("tls.acme_cache_dir is required when ACME is enabled")?,
418 production: !tls.acme_staging,
419 })
420 } else {
421 match (&tls.cert_path, &tls.key_path) {
422 (Some(cert_path), Some(key_path)) => knot_edge::CertSource::Static {
423 cert_path: cert_path.clone(),
424 key_path: key_path.clone(),
425 },
426 _ => return Ok(None),
427 }
428 };
429
430 let internal = match tls.mtls_enabled {
431 true => Some(knot_edge::InternalTls {
432 addr: config.server.internal_listen_addr,
433 client_ca_path: tls
434 .mtls_client_ca_path
435 .clone()
436 .context("tls.mtls_client_ca_path is required when mTLS is enabled")?,
437 spki_pin: knot_edge::SpkiPin::from_base64(
438 tls.mtls_admin_spki_pin
439 .as_deref()
440 .context("tls.mtls_admin_spki_pin is required when mTLS is enabled")?,
441 )
442 .context("parse tls.mtls_admin_spki_pin")?,
443 }),
444 false => None,
445 };
446
447 Ok(Some(knot_edge::TlsSetup {
448 source,
449 http3: tls.http3,
450 internal,
451 }))
452}
453
454async fn shutdown_signal() {
455 let interrupt = async {
456 let _ = tokio::signal::ctrl_c().await;
457 };
458 #[cfg(unix)]
459 let terminate = async {
460 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
461 Ok(mut stream) => {
462 stream.recv().await;
463 }
464 Err(_) => std::future::pending::<()>().await,
465 }
466 };
467 #[cfg(not(unix))]
468 let terminate = std::future::pending::<()>();
469
470 tokio::select! {
471 () = interrupt => {}
472 () = terminate => {}
473 }
474}