Now let's take a silly one
1mod archive;
2mod error;
3mod fetch;
4mod frame;
5mod meter;
6mod objects;
7mod pkt;
8mod quarantine;
9mod receive;
10mod resolve;
11mod upload;
12
13use std::collections::HashMap;
14use std::io::{self, Read};
15
16use axum::Router;
17use axum::body::{Body, Bytes};
18use axum::extract::{DefaultBodyLimit, Path, Query, State};
19use axum::http::{HeaderMap, header};
20use axum::response::Response;
21use axum::routing::post;
22use knot_git::{Layout, Repo};
23use knot_types::{Oid, OwnerDid, RepoDid, RepoRkey};
24use std::sync::Arc;
25use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc};
26use tokio_stream::wrappers::ReceiverStream;
27
28pub use error::{PackError, PackLimit};
29pub use fetch::{FetchError, UpstreamRefs, local_pack, local_refs, remote_pack, remote_refs};
30pub use frame::{
31 ReceiveFramer, UploadFramer, archive_request_complete, receive_request_complete, upload_v0_nak,
32};
33pub use meter::PackLimits;
34pub use objects::{ExpandedPack, count_expanded, write_expanded, write_pack};
35pub use pkt::frame_report;
36pub use quarantine::sweep_incoming;
37pub use receive::{Preflight, ReceiveCommand, ReceiveGuard, ReceiveOutcome, RefDecision};
38pub use upload::selection_budget;
39
40use upload::UploadOutcome;
41
42pub fn advertise_upload(repo: &Repo) -> Result<Vec<u8>, PackError> {
43 upload::advertise(repo)
44}
45
46pub fn advertise_upload_v0(repo: &Repo) -> Result<Vec<u8>, PackError> {
47 upload::advertise_v0(repo)
48}
49
50pub fn upload_pack(repo: &Repo, request: &[u8]) -> Result<Vec<u8>, PackError> {
51 upload::buffered(repo, request)
52}
53
54pub fn upload_pack_streamed(
55 repo: &Repo,
56 request: &[u8],
57 sink: &mut dyn FnMut(&[u8]) -> io::Result<()>,
58) -> Result<(), PackError> {
59 upload::streamed(repo, request, sink)
60}
61
62pub fn advertise_receive(repo: &Repo) -> Result<Vec<u8>, PackError> {
63 receive::advertise(repo)
64}
65
66pub fn advertise_upload_ssh(repo: &Repo) -> Result<Vec<u8>, PackError> {
67 upload::advertise_ssh(repo)
68}
69
70pub fn advertise_upload_v0_ssh(repo: &Repo) -> Result<Vec<u8>, PackError> {
71 upload::advertise_v0_ssh(repo)
72}
73
74pub fn advertise_receive_ssh(repo: &Repo) -> Result<Vec<u8>, PackError> {
75 receive::advertise_ssh(repo)
76}
77
78#[doc(hidden)]
79pub fn receive_pack(repo: &Repo, request: &[u8]) -> Result<Vec<u8>, PackError> {
80 receive::handle(repo, request, &PackLimits::default())
81}
82
83#[doc(hidden)]
84pub fn receive_pack_with_limits(
85 repo: &Repo,
86 request: &[u8],
87 limits: &PackLimits,
88) -> Result<Vec<u8>, PackError> {
89 receive::handle(repo, request, limits)
90}
91
92pub fn receive_pack_guarded(
93 repo: &Repo,
94 request: &[u8],
95 limits: &PackLimits,
96 guard: &dyn ReceiveGuard,
97 seal: &dyn Fn(&[knot_git::RefUpdate]),
98) -> Result<ReceiveOutcome, PackError> {
99 receive::handle_guarded(repo, request, limits, guard, seal)
100}
101
102pub fn receive_preflight(request: &[u8]) -> Preflight {
103 receive::preflight(request)
104}
105
106pub fn upload_archive_streamed(
107 repo: &Repo,
108 request: &[u8],
109 sink: &mut dyn FnMut(&[u8]) -> io::Result<()>,
110) -> Result<(), PackError> {
111 archive::stream(repo, request, sink)
112}
113
114pub fn upload_archive(repo: &Repo, request: &[u8]) -> Result<Vec<u8>, PackError> {
115 let mut buf = Vec::new();
116 upload_archive_streamed(repo, request, &mut |chunk| {
117 buf.extend_from_slice(chunk);
118 Ok(())
119 })?;
120 Ok(buf)
121}
122
123pub fn meter_pack(
124 pack: &[u8],
125 limits: &PackLimits,
126 kind: gix::hash::Kind,
127) -> Result<(), PackError> {
128 meter::meter(pack, limits, kind)
129}
130
131pub fn ingest_pack(
132 objects_dir: &std::path::Path,
133 pack: &[u8],
134 limits: &PackLimits,
135 kind: gix::hash::Kind,
136) -> Result<(), PackError> {
137 objects::index_pack(objects_dir, pack, limits, kind)
138}
139
140#[doc(hidden)]
141pub mod fuzz {
142 pub fn pkt(data: &[u8]) {
143 let _ = crate::pkt::data_payloads(data);
144 let _ = crate::pkt::data_payloads_all(data);
145 let _ = crate::pkt::split_receive(data);
146 }
147
148 pub fn pack(data: &[u8]) {
149 let _ = crate::meter::meter(data, &crate::PackLimits::default(), gix::hash::Kind::Sha1);
150 }
151
152 pub fn receive_commands(data: &[u8]) {
153 crate::receive::fuzz(data);
154 }
155
156 pub fn upload_args(data: &[u8]) {
157 crate::upload::fuzz(data);
158 }
159}
160
/// Largest request body accepted, in bytes (16 MiB).
///
/// Used both as the router's body limit for the POST routes and as the cap
/// on a gzip request body after decompression.
const MAX_REQUEST_BYTES: usize = 16 * 1024 * 1024;
162
163#[derive(Debug, Clone, PartialEq, Eq)]
164pub enum RepoTarget {
165 Did(RepoDid),
166 OwnerRkey(OwnerDid, RepoRkey),
167}
168
169#[derive(Debug, Clone, PartialEq, Eq)]
170pub enum RepoLookup {
171 Hosted(RepoDid),
172 Unhosted,
173 Unavailable,
174}
175
176impl RepoLookup {
177 pub fn or_else(self, next: impl FnOnce() -> RepoLookup) -> RepoLookup {
178 match self {
179 RepoLookup::Unhosted => next(),
180 decided => decided,
181 }
182 }
183
184 fn open(self, layout: &Layout) -> Result<Repo, PackError> {
185 match self {
186 RepoLookup::Hosted(did) => layout.open(&did).map_err(|_| PackError::NotFound),
187 RepoLookup::Unhosted => Err(PackError::NotFound),
188 RepoLookup::Unavailable => Err(PackError::Unavailable),
189 }
190 }
191}
192
193pub trait RepoResolver: Send + Sync + 'static {
194 fn resolve(&self, target: &RepoTarget) -> RepoLookup;
195}
196
197impl<F> RepoResolver for F
198where
199 F: Fn(&RepoTarget) -> RepoLookup + Send + Sync + 'static,
200{
201 fn resolve(&self, target: &RepoTarget) -> RepoLookup {
202 self(target)
203 }
204}
205
206#[derive(Clone)]
207struct PackState {
208 layout: Layout,
209 resolver: Arc<dyn RepoResolver>,
210 pack_slots: Arc<Semaphore>,
211}
212
/// Default number of concurrent pack slots: the parallelism reported by the
/// standard library, or 4 when that query fails.
fn default_pack_slots() -> usize {
    match std::thread::available_parallelism() {
        Ok(cores) => cores.get(),
        Err(_) => 4,
    }
}
218
219pub fn router(layout: Layout, resolver: Arc<dyn RepoResolver>) -> Router {
220 router_with_pack_slots(layout, resolver, default_pack_slots())
221}
222
223pub fn router_with_pack_slots(
224 layout: Layout,
225 resolver: Arc<dyn RepoResolver>,
226 pack_slots: usize,
227) -> Router {
228 let state = pack_state(layout, resolver, pack_slots);
229 write_routes(state.clone()).merge(advertisement_routes(state).into_router())
230}
231
232pub fn edge_routes(
233 layout: Layout,
234 resolver: Arc<dyn RepoResolver>,
235) -> (Router, knot_edge::ZeroRttRoutes) {
236 let state = pack_state(layout, resolver, default_pack_slots());
237 (write_routes(state.clone()), advertisement_routes(state))
238}
239
240fn pack_state(layout: Layout, resolver: Arc<dyn RepoResolver>, pack_slots: usize) -> PackState {
241 PackState {
242 layout,
243 resolver,
244 pack_slots: Arc::new(Semaphore::new(pack_slots.max(1))),
245 }
246}
247
248fn write_routes(state: PackState) -> Router {
249 Router::new()
250 .route("/{did}/{name}/git-upload-pack", post(upload_named))
251 .route(
252 "/{did}/{name}/git-upload-archive",
253 post(upload_archive_named),
254 )
255 .route("/{did}/git-upload-pack", post(upload_did))
256 .route("/{did}/git-upload-archive", post(upload_archive_did))
257 .layer(DefaultBodyLimit::max(MAX_REQUEST_BYTES))
258 .with_state(state)
259}
260
261fn advertisement_routes(state: PackState) -> knot_edge::ZeroRttRoutes {
262 let named_state = state.clone();
263 let did_state = state;
264 knot_edge::ZeroRttRoutes::new()
265 .get(
266 "/{did}/{name}/info/refs",
267 knot_edge::ZeroRttSafe::new(
268 move |Path((did, name)): Path<(String, String)>,
269 Query(query): Query<HashMap<String, String>>,
270 headers: HeaderMap| {
271 let state = named_state.clone();
272 async move {
273 let repo = open_named(&state, &did, &name)?;
274 read_advertisement(
275 query.get("service").map(String::as_str),
276 &headers,
277 &repo,
278 )
279 }
280 },
281 ),
282 )
283 .get(
284 "/{did}/info/refs",
285 knot_edge::ZeroRttSafe::new(
286 move |Path(did): Path<String>,
287 Query(query): Query<HashMap<String, String>>,
288 headers: HeaderMap| {
289 let state = did_state.clone();
290 async move {
291 let repo = open_did(&state, &did)?;
292 read_advertisement(
293 query.get("service").map(String::as_str),
294 &headers,
295 &repo,
296 )
297 }
298 },
299 ),
300 )
301}
302
303fn open_named(state: &PackState, did: &str, name: &str) -> Result<Repo, PackError> {
304 let owner = OwnerDid::new(did).map_err(|error| PackError::BadPath(error.to_string()))?;
305 RepoRkey::clone_path_candidates(name)
306 .fold(RepoLookup::Unhosted, |acc, rkey| {
307 acc.or_else(|| {
308 state
309 .resolver
310 .resolve(&RepoTarget::OwnerRkey(owner.clone(), rkey))
311 })
312 })
313 .open(&state.layout)
314}
315
316fn open_did(state: &PackState, did: &str) -> Result<Repo, PackError> {
317 let did = RepoDid::new(did).map_err(|error| PackError::BadPath(error.to_string()))?;
318 state
319 .resolver
320 .resolve(&RepoTarget::Did(did))
321 .open(&state.layout)
322}
323
324fn read_advertisement(
325 service: Option<&str>,
326 headers: &HeaderMap,
327 repo: &Repo,
328) -> Result<Response, PackError> {
329 match service {
330 Some("git-upload-pack") => {
331 let body = if wants_v2(headers) {
332 advertise_upload(repo)?
333 } else {
334 upload::advertise_v0(repo)?
335 };
336 Ok(git_response(
337 "application/x-git-upload-pack-advertisement",
338 body,
339 ))
340 }
341 Some("git-receive-pack") => Err(PackError::PushOverSsh),
342 _ => Err(PackError::UnsupportedService),
343 }
344}
345
346fn decode_request(headers: &HeaderMap, body: Bytes) -> Result<Vec<u8>, PackError> {
347 let tokens: Vec<&str> = headers
348 .get(header::CONTENT_ENCODING)
349 .and_then(|value| value.to_str().ok())
350 .map(|value| {
351 value
352 .split(',')
353 .map(str::trim)
354 .filter(|token| !token.is_empty())
355 .collect()
356 })
357 .unwrap_or_default();
358 if let Some(token) = tokens.iter().find(|token| {
359 !token.eq_ignore_ascii_case("gzip") && !token.eq_ignore_ascii_case("identity")
360 }) {
361 return Err(PackError::UnsupportedEncoding((*token).to_string()));
362 }
363 let gzipped = tokens
364 .iter()
365 .any(|token| token.eq_ignore_ascii_case("gzip"));
366 if !gzipped {
367 return Ok(body.to_vec());
368 }
369 let mut out = Vec::new();
370 flate2::read::GzDecoder::new(body.as_ref())
371 .take(MAX_REQUEST_BYTES as u64 + 1)
372 .read_to_end(&mut out)
373 .map_err(|error| PackError::Protocol(format!("gzip request body: {error}")))?;
374 if out.len() > MAX_REQUEST_BYTES {
375 return Err(PackError::LimitExceeded(PackLimit::TotalBytes));
376 }
377 Ok(out)
378}
379
380fn wants_v2(headers: &HeaderMap) -> bool {
381 headers
382 .get("git-protocol")
383 .and_then(|value| value.to_str().ok())
384 .is_some_and(|value| value.split(':').any(|token| token.trim() == "version=2"))
385}
386
387async fn upload_dispatch(
388 state: &PackState,
389 repo: Repo,
390 body: &[u8],
391) -> Result<Response, PackError> {
392 match upload::plan(&repo, body)? {
393 UploadOutcome::Buffered(bytes) => {
394 Ok(git_response("application/x-git-upload-pack-result", bytes))
395 }
396 UploadOutcome::Streaming {
397 preamble,
398 wants,
399 haves,
400 opts,
401 } => {
402 let permit = Arc::clone(&state.pack_slots)
403 .acquire_owned()
404 .await
405 .expect("pack concurrency semaphore is never closed");
406 Ok(stream_response(repo, preamble, wants, haves, opts, permit))
407 }
408 }
409}
410
411fn stream_response(
412 repo: Repo,
413 preamble: Vec<u8>,
414 wants: Vec<Oid>,
415 haves: Vec<Oid>,
416 opts: upload::StreamOpts,
417 permit: OwnedSemaphorePermit,
418) -> Response {
419 let side_band = opts.side_band;
420 let (tx, rx) = mpsc::channel::<Result<Bytes, io::Error>>(16);
421 tokio::task::spawn_blocking(move || {
422 let _permit = permit;
423 if tx.blocking_send(Ok(Bytes::from(preamble))).is_err() {
424 return;
425 }
426 let result = {
427 let mut sink = |chunk: &[u8]| -> io::Result<()> {
428 tx.blocking_send(Ok(Bytes::copy_from_slice(chunk)))
429 .map_err(|_| io::Error::other("client disconnected"))
430 };
431 upload::stream_pack(&repo, &wants, &haves, &opts, &mut sink)
432 };
433 match result {
434 Ok(()) if side_band => {
435 let mut flush = Vec::new();
436 if pkt::write_flush(&mut flush).is_ok() {
437 let _ = tx.blocking_send(Ok(Bytes::from(flush)));
438 }
439 }
440 Ok(()) => {}
441 Err(error) if side_band => {
442 let mut tail = Vec::new();
443 let message = format!("knot2: {}\n", error.to_string().replace('\n', " "));
444 if pkt::write_band_error(&mut tail, message.as_bytes()).is_ok() {
445 let _ = pkt::write_flush(&mut tail);
446 let _ = tx.blocking_send(Ok(Bytes::from(tail)));
447 }
448 }
449 Err(error) => {
450 let _ = tx.blocking_send(Err(io::Error::other(error.to_string())));
451 }
452 }
453 });
454 nocache(
455 Response::builder().header(header::CONTENT_TYPE, "application/x-git-upload-pack-result"),
456 )
457 .body(Body::from_stream(ReceiverStream::new(rx)))
458 .expect("valid response")
459}
460
461async fn upload_named(
462 State(state): State<PackState>,
463 Path((did, name)): Path<(String, String)>,
464 headers: HeaderMap,
465 body: Bytes,
466) -> Result<Response, PackError> {
467 let repo = open_named(&state, &did, &name)?;
468 upload_dispatch(&state, repo, &decode_request(&headers, body)?).await
469}
470
471async fn upload_did(
472 State(state): State<PackState>,
473 Path(did): Path<String>,
474 headers: HeaderMap,
475 body: Bytes,
476) -> Result<Response, PackError> {
477 let repo = open_did(&state, &did)?;
478 upload_dispatch(&state, repo, &decode_request(&headers, body)?).await
479}
480
481async fn archive_dispatch(
482 state: &PackState,
483 repo: Repo,
484 body: Vec<u8>,
485) -> Result<Response, PackError> {
486 let permit = Arc::clone(&state.pack_slots)
487 .acquire_owned()
488 .await
489 .expect("pack concurrency semaphore is never closed");
490 Ok(archive_response(repo, body, permit))
491}
492
493fn archive_response(repo: Repo, body: Vec<u8>, permit: OwnedSemaphorePermit) -> Response {
494 let (tx, rx) = mpsc::channel::<Result<Bytes, io::Error>>(16);
495 tokio::task::spawn_blocking(move || {
496 let _permit = permit;
497 let mut sink = |chunk: &[u8]| -> io::Result<()> {
498 tx.blocking_send(Ok(Bytes::copy_from_slice(chunk)))
499 .map_err(|_| io::Error::other("client disconnected"))
500 };
501 if let Err(error) = upload_archive_streamed(&repo, &body, &mut sink) {
502 let _ = tx.blocking_send(Err(io::Error::other(error.to_string())));
503 }
504 });
505 nocache(Response::builder().header(
506 header::CONTENT_TYPE,
507 "application/x-git-upload-archive-result",
508 ))
509 .body(Body::from_stream(ReceiverStream::new(rx)))
510 .expect("valid response")
511}
512
513async fn upload_archive_named(
514 State(state): State<PackState>,
515 Path((did, name)): Path<(String, String)>,
516 headers: HeaderMap,
517 body: Bytes,
518) -> Result<Response, PackError> {
519 let repo = open_named(&state, &did, &name)?;
520 archive_dispatch(&state, repo, decode_request(&headers, body)?).await
521}
522
523async fn upload_archive_did(
524 State(state): State<PackState>,
525 Path(did): Path<String>,
526 headers: HeaderMap,
527 body: Bytes,
528) -> Result<Response, PackError> {
529 let repo = open_did(&state, &did)?;
530 archive_dispatch(&state, repo, decode_request(&headers, body)?).await
531}
532
533fn nocache(builder: axum::http::response::Builder) -> axum::http::response::Builder {
534 builder
535 .header(header::EXPIRES, "Fri, 01 Jan 1980 00:00:00 GMT")
536 .header(header::PRAGMA, "no-cache")
537 .header(
538 header::CACHE_CONTROL,
539 "no-cache, max-age=0, must-revalidate",
540 )
541}
542
543fn git_response(content_type: &'static str, body: Vec<u8>) -> Response {
544 nocache(Response::builder().header(header::CONTENT_TYPE, content_type))
545 .body(Body::from(body))
546 .expect("valid response")
547}