Now let's take a silly one
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 17 kB View raw
1mod archive; 2mod error; 3mod fetch; 4mod frame; 5mod meter; 6mod objects; 7mod pkt; 8mod quarantine; 9mod receive; 10mod resolve; 11mod upload; 12 13use std::collections::HashMap; 14use std::io::{self, Read}; 15 16use axum::Router; 17use axum::body::{Body, Bytes}; 18use axum::extract::{DefaultBodyLimit, Path, Query, State}; 19use axum::http::{HeaderMap, header}; 20use axum::response::Response; 21use axum::routing::post; 22use knot_git::{Layout, Repo}; 23use knot_types::{Oid, OwnerDid, RepoDid, RepoRkey}; 24use std::sync::Arc; 25use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc}; 26use tokio_stream::wrappers::ReceiverStream; 27 28pub use error::{PackError, PackLimit}; 29pub use fetch::{FetchError, UpstreamRefs, local_pack, local_refs, remote_pack, remote_refs}; 30pub use frame::{ 31 ReceiveFramer, UploadFramer, archive_request_complete, receive_request_complete, upload_v0_nak, 32}; 33pub use meter::PackLimits; 34pub use objects::{ExpandedPack, count_expanded, write_expanded, write_pack}; 35pub use pkt::frame_report; 36pub use quarantine::sweep_incoming; 37pub use receive::{Preflight, ReceiveCommand, ReceiveGuard, ReceiveOutcome, RefDecision}; 38pub use upload::selection_budget; 39 40use upload::UploadOutcome; 41 42pub fn advertise_upload(repo: &Repo) -> Result<Vec<u8>, PackError> { 43 upload::advertise(repo) 44} 45 46pub fn advertise_upload_v0(repo: &Repo) -> Result<Vec<u8>, PackError> { 47 upload::advertise_v0(repo) 48} 49 50pub fn upload_pack(repo: &Repo, request: &[u8]) -> Result<Vec<u8>, PackError> { 51 upload::buffered(repo, request) 52} 53 54pub fn upload_pack_streamed( 55 repo: &Repo, 56 request: &[u8], 57 sink: &mut dyn FnMut(&[u8]) -> io::Result<()>, 58) -> Result<(), PackError> { 59 upload::streamed(repo, request, sink) 60} 61 62pub fn advertise_receive(repo: &Repo) -> Result<Vec<u8>, PackError> { 63 receive::advertise(repo) 64} 65 66pub fn advertise_upload_ssh(repo: &Repo) -> Result<Vec<u8>, PackError> { 67 upload::advertise_ssh(repo) 68} 69 70pub fn advertise_upload_v0_ssh(repo: &Repo) -> Result<Vec<u8>, PackError> { 71 upload::advertise_v0_ssh(repo) 72} 73 74pub fn advertise_receive_ssh(repo: &Repo) -> Result<Vec<u8>, PackError> { 75 receive::advertise_ssh(repo) 76} 77 78#[doc(hidden)] 79pub fn receive_pack(repo: &Repo, request: &[u8]) -> Result<Vec<u8>, PackError> { 80 receive::handle(repo, request, &PackLimits::default()) 81} 82 83#[doc(hidden)] 84pub fn receive_pack_with_limits( 85 repo: &Repo, 86 request: &[u8], 87 limits: &PackLimits, 88) -> Result<Vec<u8>, PackError> { 89 receive::handle(repo, request, limits) 90} 91 92pub fn receive_pack_guarded( 93 repo: &Repo, 94 request: &[u8], 95 limits: &PackLimits, 96 guard: &dyn ReceiveGuard, 97 seal: &dyn Fn(&[knot_git::RefUpdate]), 98) -> Result<ReceiveOutcome, PackError> { 99 receive::handle_guarded(repo, request, limits, guard, seal) 100} 101 102pub fn receive_preflight(request: &[u8]) -> Preflight { 103 receive::preflight(request) 104} 105 106pub fn upload_archive_streamed( 107 repo: &Repo, 108 request: &[u8], 109 sink: &mut dyn FnMut(&[u8]) -> io::Result<()>, 110) -> Result<(), PackError> { 111 archive::stream(repo, request, sink) 112} 113 114pub fn upload_archive(repo: &Repo, request: &[u8]) -> Result<Vec<u8>, PackError> { 115 let mut buf = Vec::new(); 116 upload_archive_streamed(repo, request, &mut |chunk| { 117 buf.extend_from_slice(chunk); 118 Ok(()) 119 })?; 120 Ok(buf) 121} 122 123pub fn meter_pack( 124 pack: &[u8], 125 limits: &PackLimits, 126 kind: gix::hash::Kind, 127) -> Result<(), PackError> { 128 meter::meter(pack, limits, kind) 129} 130 131pub fn ingest_pack( 132 objects_dir: &std::path::Path, 133 pack: &[u8], 134 limits: &PackLimits, 135 kind: gix::hash::Kind, 136) -> Result<(), PackError> { 137 objects::index_pack(objects_dir, pack, limits, kind) 138} 139 140#[doc(hidden)] 141pub mod fuzz { 142 pub fn pkt(data: &[u8]) { 143 let _ = crate::pkt::data_payloads(data); 144 let _ = crate::pkt::data_payloads_all(data); 145 let _ = crate::pkt::split_receive(data); 146 } 147 148 pub fn pack(data: &[u8]) { 149 let _ = crate::meter::meter(data, &crate::PackLimits::default(), gix::hash::Kind::Sha1); 150 } 151 152 pub fn receive_commands(data: &[u8]) { 153 crate::receive::fuzz(data); 154 } 155 156 pub fn upload_args(data: &[u8]) { 157 crate::upload::fuzz(data); 158 } 159} 160 161const MAX_REQUEST_BYTES: usize = 16 * 1024 * 1024; 162 163#[derive(Debug, Clone, PartialEq, Eq)] 164pub enum RepoTarget { 165 Did(RepoDid), 166 OwnerRkey(OwnerDid, RepoRkey), 167} 168 169#[derive(Debug, Clone, PartialEq, Eq)] 170pub enum RepoLookup { 171 Hosted(RepoDid), 172 Unhosted, 173 Unavailable, 174} 175 176impl RepoLookup { 177 pub fn or_else(self, next: impl FnOnce() -> RepoLookup) -> RepoLookup { 178 match self { 179 RepoLookup::Unhosted => next(), 180 decided => decided, 181 } 182 } 183 184 fn open(self, layout: &Layout) -> Result<Repo, PackError> { 185 match self { 186 RepoLookup::Hosted(did) => layout.open(&did).map_err(|_| PackError::NotFound), 187 RepoLookup::Unhosted => Err(PackError::NotFound), 188 RepoLookup::Unavailable => Err(PackError::Unavailable), 189 } 190 } 191} 192 193pub trait RepoResolver: Send + Sync + 'static { 194 fn resolve(&self, target: &RepoTarget) -> RepoLookup; 195} 196 197impl<F> RepoResolver for F 198where 199 F: Fn(&RepoTarget) -> RepoLookup + Send + Sync + 'static, 200{ 201 fn resolve(&self, target: &RepoTarget) -> RepoLookup { 202 self(target) 203 } 204} 205 206#[derive(Clone)] 207struct PackState { 208 layout: Layout, 209 resolver: Arc<dyn RepoResolver>, 210 pack_slots: Arc<Semaphore>, 211} 212 213fn default_pack_slots() -> usize { 214 std::thread::available_parallelism() 215 .map(|cores| cores.get()) 216 .unwrap_or(4) 217} 218 219pub fn router(layout: Layout, resolver: Arc<dyn RepoResolver>) -> Router { 220 router_with_pack_slots(layout, resolver, default_pack_slots()) 221} 222 223pub fn router_with_pack_slots( 224 layout: Layout, 225 resolver: Arc<dyn RepoResolver>, 226 pack_slots: usize, 227) -> Router { 228 let state = pack_state(layout, resolver, pack_slots); 229 write_routes(state.clone()).merge(advertisement_routes(state).into_router()) 230} 231 232pub fn edge_routes( 233 layout: Layout, 234 resolver: Arc<dyn RepoResolver>, 235) -> (Router, knot_edge::ZeroRttRoutes) { 236 let state = pack_state(layout, resolver, default_pack_slots()); 237 (write_routes(state.clone()), advertisement_routes(state)) 238} 239 240fn pack_state(layout: Layout, resolver: Arc<dyn RepoResolver>, pack_slots: usize) -> PackState { 241 PackState { 242 layout, 243 resolver, 244 pack_slots: Arc::new(Semaphore::new(pack_slots.max(1))), 245 } 246} 247 248fn write_routes(state: PackState) -> Router { 249 Router::new() 250 .route("/{did}/{name}/git-upload-pack", post(upload_named)) 251 .route( 252 "/{did}/{name}/git-upload-archive", 253 post(upload_archive_named), 254 ) 255 .route("/{did}/git-upload-pack", post(upload_did)) 256 .route("/{did}/git-upload-archive", post(upload_archive_did)) 257 .layer(DefaultBodyLimit::max(MAX_REQUEST_BYTES)) 258 .with_state(state) 259} 260 261fn advertisement_routes(state: PackState) -> knot_edge::ZeroRttRoutes { 262 let named_state = state.clone(); 263 let did_state = state; 264 knot_edge::ZeroRttRoutes::new() 265 .get( 266 "/{did}/{name}/info/refs", 267 knot_edge::ZeroRttSafe::new( 268 move |Path((did, name)): Path<(String, String)>, 269 Query(query): Query<HashMap<String, String>>, 270 headers: HeaderMap| { 271 let state = named_state.clone(); 272 async move { 273 let repo = open_named(&state, &did, &name)?; 274 read_advertisement( 275 query.get("service").map(String::as_str), 276 &headers, 277 &repo, 278 ) 279 } 280 }, 281 ), 282 ) 283 .get( 284 "/{did}/info/refs", 285 knot_edge::ZeroRttSafe::new( 286 move |Path(did): Path<String>, 287 Query(query): Query<HashMap<String, String>>, 288 headers: HeaderMap| { 289 let state = did_state.clone(); 290 async move { 291 let repo = open_did(&state, &did)?; 292 read_advertisement( 293 query.get("service").map(String::as_str), 294 &headers, 295 &repo, 296 ) 297 } 298 }, 299 ), 300 ) 301} 302 303fn open_named(state: &PackState, did: &str, name: &str) -> Result<Repo, PackError> { 304 let owner = OwnerDid::new(did).map_err(|error| PackError::BadPath(error.to_string()))?; 305 RepoRkey::clone_path_candidates(name) 306 .fold(RepoLookup::Unhosted, |acc, rkey| { 307 acc.or_else(|| { 308 state 309 .resolver 310 .resolve(&RepoTarget::OwnerRkey(owner.clone(), rkey)) 311 }) 312 }) 313 .open(&state.layout) 314} 315 316fn open_did(state: &PackState, did: &str) -> Result<Repo, PackError> { 317 let did = RepoDid::new(did).map_err(|error| PackError::BadPath(error.to_string()))?; 318 state 319 .resolver 320 .resolve(&RepoTarget::Did(did)) 321 .open(&state.layout) 322} 323 324fn read_advertisement( 325 service: Option<&str>, 326 headers: &HeaderMap, 327 repo: &Repo, 328) -> Result<Response, PackError> { 329 match service { 330 Some("git-upload-pack") => { 331 let body = if wants_v2(headers) { 332 advertise_upload(repo)? 333 } else { 334 upload::advertise_v0(repo)? 335 }; 336 Ok(git_response( 337 "application/x-git-upload-pack-advertisement", 338 body, 339 )) 340 } 341 Some("git-receive-pack") => Err(PackError::PushOverSsh), 342 _ => Err(PackError::UnsupportedService), 343 } 344} 345 346fn decode_request(headers: &HeaderMap, body: Bytes) -> Result<Vec<u8>, PackError> { 347 let tokens: Vec<&str> = headers 348 .get(header::CONTENT_ENCODING) 349 .and_then(|value| value.to_str().ok()) 350 .map(|value| { 351 value 352 .split(',') 353 .map(str::trim) 354 .filter(|token| !token.is_empty()) 355 .collect() 356 }) 357 .unwrap_or_default(); 358 if let Some(token) = tokens.iter().find(|token| { 359 !token.eq_ignore_ascii_case("gzip") && !token.eq_ignore_ascii_case("identity") 360 }) { 361 return Err(PackError::UnsupportedEncoding((*token).to_string())); 362 } 363 let gzipped = tokens 364 .iter() 365 .any(|token| token.eq_ignore_ascii_case("gzip")); 366 if !gzipped { 367 return Ok(body.to_vec()); 368 } 369 let mut out = Vec::new(); 370 flate2::read::GzDecoder::new(body.as_ref()) 371 .take(MAX_REQUEST_BYTES as u64 + 1) 372 .read_to_end(&mut out) 373 .map_err(|error| PackError::Protocol(format!("gzip request body: {error}")))?; 374 if out.len() > MAX_REQUEST_BYTES { 375 return Err(PackError::LimitExceeded(PackLimit::TotalBytes)); 376 } 377 Ok(out) 378} 379 380fn wants_v2(headers: &HeaderMap) -> bool { 381 headers 382 .get("git-protocol") 383 .and_then(|value| value.to_str().ok()) 384 .is_some_and(|value| value.split(':').any(|token| token.trim() == "version=2")) 385} 386 387async fn upload_dispatch( 388 state: &PackState, 389 repo: Repo, 390 body: &[u8], 391) -> Result<Response, PackError> { 392 match upload::plan(&repo, body)? { 393 UploadOutcome::Buffered(bytes) => { 394 Ok(git_response("application/x-git-upload-pack-result", bytes)) 395 } 396 UploadOutcome::Streaming { 397 preamble, 398 wants, 399 haves, 400 opts, 401 } => { 402 let permit = Arc::clone(&state.pack_slots) 403 .acquire_owned() 404 .await 405 .expect("pack concurrency semaphore is never closed"); 406 Ok(stream_response(repo, preamble, wants, haves, opts, permit)) 407 } 408 } 409} 410 411fn stream_response( 412 repo: Repo, 413 preamble: Vec<u8>, 414 wants: Vec<Oid>, 415 haves: Vec<Oid>, 416 opts: upload::StreamOpts, 417 permit: OwnedSemaphorePermit, 418) -> Response { 419 let side_band = opts.side_band; 420 let (tx, rx) = mpsc::channel::<Result<Bytes, io::Error>>(16); 421 tokio::task::spawn_blocking(move || { 422 let _permit = permit; 423 if tx.blocking_send(Ok(Bytes::from(preamble))).is_err() { 424 return; 425 } 426 let result = { 427 let mut sink = |chunk: &[u8]| -> io::Result<()> { 428 tx.blocking_send(Ok(Bytes::copy_from_slice(chunk))) 429 .map_err(|_| io::Error::other("client disconnected")) 430 }; 431 upload::stream_pack(&repo, &wants, &haves, &opts, &mut sink) 432 }; 433 match result { 434 Ok(()) if side_band => { 435 let mut flush = Vec::new(); 436 if pkt::write_flush(&mut flush).is_ok() { 437 let _ = tx.blocking_send(Ok(Bytes::from(flush))); 438 } 439 } 440 Ok(()) => {} 441 Err(error) if side_band => { 442 let mut tail = Vec::new(); 443 let message = format!("knot2: {}\n", error.to_string().replace('\n', " ")); 444 if pkt::write_band_error(&mut tail, message.as_bytes()).is_ok() { 445 let _ = pkt::write_flush(&mut tail); 446 let _ = tx.blocking_send(Ok(Bytes::from(tail))); 447 } 448 } 449 Err(error) => { 450 let _ = tx.blocking_send(Err(io::Error::other(error.to_string()))); 451 } 452 } 453 }); 454 nocache( 455 Response::builder().header(header::CONTENT_TYPE, "application/x-git-upload-pack-result"), 456 ) 457 .body(Body::from_stream(ReceiverStream::new(rx))) 458 .expect("valid response") 459} 460 461async fn upload_named( 462 State(state): State<PackState>, 463 Path((did, name)): Path<(String, String)>, 464 headers: HeaderMap, 465 body: Bytes, 466) -> Result<Response, PackError> { 467 let repo = open_named(&state, &did, &name)?; 468 upload_dispatch(&state, repo, &decode_request(&headers, body)?).await 469} 470 471async fn upload_did( 472 State(state): State<PackState>, 473 Path(did): Path<String>, 474 headers: HeaderMap, 475 body: Bytes, 476) -> Result<Response, PackError> { 477 let repo = open_did(&state, &did)?; 478 upload_dispatch(&state, repo, &decode_request(&headers, body)?).await 479} 480 481async fn archive_dispatch( 482 state: &PackState, 483 repo: Repo, 484 body: Vec<u8>, 485) -> Result<Response, PackError> { 486 let permit = Arc::clone(&state.pack_slots) 487 .acquire_owned() 488 .await 489 .expect("pack concurrency semaphore is never closed"); 490 Ok(archive_response(repo, body, permit)) 491} 492 493fn archive_response(repo: Repo, body: Vec<u8>, permit: OwnedSemaphorePermit) -> Response { 494 let (tx, rx) = mpsc::channel::<Result<Bytes, io::Error>>(16); 495 tokio::task::spawn_blocking(move || { 496 let _permit = permit; 497 let mut sink = |chunk: &[u8]| -> io::Result<()> { 498 tx.blocking_send(Ok(Bytes::copy_from_slice(chunk))) 499 .map_err(|_| io::Error::other("client disconnected")) 500 }; 501 if let Err(error) = upload_archive_streamed(&repo, &body, &mut sink) { 502 let _ = tx.blocking_send(Err(io::Error::other(error.to_string()))); 503 } 504 }); 505 nocache(Response::builder().header( 506 header::CONTENT_TYPE, 507 "application/x-git-upload-archive-result", 508 )) 509 .body(Body::from_stream(ReceiverStream::new(rx))) 510 .expect("valid response") 511} 512 513async fn upload_archive_named( 514 State(state): State<PackState>, 515 Path((did, name)): Path<(String, String)>, 516 headers: HeaderMap, 517 body: Bytes, 518) -> Result<Response, PackError> { 519 let repo = open_named(&state, &did, &name)?; 520 archive_dispatch(&state, repo, decode_request(&headers, body)?).await 521} 522 523async fn upload_archive_did( 524 State(state): State<PackState>, 525 Path(did): Path<String>, 526 headers: HeaderMap, 527 body: Bytes, 528) -> Result<Response, PackError> { 529 let repo = open_did(&state, &did)?; 530 archive_dispatch(&state, repo, decode_request(&headers, body)?).await 531} 532 533fn nocache(builder: axum::http::response::Builder) -> axum::http::response::Builder { 534 builder 535 .header(header::EXPIRES, "Fri, 01 Jan 1980 00:00:00 GMT") 536 .header(header::PRAGMA, "no-cache") 537 .header( 538 header::CACHE_CONTROL, 539 "no-cache, max-age=0, must-revalidate", 540 ) 541} 542 543fn git_response(content_type: &'static str, body: Vec<u8>) -> Response { 544 nocache(Response::builder().header(header::CONTENT_TYPE, content_type)) 545 .body(Body::from(body)) 546 .expect("valid response") 547}