Now let's take a silly one
1

Configure Feed

Select the types of activity you want to include in your feed.

pack & auth new typular

Lewis: May this revision serve well! <lu5a@proton.me>

author
Lewis
date (Jun 28, 2026, 1:43 PM +0300) commit f381a2f6 parent 3d78ad26 change-id quqrmvmq
+474 -226
+5 -3
crates/knot-atproto/src/auth.rs
··· 1 1 use http::HeaderValue; 2 2 use knot_runtime::{Entropy, HttpRequest, Signer}; 3 - use knot_types::{KnotId, Nsid, ServiceDid}; 3 + use knot_types::{KnotId, Nsid, ServiceDid, UnixSeconds}; 4 4 5 5 use crate::AtprotoError; 6 6 use crate::jwt; 7 + use crate::jwt::JwtNonce; 7 8 8 9 const POINTER_NONCE_BYTES: usize = 16; 9 10 ··· 11 12 pub issuer: &'a KnotId, 12 13 pub audience: &'a ServiceDid, 13 14 pub lxm: &'a Nsid, 14 - pub now_unix: i64, 15 + pub now_unix: UnixSeconds, 15 16 } 16 17 17 18 pub trait PointerAuthorizer { ··· 41 42 ) -> Result<(), AtprotoError> { 42 43 let mut bytes = [0u8; POINTER_NONCE_BYTES]; 43 44 self.entropy.fill(&mut bytes); 44 - let nonce: String = bytes.iter().map(|byte| format!("{byte:02x}")).collect(); 45 + let hex: String = bytes.iter().map(|byte| format!("{byte:02x}")).collect(); 46 + let nonce = JwtNonce::new(hex); 45 47 let token = jwt::mint( 46 48 self.signer, 47 49 ctx.issuer,
+122 -89
crates/knot-atproto/src/jwt.rs
··· 5 5 use knot_types::service_auth::{ 6 6 JwtHeader, ParsedJwt, PublicKey as VerifyKey, ServiceAuthClaims, ServiceAuthError, parse_jwt, 7 7 }; 8 - use knot_types::{AccountDid, CowStr, Did, DidService, KnotId, Nsid, ServiceDid}; 8 + use knot_types::{AccountDid, CowStr, Did, DidService, KnotId, Nsid, ServiceDid, UnixSeconds}; 9 9 10 10 pub(crate) const CLOCK_SKEW_SECS: i64 = 60; 11 11 pub(crate) const SERVICE_TOKEN_LIFETIME_SECS: i64 = 60; 12 12 const MAX_TOKEN_LIFETIME_SECS: i64 = 300; 13 13 const MAX_NONCE_BYTES: usize = 256; 14 + 15 + #[derive(Debug, Clone, PartialEq, Eq, Hash)] 16 + pub struct JwtNonce(String); 17 + 18 + impl JwtNonce { 19 + pub fn new(value: impl Into<String>) -> Self { 20 + Self(value.into()) 21 + } 22 + 23 + pub fn as_str(&self) -> &str { 24 + &self.0 25 + } 26 + } 14 27 15 28 #[derive(Debug, thiserror::Error)] 16 29 pub enum JwtError { ··· 33 46 #[error("audience mismatch: token addressed {actual}, expected {expected}")] 34 47 AudienceMismatch { expected: String, actual: String }, 35 48 #[error("token expired at {exp}, now {now}")] 36 - Expired { exp: i64, now: i64 }, 49 + Expired { exp: UnixSeconds, now: UnixSeconds }, 37 50 #[error("token issued in future: iat {iat}, now {now}")] 38 - IssuedInFuture { iat: i64, now: i64 }, 51 + IssuedInFuture { iat: UnixSeconds, now: UnixSeconds }, 39 52 #[error("token lifetime is too long: iat {iat}, exp {exp}, cap {max}s")] 40 - LifetimeTooLong { exp: i64, iat: i64, max: i64 }, 53 + LifetimeTooLong { 54 + exp: UnixSeconds, 55 + iat: UnixSeconds, 56 + max: i64, 57 + }, 41 58 #[error("token expires at {exp}, before its issue at {iat}")] 42 - ExpiresBeforeIssued { exp: i64, iat: i64 }, 59 + ExpiresBeforeIssued { exp: UnixSeconds, iat: UnixSeconds }, 43 60 #[error("method binding mismatch: token bound to {actual:?}, expected {expected}")] 44 61 MethodMismatch { 45 62 expected: String, ··· 47 64 }, 48 65 } 49 66 50 - pub fn mint( 67 + pub(crate) fn mint( 51 68 signer: &dyn Signer, 52 69 issuer: &KnotId, 53 70 audience: &ServiceDid, 54 71 method: &Nsid, 55 - nonce: String, 56 - now_unix: i64, 72 + nonce: JwtNonce, 73 + now_unix: UnixSeconds, 57 74 ) -> String { 58 75 let alg = match signer.scheme() { 59 76 SignatureScheme::Secp256k1 => "ES256K", ··· 66 83 let claims = ServiceAuthClaims { 67 84 iss: Did::new_owned(issuer.as_str()).expect("knot DID parses as a DID"), 68 85 aud: DidService::new_owned(audience.as_str()).expect("service DID parses as a DID"), 69 - exp: now_unix.saturating_add(SERVICE_TOKEN_LIFETIME_SECS), 70 - iat: now_unix, 71 - jti: Some(nonce.into()), 86 + exp: now_unix 87 + .saturating_add_secs(SERVICE_TOKEN_LIFETIME_SECS) 88 + .get(), 89 + iat: now_unix.get(), 90 + jti: Some(nonce.as_str().into()), 72 91 lxm: Some(method.clone()), 73 92 }; 74 93 let header_b64 = ··· 140 159 parsed: &ParsedJwt, 141 160 audience: &impl TokenAudience, 142 161 method: &Nsid, 143 - now_unix: i64, 162 + now_unix: UnixSeconds, 144 163 ) -> Result<(), JwtError> { 145 164 let claims = parsed.claims(); 165 + let exp = UnixSeconds::new(claims.exp); 166 + let iat = UnixSeconds::new(claims.iat); 146 167 147 168 if !audience.canonicalizes(claims.aud.as_str()) { 148 169 return Err(JwtError::AudienceMismatch { ··· 151 172 }); 152 173 } 153 174 154 - if claims.exp.saturating_add(CLOCK_SKEW_SECS) < now_unix { 155 - return Err(JwtError::Expired { 156 - exp: claims.exp, 157 - now: now_unix, 158 - }); 175 + if exp.saturating_add_secs(CLOCK_SKEW_SECS) < now_unix { 176 + return Err(JwtError::Expired { exp, now: now_unix }); 159 177 } 160 178 161 - if claims.iat.saturating_sub(CLOCK_SKEW_SECS) > now_unix { 162 - return Err(JwtError::IssuedInFuture { 163 - iat: claims.iat, 164 - now: now_unix, 165 - }); 179 + if iat.saturating_sub_secs(CLOCK_SKEW_SECS) > now_unix { 180 + return Err(JwtError::IssuedInFuture { iat, now: now_unix }); 166 181 } 167 182 168 - if claims.exp < claims.iat { 169 - return Err(JwtError::ExpiresBeforeIssued { 170 - exp: claims.exp, 171 - iat: claims.iat, 172 - }); 183 + if exp < iat { 184 + return Err(JwtError::ExpiresBeforeIssued { exp, iat }); 173 185 } 174 186 175 - if claims.exp.saturating_sub(claims.iat) > MAX_TOKEN_LIFETIME_SECS { 187 + if exp.get().saturating_sub(iat.get()) > MAX_TOKEN_LIFETIME_SECS { 176 188 return Err(JwtError::LifetimeTooLong { 177 - exp: claims.exp, 178 - iat: claims.iat, 189 + exp, 190 + iat, 179 191 max: MAX_TOKEN_LIFETIME_SECS, 180 192 }); 181 193 } ··· 191 203 Ok(()) 192 204 } 193 205 194 - pub fn nonce(parsed: &ParsedJwt) -> Result<&str, JwtError> { 195 - parsed 206 + pub(crate) fn nonce(parsed: &ParsedJwt) -> Result<JwtNonce, JwtError> { 207 + let jti: &str = parsed 196 208 .claims() 197 209 .jti 198 210 .as_ref() 199 211 .map(|jti| jti.as_ref()) 200 - .ok_or(JwtError::MissingNonce) 201 - .and_then(|jti: &str| { 202 - (jti.len() <= MAX_NONCE_BYTES) 203 - .then_some(jti) 204 - .ok_or(JwtError::OversizedNonce { len: jti.len() }) 205 - }) 212 + .ok_or(JwtError::MissingNonce)?; 213 + (jti.len() <= MAX_NONCE_BYTES) 214 + .then(|| JwtNonce::new(jti)) 215 + .ok_or(JwtError::OversizedNonce { len: jti.len() }) 206 216 } 207 217 208 218 pub fn verify_signature(parsed: &ParsedJwt, issuer_key: &CryptoKey<'_>) -> Result<(), JwtError> { ··· 244 254 format!("{signing_input}.{sig}") 245 255 } 246 256 247 - fn claims(iss: &str, aud: &str, exp: i64, lxm: &str) -> serde_json::Value { 257 + fn claims(iss: &str, aud: &str, exp: UnixSeconds, lxm: &str) -> serde_json::Value { 248 258 serde_json::json!({ 249 259 "iss": iss, 250 260 "aud": aud, 251 - "exp": exp, 252 - "iat": exp - 60, 261 + "exp": exp.get(), 262 + "iat": exp.saturating_sub_secs(60).get(), 253 263 "lxm": lxm, 254 264 }) 255 265 } ··· 269 279 #[test] 270 280 fn a_well_formed_token_authenticates_its_issuer() { 271 281 let (signing, public) = k256_key(1); 272 - let token = mint(&signing, &claims(ISS, KNOT, 1_000, METHOD)); 282 + let token = mint( 283 + &signing, 284 + &claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD), 285 + ); 273 286 let parsed = parse(&token).unwrap(); 274 - check_claims(&parsed, &knot(), &method(), 900).unwrap(); 287 + check_claims(&parsed, &knot(), &method(), UnixSeconds::new(900)).unwrap(); 275 288 verify_signature(&parsed, &public).unwrap(); 276 289 assert_eq!(issuer(&parsed).unwrap(), AccountDid::new(ISS).unwrap()); 277 290 } ··· 280 293 fn a_signature_under_the_wrong_key_is_rejected() { 281 294 let (signing, _) = k256_key(1); 282 295 let (_, stranger) = k256_key(2); 283 - let token = mint(&signing, &claims(ISS, KNOT, 1_000, METHOD)); 296 + let token = mint( 297 + &signing, 298 + &claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD), 299 + ); 284 300 let parsed = parse(&token).unwrap(); 285 301 let error = verify_signature(&parsed, &stranger).unwrap_err(); 286 302 assert!(matches!(error, JwtError::InvalidSignature)); ··· 289 305 #[test] 290 306 fn an_expired_token_past_the_skew_window_is_rejected() { 291 307 let (signing, _) = k256_key(1); 292 - let token = mint(&signing, &claims(ISS, KNOT, 1_000, METHOD)); 308 + let token = mint( 309 + &signing, 310 + &claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD), 311 + ); 293 312 let parsed = parse(&token).unwrap(); 294 - let error = check_claims(&parsed, &knot(), &method(), 1_100).unwrap_err(); 313 + let error = check_claims(&parsed, &knot(), &method(), UnixSeconds::new(1_100)).unwrap_err(); 295 314 assert!(matches!( 296 315 error, 297 - JwtError::Expired { 298 - exp: 1_000, 299 - now: 1_100 300 - } 316 + JwtError::Expired { exp, now } 317 + if exp == UnixSeconds::new(1_000) && now == UnixSeconds::new(1_100) 301 318 )); 302 319 } 303 320 304 321 #[test] 305 322 fn a_token_within_the_skew_window_past_exp_is_accepted() { 306 323 let (signing, _) = k256_key(1); 307 - let token = mint(&signing, &claims(ISS, KNOT, 1_000, METHOD)); 324 + let token = mint( 325 + &signing, 326 + &claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD), 327 + ); 308 328 let parsed = parse(&token).unwrap(); 309 - check_claims(&parsed, &knot(), &method(), 1_030).unwrap(); 329 + check_claims(&parsed, &knot(), &method(), UnixSeconds::new(1_030)).unwrap(); 310 330 } 311 331 312 332 #[test] 313 333 fn a_token_for_another_knot_is_rejected() { 314 334 let (signing, _) = k256_key(1); 315 - let token = mint(&signing, &claims(ISS, "did:web:oyster.cafe", 1_000, METHOD)); 335 + let token = mint( 336 + &signing, 337 + &claims(ISS, "did:web:oyster.cafe", UnixSeconds::new(1_000), METHOD), 338 + ); 316 339 let parsed = parse(&token).unwrap(); 317 - let error = check_claims(&parsed, &knot(), &method(), 900).unwrap_err(); 340 + let error = check_claims(&parsed, &knot(), &method(), UnixSeconds::new(900)).unwrap_err(); 318 341 assert!(matches!(error, JwtError::AudienceMismatch { .. })); 319 342 } 320 343 ··· 323 346 let (signing, _) = k256_key(1); 324 347 let token = mint( 325 348 &signing, 326 - &claims(ISS, KNOT, 1_000, "sh.tangled.repo.delete"), 349 + &claims(ISS, KNOT, UnixSeconds::new(1_000), "sh.tangled.repo.delete"), 327 350 ); 328 351 let parsed = parse(&token).unwrap(); 329 - let error = check_claims(&parsed, &knot(), &method(), 900).unwrap_err(); 352 + let error = check_claims(&parsed, &knot(), &method(), UnixSeconds::new(900)).unwrap_err(); 330 353 assert!(matches!(error, JwtError::MethodMismatch { .. })); 331 354 } 332 355 ··· 338 361 }); 339 362 let token = mint(&signing, &unbound); 340 363 let parsed = parse(&token).unwrap(); 341 - let error = check_claims(&parsed, &knot(), &method(), 900).unwrap_err(); 364 + let error = check_claims(&parsed, &knot(), &method(), UnixSeconds::new(900)).unwrap_err(); 342 365 assert!(matches!( 343 366 error, 344 367 JwtError::MethodMismatch { actual: None, .. } ··· 348 371 #[test] 349 372 fn a_token_without_a_nonce_is_rejected() { 350 373 let (signing, _) = k256_key(1); 351 - let token = mint(&signing, &claims(ISS, KNOT, 1_000, METHOD)); 374 + let token = mint( 375 + &signing, 376 + &claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD), 377 + ); 352 378 let parsed = parse(&token).unwrap(); 353 379 assert!(matches!(nonce(&parsed), Err(JwtError::MissingNonce))); 354 380 } ··· 356 382 #[test] 357 383 fn a_present_nonce_is_returned() { 358 384 let (signing, _) = k256_key(1); 359 - let mut body = claims(ISS, KNOT, 1_000, METHOD); 385 + let mut body = claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD); 360 386 body["jti"] = serde_json::json!("nonce-1"); 361 387 let token = mint(&signing, &body); 362 388 let parsed = parse(&token).unwrap(); 363 - assert_eq!(nonce(&parsed).unwrap(), "nonce-1"); 389 + assert_eq!(nonce(&parsed).unwrap().as_str(), "nonce-1"); 364 390 } 365 391 366 392 #[test] 367 393 fn an_oversized_nonce_is_rejected() { 368 394 let (signing, _) = k256_key(1); 369 - let mut body = claims(ISS, KNOT, 1_000, METHOD); 395 + let mut body = claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD); 370 396 body["jti"] = serde_json::json!("n".repeat(MAX_NONCE_BYTES + 1)); 371 397 let token = mint(&signing, &body); 372 398 let parsed = parse(&token).unwrap(); ··· 375 401 Err(JwtError::OversizedNonce { len }) if len == MAX_NONCE_BYTES + 1 376 402 )); 377 403 378 - let mut at_cap = claims(ISS, KNOT, 1_000, METHOD); 404 + let mut at_cap = claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD); 379 405 at_cap["jti"] = serde_json::json!("n".repeat(MAX_NONCE_BYTES)); 380 406 let parsed = parse(&mint(&signing, &at_cap)).unwrap(); 381 407 assert!(nonce(&parsed).is_ok()); ··· 389 415 }); 390 416 let token = mint(&signing, &future); 391 417 let parsed = parse(&token).unwrap(); 392 - let error = check_claims(&parsed, &knot(), &method(), 900).unwrap_err(); 418 + let error = check_claims(&parsed, &knot(), &method(), UnixSeconds::new(900)).unwrap_err(); 393 419 assert!(matches!( 394 420 error, 395 - JwtError::IssuedInFuture { 396 - iat: 2_000, 397 - now: 900 398 - } 421 + JwtError::IssuedInFuture { iat, now } 422 + if iat == UnixSeconds::new(2_000) && now == UnixSeconds::new(900) 399 423 )); 400 424 } 401 425 ··· 407 431 }); 408 432 let token = mint(&signing, &overlong); 409 433 let parsed = parse(&token).unwrap(); 410 - let error = check_claims(&parsed, &knot(), &method(), 1_000).unwrap_err(); 434 + let error = check_claims(&parsed, &knot(), &method(), UnixSeconds::new(1_000)).unwrap_err(); 411 435 assert!(matches!( 412 436 error, 413 - JwtError::LifetimeTooLong { 414 - exp: 1_400, 415 - iat: 1_000, 416 - max: 300 417 - } 437 + JwtError::LifetimeTooLong { exp, iat, max } 438 + if exp == UnixSeconds::new(1_400) && iat == UnixSeconds::new(1_000) && max == 300 418 439 )); 419 440 } 420 441 ··· 426 447 }); 427 448 let token = mint(&signing, &inverted); 428 449 let parsed = parse(&token).unwrap(); 429 - let error = check_claims(&parsed, &knot(), &method(), 1_000).unwrap_err(); 450 + let error = check_claims(&parsed, &knot(), &method(), UnixSeconds::new(1_000)).unwrap_err(); 430 451 assert!(matches!( 431 452 error, 432 - JwtError::ExpiresBeforeIssued { 433 - exp: 1_040, 434 - iat: 1_050 435 - } 453 + JwtError::ExpiresBeforeIssued { exp, iat } 454 + if exp == UnixSeconds::new(1_040) && iat == UnixSeconds::new(1_050) 436 455 )); 437 456 } 438 457 439 458 #[test] 440 459 fn an_audience_in_a_different_case_still_matches() { 441 460 let (signing, public) = k256_key(1); 442 - let token = mint(&signing, &claims(ISS, "did:web:NEL.PET", 1_000, METHOD)); 461 + let token = mint( 462 + &signing, 463 + &claims(ISS, "did:web:NEL.PET", UnixSeconds::new(1_000), METHOD), 464 + ); 443 465 let parsed = parse(&token).unwrap(); 444 - check_claims(&parsed, &knot(), &method(), 900).unwrap(); 466 + check_claims(&parsed, &knot(), &method(), UnixSeconds::new(900)).unwrap(); 445 467 verify_signature(&parsed, &public).unwrap(); 446 468 assert_eq!(issuer(&parsed).unwrap(), AccountDid::new(ISS).unwrap()); 447 469 } ··· 458 480 &knot_issuer, 459 481 &audience, 460 482 &bound, 461 - "nonce-minted".to_string(), 462 - 900, 483 + JwtNonce::new("nonce-minted"), 484 + UnixSeconds::new(900), 463 485 ); 464 486 let parsed = parse(&token).unwrap(); 465 - check_claims(&parsed, &audience, &bound, 905).unwrap(); 466 - assert_eq!(nonce(&parsed).unwrap(), "nonce-minted"); 487 + check_claims(&parsed, &audience, &bound, UnixSeconds::new(905)).unwrap(); 488 + assert_eq!(nonce(&parsed).unwrap().as_str(), "nonce-minted"); 467 489 assert_eq!(issuer(&parsed).unwrap(), AccountDid::new(KNOT).unwrap()); 468 490 let public = CryptoKey { 469 491 codec: KeyCodec::Secp256k1, ··· 480 502 &KnotId::new(KNOT).unwrap(), 481 503 &ServiceDid::new("did:web:pds.oyster.cafe").unwrap(), 482 504 &Nsid::new_owned("com.atproto.repo.putRecord").unwrap(), 483 - "nonce-lifetime".to_string(), 484 - 1_000, 505 + JwtNonce::new("nonce-lifetime"), 506 + UnixSeconds::new(1_000), 485 507 ); 486 508 let parsed = parse(&token).unwrap(); 487 509 assert_eq!(parsed.claims().iat, 1_000); ··· 497 519 &KnotId::new(KNOT).unwrap(), 498 520 &ServiceDid::new("did:web:pds.oyster.cafe").unwrap(), 499 521 &Nsid::new_owned("com.atproto.repo.putRecord").unwrap(), 500 - "nonce-stranger".to_string(), 501 - 900, 522 + JwtNonce::new("nonce-stranger"), 523 + UnixSeconds::new(900), 502 524 ); 503 525 let parsed = parse(&token).unwrap(); 504 526 let error = verify_signature(&parsed, &stranger).unwrap_err(); ··· 506 528 } 507 529 508 530 #[test] 531 + fn a_jwt_nonce_preserves_its_string_and_compares_by_value() { 532 + let nonce = JwtNonce::new("nonce-value"); 533 + assert_eq!(nonce.as_str(), "nonce-value"); 534 + assert_eq!(nonce, JwtNonce::new("nonce-value".to_string())); 535 + assert_ne!(nonce, JwtNonce::new("other")); 536 + } 537 + 538 + #[test] 509 539 fn an_ed25519_issuer_key_is_unsupported() { 510 540 let (signing, _) = k256_key(1); 511 - let token = mint(&signing, &claims(ISS, KNOT, 1_000, METHOD)); 541 + let token = mint( 542 + &signing, 543 + &claims(ISS, KNOT, UnixSeconds::new(1_000), METHOD), 544 + ); 512 545 let parsed = parse(&token).unwrap(); 513 546 let ed = CryptoKey { 514 547 codec: KeyCodec::Ed25519,
+30 -26
crates/knot-atproto/src/lib.rs
··· 9 9 pub use identity::{ 10 10 IdentityError, MintNonce, PreparedRepoDid, knot_did_document, prepare_repo_did, 11 11 }; 12 - pub use jwt::JwtError; 12 + pub use jwt::{JwtError, JwtNonce}; 13 13 pub use pointer::PointerReceipt; 14 14 pub use pubkeys::{KeyParseError, parse_authorized_key}; 15 15 pub use resolve::{Identity, ResolveError}; ··· 38 38 39 39 use futures::stream::{self, TryStreamExt}; 40 40 use knot_runtime::{Clock, HttpRequest, HttpTransport, NetworkError, UnixMicros}; 41 - use knot_types::{AccountDid, Collection, KnotId, Nsid, OfferedKey, RepoDid, RepoRkey, Rkey}; 41 + use knot_types::{ 42 + AccountDid, Collection, HttpStatus, KnotId, Nsid, OfferedKey, RepoDid, RepoRkey, Rkey, 43 + UnixSeconds, 44 + }; 45 + use pubkeys::Cursor; 42 46 use serde::{Deserialize, Serialize}; 43 47 use url::Url; 44 48 ··· 68 72 #[error("PDS endpoint {pds:?} is not usable base URL")] 69 73 BadPdsEndpoint { pds: String }, 70 74 #[error("service token {jti:?} has already been presented")] 71 - Replay { jti: String }, 75 + Replay { jti: JwtNonce }, 72 76 #[error("replay-protection store is full and cannot accept another nonce")] 73 77 ReplayStoreSaturated, 74 78 #[error("issuer {issuer} holds too many live replay nonces")] ··· 124 128 knot_did: KnotId, 125 129 plc_directory: Url, 126 130 identities: moka::future::Cache<AccountDid, Cached, IdentityHasher>, 127 - seen_jti: scc::HashMap<(AccountDid, String), UnixMicros>, 131 + seen_jti: scc::HashMap<(AccountDid, JwtNonce), UnixMicros>, 128 132 issuer_jti_counts: scc::HashMap<AccountDid, usize>, 129 133 } 130 134 ··· 192 196 .map_err(ResolveError::from)?; 193 197 if !response.status.is_success() { 194 198 return Err(ResolveError::Status { 195 - status: response.status.as_u16(), 199 + status: HttpStatus::new(response.status.as_u16()), 196 200 }); 197 201 } 198 202 resolve::identity_from_document(did, &response.body) ··· 210 214 Page::First(budget) => (None, budget), 211 215 Page::Next(cursor, budget) => (Some(cursor), budget), 212 216 }; 213 - let url = list_records_url(pds, did, cursor.as_deref())?; 217 + let url = list_records_url(pds, did, cursor.as_ref())?; 214 218 resolve::guard_fetch_url(&url)?; 215 219 let response = http.execute(HttpRequest::get(url)).await?; 216 220 if !response.status.is_success() { ··· 263 267 let parsed = jwt::parse(token)?; 264 268 let issuer = jwt::issuer(&parsed)?; 265 269 let now_micros = self.clock.now_unix_micros(); 266 - let now = (now_micros.get() / 1_000_000) as i64; 270 + let now = UnixSeconds::new((now_micros.get() / 1_000_000) as i64); 267 271 268 272 jwt::check_claims(&parsed, &self.knot_did, method, now)?; 269 273 let jti = jwt::nonce(&parsed)?; 270 - let exp = parsed.claims().exp; 274 + let exp = UnixSeconds::new(parsed.claims().exp); 271 275 272 276 let identity = self.resolve_identity(&issuer).await?; 273 277 jwt::verify_signature(&parsed, &identity.signing_key)?; ··· 311 315 let audience = pointer::pds_service_did(&identity.pds)?; 312 316 let method = 313 317 Nsid::new_static(pointer::PUT_RECORD_METHOD).expect("literal method nsid parses"); 314 - let now = (self.clock.now_unix_micros().get() / 1_000_000) as i64; 318 + let now = UnixSeconds::new((self.clock.now_unix_micros().get() / 1_000_000) as i64); 315 319 let body = pointer::put_record_body(subject, rkey, record)?; 316 320 let mut request = json_post(url, bytes::Bytes::from(body)); 317 321 authorizer.authorize( ··· 347 351 .map_err(ResolveError::from)?; 348 352 if !response.status.is_success() { 349 353 return Err(ResolveError::Status { 350 - status: response.status.as_u16(), 354 + status: HttpStatus::new(response.status.as_u16()), 351 355 } 352 356 .into()); 353 357 } ··· 357 361 fn record_jti( 358 362 &self, 359 363 issuer: &AccountDid, 360 - jti: &str, 361 - exp: i64, 364 + jti: JwtNonce, 365 + exp: UnixSeconds, 362 366 now: UnixMicros, 363 367 ) -> Result<(), AtprotoError> { 364 - let horizon = exp.saturating_add(jwt::CLOCK_SKEW_SECS).max(0) as u64; 368 + let horizon = exp.saturating_add_secs(jwt::CLOCK_SKEW_SECS).get().max(0) as u64; 365 369 let expires_at = UnixMicros::new(horizon.saturating_mul(1_000_000)); 366 370 if self.seen_jti.len() >= MAX_SEEN_JTI { 367 371 self.prune(now); ··· 378 382 } 379 383 } 380 384 self.seen_jti 381 - .insert_sync((issuer.clone(), jti.to_string()), expires_at) 382 - .map_err(|_| AtprotoError::Replay { 383 - jti: jti.to_string(), 384 - })?; 385 + .insert_sync((issuer.clone(), jti.clone()), expires_at) 386 + .map_err(|_| AtprotoError::Replay { jti })?; 385 387 self.issuer_jti_counts 386 388 .entry_sync(issuer.clone()) 387 389 .and_modify(|count| *count += 1) ··· 416 418 417 419 fn warrants_negative_cache(error: &ResolveError) -> bool { 418 420 match error { 419 - ResolveError::Status { status } => (400..500).contains(status) && *status != 429, 421 + ResolveError::Status { status } => { 422 + (400..500).contains(&status.get()) && status.get() != 429 423 + } 420 424 ResolveError::Malformed(_) 421 425 | ResolveError::IdMismatch { .. } 422 426 | ResolveError::BadSigningKey(_) ··· 427 431 428 432 enum Page { 429 433 First(usize), 430 - Next(String, usize), 434 + Next(Cursor, usize), 431 435 Done, 432 436 } 433 437 ··· 454 458 fn list_records_url( 455 459 pds: &Url, 456 460 did: &AccountDid, 457 - cursor: Option<&str>, 461 + cursor: Option<&Cursor>, 458 462 ) -> Result<Url, AtprotoError> { 459 463 let mut url = xrpc_url(pds, "com.atproto.repo.listRecords")?; 460 464 url.query_pairs_mut() ··· 462 466 .append_pair("collection", "sh.tangled.publicKey") 463 467 .append_pair("limit", &PUBKEY_PAGE_LIMIT.to_string()); 464 468 if let Some(cursor) = cursor { 465 - url.query_pairs_mut().append_pair("cursor", cursor); 469 + url.query_pairs_mut().append_pair("cursor", cursor.as_str()); 466 470 } 467 471 Ok(url) 468 472 } ··· 654 658 let error = atproto.resolve_identity(&did(SQUID)).await.unwrap_err(); 655 659 assert!(matches!( 656 660 error, 657 - AtprotoError::Resolve(ResolveError::Status { status: 404 }) 661 + AtprotoError::Resolve(ResolveError::Status { status }) if status.get() == 404 658 662 )); 659 663 } 660 664 ··· 810 814 let url = list_records_url( 811 815 &Url::parse("https://shared.host/account-pds").unwrap(), 812 816 &did(SQUID), 813 - Some("page2"), 817 + Some(&Cursor::new("page2")), 814 818 ) 815 819 .unwrap(); 816 820 assert_eq!(url.path(), "/account-pds/xrpc/com.atproto.repo.listRecords"); ··· 1097 1101 assert!( 1098 1102 results.iter().all(|outcome| matches!( 1099 1103 outcome, 1100 - Err(AtprotoError::Resolve(ResolveError::Status { status: 429 })) 1104 + Err(AtprotoError::Resolve(ResolveError::Status { status })) if status.get() == 429 1101 1105 )), 1102 1106 "every caller in the wave receives real 429, never a poisoned RecentlyFailed" 1103 1107 ); ··· 1152 1156 .filter(|outcome| { 1153 1157 matches!( 1154 1158 outcome, 1155 - Err(AtprotoError::Resolve(ResolveError::Status { status: 404 })) 1159 + Err(AtprotoError::Resolve(ResolveError::Status { status })) if status.get() == 404 1156 1160 ) 1157 1161 }) 1158 1162 .count(); ··· 1337 1341 .unwrap_err(); 1338 1342 assert!(matches!( 1339 1343 error, 1340 - AtprotoError::Resolve(ResolveError::Status { status: 404 }) 1344 + AtprotoError::Resolve(ResolveError::Status { status }) if status.get() == 404 1341 1345 )); 1342 1346 } 1343 1347
+29 -5
crates/knot-atproto/src/pubkeys.rs
··· 1 1 use base64::Engine; 2 2 use base64::engine::general_purpose::STANDARD; 3 3 use knot_types::OfferedKey; 4 - use serde::Deserialize; 4 + use serde::{Deserialize, Serialize}; 5 + 6 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 7 + #[serde(transparent)] 8 + pub(crate) struct Cursor(String); 9 + 10 + impl Cursor { 11 + #[cfg(test)] 12 + pub(crate) fn new(value: impl Into<String>) -> Self { 13 + Self(value.into()) 14 + } 15 + 16 + pub(crate) fn as_str(&self) -> &str { 17 + &self.0 18 + } 19 + } 5 20 6 21 #[derive(Debug, thiserror::Error)] 7 22 pub enum KeyParseError { ··· 46 61 struct ListRecords { 47 62 records: Vec<Envelope>, 48 63 #[serde(default)] 49 - cursor: Option<String>, 64 + cursor: Option<Cursor>, 50 65 } 51 66 52 67 #[derive(Deserialize)] ··· 61 76 62 77 pub(crate) struct PubkeyPage { 63 78 pub keys: Vec<OfferedKey>, 64 - pub cursor: Option<String>, 79 + pub cursor: Option<Cursor>, 65 80 } 66 81 67 82 pub(crate) fn offered_page(body: &[u8], max_keys: usize) -> Result<PubkeyPage, serde_json::Error> { ··· 74 89 .collect(); 75 90 Ok(PubkeyPage { 76 91 keys, 77 - cursor: listing.cursor.filter(|cursor| !cursor.is_empty()), 92 + cursor: listing.cursor.filter(|cursor| !cursor.as_str().is_empty()), 78 93 }) 79 94 } 80 95 ··· 177 192 assert_eq!(page.keys.len(), 2); 178 193 assert_eq!(page.keys[0], parse_authorized_key(&good_one).unwrap()); 179 194 assert_eq!(page.keys[1], parse_authorized_key(&good_two).unwrap()); 180 - assert_eq!(page.cursor.as_deref(), Some("c")); 195 + assert_eq!(page.cursor.as_ref().map(Cursor::as_str), Some("c")); 196 + } 197 + 198 + #[test] 199 + fn a_cursor_serializes_as_a_plain_json_string() { 200 + let cursor = Cursor::new("page-token"); 201 + assert_eq!(cursor.as_str(), "page-token"); 202 + assert_eq!(serde_json::to_string(&cursor).unwrap(), "\"page-token\""); 203 + let parsed: Cursor = serde_json::from_str("\"page-token\"").unwrap(); 204 + assert_eq!(parsed, cursor); 181 205 } 182 206 183 207 #[test]
+3 -3
crates/knot-atproto/src/resolve.rs
··· 2 2 3 3 use knot_types::crypto::{KeyCodec, PublicKey as CryptoKey}; 4 4 use knot_types::did_doc::DidDocument; 5 - use knot_types::{AccountDid, Handle}; 5 + use knot_types::{AccountDid, Handle, HttpStatus}; 6 6 use url::{Host, Url}; 7 7 8 8 const LEGACY_K256_KIND: &str = "EcdsaSecp256k1VerificationKey2019"; ··· 15 15 #[error("DID {value:?} does not form a resolvable document location")] 16 16 Unresolvable { value: String }, 17 17 #[error("DID document fetch returned HTTP {status}")] 18 - Status { status: u16 }, 18 + Status { status: HttpStatus }, 19 19 #[error("network failure resolving DID document: {0}")] 20 20 Network(#[from] knot_runtime::NetworkError), 21 21 #[error("DID document is not valid JSON: {0}")] ··· 44 44 pub fn is_transient(&self) -> bool { 45 45 match self { 46 46 ResolveError::Network(_) => true, 47 - ResolveError::Status { status } => *status == 429 || (500..600).contains(status), 47 + ResolveError::Status { status } => status.is_transient(), 48 48 _ => false, 49 49 } 50 50 }
+1 -1
crates/knot-atproto/tests/security.rs
··· 552 552 assert!( 553 553 matches!( 554 554 first, 555 - AtprotoError::Resolve(ResolveError::Status { status: 404 }) 555 + AtprotoError::Resolve(ResolveError::Status { status }) if status.get() == 404 556 556 ), 557 557 "got {first:?}" 558 558 );
+19 -9
crates/knot-bench/benches/pack.rs
··· 5 5 use knot_bench::{ChurnCount, CommitCount, HistorySpec, PathCount, build_history}; 6 6 use knot_git::{Filter, PackBudget, Repo}; 7 7 use knot_pack::{ 8 - PackLimits, ReceiveCommand, ReceiveGuard, RefDecision, count_expanded, local_pack, 9 - receive_pack_guarded, upload_archive, write_expanded, write_pack, 8 + HaveOids, PackLimits, ReceiveCommand, ReceiveGuard, RefDecision, WantOids, count_expanded, 9 + local_pack, receive_pack_guarded, upload_archive, write_expanded, write_pack, 10 10 }; 11 11 use knot_types::Oid; 12 12 ··· 63 63 #[divan::bench(args = GRADES)] 64 64 fn clone(bencher: Bencher, commits: u32) { 65 65 let history = build_history(spec_for(commits)); 66 - let tips = history.tips(); 67 - let bytes = local_pack(history.repo(), &tips, &[], CAP).unwrap().len(); 66 + let wants = WantOids::new(history.tips()); 67 + let no_haves = HaveOids::default(); 68 + let bytes = local_pack(history.repo(), &wants, &no_haves, CAP) 69 + .unwrap() 70 + .len(); 68 71 bencher 69 72 .counter(BytesCount::new(bytes)) 70 - .bench_local(|| local_pack(history.repo(), &tips, &[], CAP).unwrap()); 73 + .bench_local(|| local_pack(history.repo(), &wants, &no_haves, CAP).unwrap()); 71 74 } 72 75 73 76 #[divan::bench(args = GRADES)] ··· 127 130 let history = build_history(spec_for(commits)); 128 131 let tips = history.tips(); 129 132 let walk = history.repo().rev_walk(&tips, &[]).unwrap(); 130 - let haves = vec![walk[walk.len() / 2]]; 131 - let bytes = local_pack(history.repo(), &tips, &haves, CAP) 133 + let haves = HaveOids::new(vec![walk[walk.len() / 2]]); 134 + let wants = WantOids::new(tips); 135 + let bytes = local_pack(history.repo(), &wants, &haves, CAP) 132 136 .unwrap() 133 137 .len(); 134 138 bencher 135 139 .counter(BytesCount::new(bytes)) 136 - .bench_local(|| local_pack(history.repo(), &tips, &haves, CAP).unwrap()); 140 + .bench_local(|| local_pack(history.repo(), &wants, &haves, CAP).unwrap()); 137 141 } 138 142 139 143 #[divan::bench(args = GRADES)] 140 144 fn push(bencher: Bencher, commits: u32) { 141 145 let history = build_history(spec_for(commits)); 142 146 let tips = history.tips(); 143 - let pack = local_pack(history.repo(), &tips, &[], CAP).unwrap(); 147 + let pack = local_pack( 148 + history.repo(), 149 + &WantOids::new(tips.clone()), 150 + &HaveOids::default(), 151 + CAP, 152 + ) 153 + .unwrap(); 144 154 let walk = history.repo().rev_walk(&tips, &[]).unwrap(); 145 155 let stride = walk.len().max(1) / 8 + 1; 146 156 let branch_tips: Vec<Oid> = walk.iter().step_by(stride).copied().collect();
+9 -8
crates/knot-pack/src/fetch.rs
··· 8 8 9 9 use crate::error::PackError; 10 10 use crate::pkt::{self, Frame}; 11 + use crate::{HaveOids, WantOids}; 11 12 12 13 #[derive(Debug, thiserror::Error)] 13 14 pub enum FetchError { ··· 149 150 ) 150 151 } 151 152 152 - pub fn fetch_request(wants: &[Oid], haves: &[Oid]) -> Result<Vec<u8>, PackError> { 153 + pub fn fetch_request(wants: &WantOids, haves: &HaveOids) -> Result<Vec<u8>, PackError> { 153 154 let mut buf = Vec::new(); 154 155 pkt::write_data(&mut buf, b"command=fetch\n")?; 155 156 pkt::write_data(&mut buf, b"agent=knot2/0\n")?; ··· 244 245 pub async fn remote_pack( 245 246 http: &dyn HttpTransport, 246 247 base: &Url, 247 - wants: &[Oid], 248 - haves: &[Oid], 248 + wants: &WantOids, 249 + haves: &HaveOids, 249 250 max_pack_bytes: u64, 250 251 ) -> Result<Vec<u8>, FetchError> { 251 252 if wants.is_empty() { ··· 302 303 303 304 pub fn local_pack( 304 305 source: &Repo, 305 - wants: &[Oid], 306 - haves: &[Oid], 306 + wants: &WantOids, 307 + haves: &HaveOids, 307 308 max_pack_bytes: u64, 308 309 ) -> Result<Vec<u8>, FetchError> { 309 310 if wants.is_empty() { ··· 311 312 } 312 313 let oids = source 313 314 .select_pack_objects_filtered( 314 - wants, 315 - haves, 315 + wants.as_slice(), 316 + haves.as_slice(), 316 317 Filter::None, 317 318 crate::upload::selection_budget(), 318 319 ) ··· 459 460 fn the_fetch_request_carries_wants_haves_and_done() { 460 461 let want = Oid::from_hex("95d09f2b10159347eece71399a7e2e907ea3df4f").unwrap(); 461 462 let have = Oid::from_hex("2222222222222222222222222222222222222222").unwrap(); 462 - let body = fetch_request(&[want], &[have]).unwrap(); 463 + let body = fetch_request(&WantOids::new(vec![want]), &HaveOids::new(vec![have])).unwrap(); 463 464 let lines = pkt::data_payloads_all(&body).unwrap(); 464 465 let text: Vec<&str> = lines 465 466 .iter()
+5 -3
crates/knot-pack/src/lib.rs
··· 4 4 mod frame; 5 5 mod meter; 6 6 mod objects; 7 + mod oids; 7 8 mod pkt; 8 9 mod quarantine; 9 10 mod receive; ··· 20 21 use axum::response::Response; 21 22 use axum::routing::post; 22 23 use knot_git::{Layout, Repo}; 23 - use knot_types::{Oid, OwnerDid, RepoDid, RepoRkey}; 24 + use knot_types::{OwnerDid, RepoDid, RepoRkey}; 24 25 use std::sync::Arc; 25 26 use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc}; 26 27 use tokio_stream::wrappers::ReceiverStream; ··· 32 33 }; 33 34 pub use meter::PackLimits; 34 35 pub use objects::{ExpandedPack, count_expanded, write_expanded, write_pack}; 36 + pub use oids::{HaveOids, WantOids}; 35 37 pub use pkt::frame_report; 36 38 pub use quarantine::sweep_incoming; 37 39 pub use receive::{Preflight, ReceiveCommand, ReceiveGuard, ReceiveOutcome, RefDecision}; ··· 411 413 fn stream_response( 412 414 repo: Repo, 413 415 preamble: Vec<u8>, 414 - wants: Vec<Oid>, 415 - haves: Vec<Oid>, 416 + wants: WantOids, 417 + haves: HaveOids, 416 418 opts: upload::StreamOpts, 417 419 permit: OwnedSemaphorePermit, 418 420 ) -> Response {
+63
crates/knot-pack/src/oids.rs
··· 1 + use knot_types::Oid; 2 + 3 + #[derive(Debug, Clone, Default, PartialEq, Eq)] 4 + pub struct WantOids(Vec<Oid>); 5 + 6 + #[derive(Debug, Clone, Default, PartialEq, Eq)] 7 + pub struct HaveOids(Vec<Oid>); 8 + 9 + impl WantOids { 10 + pub fn new(oids: Vec<Oid>) -> Self { 11 + Self(oids) 12 + } 13 + 14 + pub fn as_slice(&self) -> &[Oid] { 15 + &self.0 16 + } 17 + 18 + pub fn iter(&self) -> std::slice::Iter<'_, Oid> { 19 + self.0.iter() 20 + } 21 + 22 + pub fn len(&self) -> usize { 23 + self.0.len() 24 + } 25 + 26 + pub fn is_empty(&self) -> bool { 27 + self.0.is_empty() 28 + } 29 + } 30 + 31 + impl FromIterator<Oid> for WantOids { 32 + fn from_iter<I: IntoIterator<Item = Oid>>(iter: I) -> Self { 33 + Self(iter.into_iter().collect()) 34 + } 35 + } 36 + 37 + impl HaveOids { 38 + pub fn new(oids: Vec<Oid>) -> Self { 39 + Self(oids) 40 + } 41 + 42 + pub fn as_slice(&self) -> &[Oid] { 43 + &self.0 44 + } 45 + 46 + pub fn iter(&self) -> std::slice::Iter<'_, Oid> { 47 + self.0.iter() 48 + } 49 + 50 + pub fn len(&self) -> usize { 51 + self.0.len() 52 + } 53 + 54 + pub fn is_empty(&self) -> bool { 55 + self.0.is_empty() 56 + } 57 + } 58 + 59 + impl FromIterator<Oid> for HaveOids { 60 + fn from_iter<I: IntoIterator<Item = Oid>>(iter: I) -> Self { 61 + Self(iter.into_iter().collect()) 62 + } 63 + }
+12 -8
crates/knot-pack/src/receive.rs
··· 8 8 use crate::objects; 9 9 use crate::pkt; 10 10 use crate::quarantine::Quarantine; 11 + use crate::{HaveOids, WantOids}; 11 12 12 13 #[derive(Debug, Clone, PartialEq, Eq)] 13 14 pub enum RefDecision { ··· 157 158 Ok(RefSnapshot { by_name }) 158 159 } 159 160 160 - fn tips(&self) -> Vec<Oid> { 161 + fn tips(&self) -> HaveOids { 161 162 self.by_name.values().copied().collect() 162 163 } 163 164 ··· 183 184 }) 184 185 } 185 186 186 - fn objects_present(repo: &Repo, wants: &[Oid], haves: &[Oid]) -> bool { 187 + fn objects_present(repo: &Repo, wants: &WantOids, haves: &HaveOids) -> bool { 187 188 matches!( 188 189 repo.select_pack_objects_filtered( 189 - wants, 190 - haves, 190 + wants.as_slice(), 191 + haves.as_slice(), 191 192 Filter::None, 192 193 crate::upload::selection_budget(), 193 194 ), ··· 198 199 fn connectivity_reasons( 199 200 repo: &Repo, 200 201 commands: &[ReceiveCommand], 201 - haves: &[Oid], 202 + haves: &HaveOids, 202 203 ) -> Vec<Option<String>> { 203 - let news: Vec<Oid> = commands 204 + let news: WantOids = commands 204 205 .iter() 205 206 .map(|command| command.new) 206 207 .filter(|new| !new.is_null()) ··· 209 210 commands 210 211 .iter() 211 212 .map(|command| { 212 - if command.new.is_null() || batched_ok || objects_present(repo, &[command.new], haves) { 213 + if command.new.is_null() 214 + || batched_ok 215 + || objects_present(repo, &WantOids::new(vec![command.new]), haves) 216 + { 213 217 None 214 218 } else { 215 219 Some("missing necessary objects".to_string()) ··· 311 315 repo: &Repo, 312 316 snapshot: &RefSnapshot, 313 317 commands: &[ReceiveCommand], 314 - haves: &[Oid], 318 + haves: &HaveOids, 315 319 ) -> Vec<(String, Option<String>)> { 316 320 let fail = |reason: String| -> Vec<(String, Option<String>)> { 317 321 commands
+36 -22
crates/knot-pack/src/upload.rs
··· 3 3 use std::time::{Duration, Instant}; 4 4 5 5 use knot_git::{Deepen, Filter, PackBudget, Repo}; 6 - use knot_types::{ObjectFormat, Oid}; 6 + use knot_types::{ObjectFormat, Oid, UnixSeconds}; 7 7 8 8 use crate::error::PackError; 9 9 use crate::objects; 10 10 use crate::pkt; 11 + use crate::{HaveOids, WantOids}; 11 12 12 13 const AGENT: &[u8] = b"agent=knot2/0\n"; 13 14 const SELECTION_MAX_OBJECTS: usize = 5_000_000; ··· 33 34 Buffered(Vec<u8>), 34 35 Streaming { 35 36 preamble: Vec<u8>, 36 - wants: Vec<Oid>, 37 - haves: Vec<Oid>, 37 + wants: WantOids, 38 + haves: HaveOids, 38 39 opts: StreamOpts, 39 40 }, 40 41 } ··· 250 251 } 251 252 252 253 fn plan_v2_fetch(repo: &Repo, lines: &[&[u8]]) -> Result<UploadOutcome, PackError> { 253 - let wants = parse_oids(lines, b"want "); 254 + let wants = WantOids::new(parse_oids(lines, b"want ")); 254 255 ensure_wanted(repo, &wants)?; 255 - let haves = parse_oids(lines, b"have "); 256 + let haves = HaveOids::new(parse_oids(lines, b"have ")); 256 257 let done = lines.iter().any(|line| line.starts_with(b"done")); 257 258 let wait_for_done = lines.iter().any(|line| line.starts_with(b"wait-for-done")); 258 259 let sideband_all = lines.iter().any(|line| line.starts_with(b"sideband-all")); ··· 261 262 let filter = parse_filter(lines)?; 262 263 let deepen = parse_deepen(repo, lines)?; 263 264 let client_shallow = parse_oids(lines, b"shallow "); 264 - let common: Vec<Oid> = haves 265 + let common: HaveOids = haves 265 266 .iter() 266 267 .copied() 267 268 .filter(|oid| repo.contains(*oid)) ··· 291 292 } 292 293 293 294 let shallow_commits = if deepen.is_shallow_request() || repo.is_shallow() { 294 - let plan = repo.shallow_walk(&wants, &deepen, &client_shallow)?; 295 + let plan = repo.shallow_walk(wants.as_slice(), &deepen, &client_shallow)?; 295 296 seg(&mut preamble, sideband_all, b"shallow-info\n")?; 296 297 plan.shallow.iter().try_for_each(|oid| { 297 298 seg( ··· 399 400 }) 400 401 }; 401 402 let depth = value("deepen ").and_then(|text| text.parse::<u32>().ok()); 402 - let since = value("deepen-since ").and_then(|text| text.parse::<i64>().ok()); 403 + let since = value("deepen-since ") 404 + .and_then(|text| text.parse::<i64>().ok()) 405 + .map(UnixSeconds::new); 403 406 let relative = lines 404 407 .iter() 405 408 .any(|line| line.starts_with(b"deepen-relative")); ··· 435 438 436 439 fn plan_v0(repo: &Repo, body: &[u8]) -> Result<UploadOutcome, PackError> { 437 440 let lines = pkt::data_payloads_all(body)?; 438 - let wants = parse_oids(&lines, b"want "); 441 + let wants = WantOids::new(parse_oids(&lines, b"want ")); 439 442 ensure_wanted(repo, &wants)?; 440 - let haves = parse_oids(&lines, b"have "); 443 + let haves = HaveOids::new(parse_oids(&lines, b"have ")); 441 444 let done = lines.iter().any(|line| line.starts_with(b"done")); 442 445 let caps = first_caps(&lines); 443 446 let side_band = caps ··· 461 464 let filter = parse_filter(&lines)?; 462 465 let deepen = parse_deepen(repo, &lines)?; 463 466 let client_shallow = parse_oids(&lines, b"shallow "); 464 - let common: Vec<Oid> = haves 467 + let common: HaveOids = haves 465 468 .iter() 466 469 .copied() 467 470 .filter(|oid| repo.contains(*oid)) ··· 469 472 470 473 let mut preamble = Vec::new(); 471 474 let shallow_commits = if deepen.is_shallow_request() || repo.is_shallow() { 472 - let plan = repo.shallow_walk(&wants, &deepen, &client_shallow)?; 475 + let plan = repo.shallow_walk(wants.as_slice(), &deepen, &client_shallow)?; 473 476 plan.shallow.iter().try_for_each(|oid| { 474 477 pkt::write_data(&mut preamble, format!("shallow {oid}\n").as_bytes()) 475 478 })?; ··· 493 496 && !done 494 497 && !common.is_empty() 495 498 && common.len() == haves.len() 496 - && repo.wants_satisfied_by(&wants, &common)?; 499 + && repo.wants_satisfied_by(wants.as_slice(), common.as_slice())?; 497 500 498 - let stream = move |preamble: Vec<u8>, common: Vec<Oid>| UploadOutcome::Streaming { 501 + let stream = move |preamble: Vec<u8>, common: HaveOids| UploadOutcome::Streaming { 499 502 preamble, 500 503 wants, 501 504 haves: common, ··· 515 518 common.iter().try_for_each(|oid| { 516 519 pkt::write_data(&mut preamble, format!("ACK {oid} common\n").as_bytes()) 517 520 })?; 518 - let last = common.last().copied(); 521 + let last = common.as_slice().last().copied(); 519 522 if done { 520 523 match last { 521 524 Some(oid) => { ··· 539 542 return Ok(UploadOutcome::Buffered(preamble)); 540 543 } 541 544 542 - match common.first() { 545 + match common.as_slice().first() { 543 546 Some(oid) => pkt::write_data(&mut preamble, format!("ACK {oid}\n").as_bytes())?, 544 547 None => pkt::write_data(&mut preamble, b"NAK\n")?, 545 548 } ··· 551 554 552 555 pub fn stream_pack( 553 556 repo: &Repo, 554 - wants: &[Oid], 555 - haves: &[Oid], 557 + wants: &WantOids, 558 + haves: &HaveOids, 556 559 opts: &StreamOpts, 557 560 sink: &mut dyn FnMut(&[u8]) -> io::Result<()>, 558 561 ) -> Result<(), PackError> { ··· 563 566 && opts.packfile_uris.is_empty() 564 567 { 565 568 write_packfile_header(opts, sink)?; 566 - return stream_full_clone(repo, wants, opts, progress, sink); 569 + return stream_full_clone(repo, wants.as_slice(), opts, progress, sink); 567 570 } 568 571 let budget = selection_budget(); 569 572 let mut selection = match &opts.shallow_commits { 570 - Some(commits) => repo.select_shallow_objects(wants, commits, haves, opts.filter, budget)?, 571 - None => repo.select_pack_objects_filtered(wants, haves, opts.filter, budget)?, 573 + Some(commits) => repo.select_shallow_objects( 574 + wants.as_slice(), 575 + commits, 576 + haves.as_slice(), 577 + opts.filter, 578 + budget, 579 + )?, 580 + None => repo.select_pack_objects_filtered( 581 + wants.as_slice(), 582 + haves.as_slice(), 583 + opts.filter, 584 + budget, 585 + )?, 572 586 }; 573 587 if !opts.packfile_uris.is_empty() { 574 588 offload_packfile_uris( ··· 724 738 ) 725 739 } 726 740 727 - fn ensure_wanted(repo: &Repo, wants: &[Oid]) -> Result<(), PackError> { 741 + fn ensure_wanted(repo: &Repo, wants: &WantOids) -> Result<(), PackError> { 728 742 let tips: Vec<Oid> = repo 729 743 .advertised_refs()? 730 744 .iter()
+40 -10
crates/knot-pack/tests/fetch.rs
··· 5 5 6 6 use axum::http; 7 7 use knot_git::{Layout, Repo, Staging}; 8 - use knot_pack::{FetchError, PackLimits, ingest_pack, local_pack, local_refs}; 8 + use knot_pack::{FetchError, HaveOids, PackLimits, WantOids, ingest_pack, local_pack, local_refs}; 9 9 use knot_runtime::{FakeHttp, HttpRequest, HttpResponse, HttpTransport}; 10 10 use knot_types::{Oid, RefName, RepoDid}; 11 11 use url::Url; ··· 127 127 assert_eq!(refnames(&got), refnames(&expected)); 128 128 assert_eq!(got, expected); 129 129 130 - let pack = knot_pack::remote_pack(http, &base_url(), &refs.tips(), &[], CAP) 131 - .await 132 - .unwrap(); 130 + let pack = knot_pack::remote_pack( 131 + http, 132 + &base_url(), 133 + &WantOids::new(refs.tips()), 134 + &HaveOids::default(), 135 + CAP, 136 + ) 137 + .await 138 + .unwrap(); 133 139 ingest_pack( 134 140 &target.objects_dir(), 135 141 &pack, ··· 213 219 .unwrap(); 214 220 assert_ne!(old_tip, new_tip); 215 221 216 - let pack = knot_pack::remote_pack(http.as_ref(), &base_url(), &[new_tip], &[old_tip], CAP) 217 - .await 218 - .unwrap(); 222 + let pack = knot_pack::remote_pack( 223 + http.as_ref(), 224 + &base_url(), 225 + &WantOids::new(vec![new_tip]), 226 + &HaveOids::new(vec![old_tip]), 227 + CAP, 228 + ) 229 + .await 230 + .unwrap(); 219 231 let staging = Staging::new(&clone).unwrap(); 220 232 ingest_pack( 221 233 &staging.repo().objects_dir(), ··· 245 257 .map(|record| record.target) 246 258 .collect(); 247 259 let http = knot2_server(source_path); 248 - let result = knot_pack::remote_pack(http.as_ref(), &base_url(), &tips, &[], 16).await; 260 + let result = knot_pack::remote_pack( 261 + http.as_ref(), 262 + &base_url(), 263 + &WantOids::new(tips), 264 + &HaveOids::default(), 265 + 16, 266 + ) 267 + .await; 249 268 assert!(matches!( 250 269 result, 251 270 Err(FetchError::PackTooLarge { limit: 16 }) ··· 283 302 let fork = layout 284 303 .create(&RepoDid::new("did:plc:limpet").unwrap()) 285 304 .unwrap(); 286 - let pack = local_pack(&source, &refs.tips(), &[], CAP).unwrap(); 305 + let pack = local_pack( 306 + &source, 307 + &WantOids::new(refs.tips()), 308 + &HaveOids::default(), 309 + CAP, 310 + ) 311 + .unwrap(); 287 312 ingest_pack( 288 313 &fork.objects_dir(), 289 314 &pack, ··· 301 326 let source_path = seed_source(dir.path()); 302 327 let source = Repo::open(&source_path).unwrap(); 303 328 let refs = local_refs(&source, &[]).unwrap(); 304 - let result = local_pack(&source, &refs.tips(), &[], 16); 329 + let result = local_pack( 330 + &source, 331 + &WantOids::new(refs.tips()), 332 + &HaveOids::default(), 333 + 16, 334 + ); 305 335 assert!(matches!( 306 336 result, 307 337 Err(FetchError::PackTooLarge { limit: 16 })
+83 -34
crates/knot-secrets/src/lib.rs
··· 1 1 use std::collections::BTreeMap; 2 + use std::fmt; 2 3 use std::path::{Path, PathBuf}; 3 4 use std::sync::atomic::{AtomicU64, Ordering}; 4 5 use std::sync::{Mutex, MutexGuard, RwLock}; ··· 42 43 WeakMasterKey { len: usize }, 43 44 } 44 45 45 - #[derive(Debug, Clone, PartialEq, Eq)] 46 + #[derive(Clone, Zeroize, ZeroizeOnDrop)] 47 + pub struct MasterKey(Vec<u8>); 48 + 49 + impl MasterKey { 50 + pub fn new(bytes: impl Into<Vec<u8>>) -> Result<Self, SecretsError> { 51 + let bytes = bytes.into(); 52 + if bytes.len() < MIN_MASTER_KEY_LEN { 53 + return Err(SecretsError::WeakMasterKey { len: bytes.len() }); 54 + } 55 + Ok(Self(bytes)) 56 + } 57 + 58 + pub fn as_bytes(&self) -> &[u8] { 59 + &self.0 60 + } 61 + } 62 + 63 + impl fmt::Debug for MasterKey { 64 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 65 + f.write_str("MasterKey(<redacted>)") 66 + } 67 + } 68 + 69 + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] 70 + #[serde(transparent)] 46 71 pub struct SealedKeyId(String); 47 72 48 73 impl SealedKeyId { ··· 106 131 } 107 132 108 133 #[derive(Serialize, Deserialize)] 134 + #[serde(transparent)] 135 + struct EncodedSecret(String); 136 + 137 + impl EncodedSecret { 138 + fn new(value: String) -> Self { 139 + Self(value) 140 + } 141 + } 142 + 143 + #[derive(Serialize, Deserialize)] 109 144 struct VaultFile { 110 145 version: u32, 111 - entries: BTreeMap<String, String>, 146 + entries: BTreeMap<SealedKeyId, EncodedSecret>, 112 147 } 113 148 114 149 pub struct SealedStore { 115 150 path: PathBuf, 116 151 enc_key: Zeroizing<[u8; 32]>, 117 152 entropy: Box<dyn Entropy>, 118 - entries: RwLock<BTreeMap<String, SecretScalar>>, 153 + entries: RwLock<BTreeMap<SealedKeyId, SecretScalar>>, 119 154 persist_lock: Mutex<()>, 120 155 } 121 156 122 157 impl SealedStore { 123 158 pub fn open( 124 159 path: impl Into<PathBuf>, 125 - master_key: &[u8], 160 + master_key: &MasterKey, 126 161 entropy: Box<dyn Entropy>, 127 162 ) -> Result<Self, SecretsError> { 128 - if master_key.len() < MIN_MASTER_KEY_LEN { 129 - return Err(SecretsError::WeakMasterKey { 130 - len: master_key.len(), 131 - }); 132 - } 133 163 let path = path.into(); 134 164 sweep_stale_temps(&path); 135 - let enc_key = derive_enc_key(master_key); 165 + let enc_key = derive_enc_key(master_key.as_bytes()); 136 166 let entries = match std::fs::read(&path) { 137 167 Ok(sealed) => decode_vault(&unseal(&enc_key, &sealed)?)?, 138 168 Err(error) if error.kind() == std::io::ErrorKind::NotFound => BTreeMap::new(), ··· 157 187 self.entries 158 188 .read() 159 189 .expect("sealed store lock") 160 - .get(id.as_str()) 190 + .get(&id) 161 191 .map(SecretScalar::signer) 162 192 .ok_or(SecretsError::Missing(id.0)) 163 193 } ··· 167 197 self.entries 168 198 .read() 169 199 .expect("sealed store lock") 170 - .get(id.as_str()) 200 + .get(&id) 171 201 .map(|scalar| scalar.signer().public_key()) 172 202 .ok_or(SecretsError::Missing(id.0)) 173 203 } ··· 186 216 let id = id.into(); 187 217 let guard = self.persist_guard(); 188 218 let mut staged = self.staged(); 189 - if staged.contains_key(id.as_str()) { 219 + if staged.contains_key(&id) { 190 220 return Err(SecretsError::Occupied(id.0)); 191 221 } 192 - staged.insert(id.0, material.scalar.clone()); 222 + staged.insert(id, material.scalar.clone()); 193 223 self.commit_locked(&guard, staged) 194 224 } 195 225 ··· 200 230 .entries 201 231 .read() 202 232 .expect("sealed store lock") 203 - .get(id.as_str()) 233 + .get(&id) 204 234 .map(|scalar| scalar.signer().public_key()) 205 235 { 206 236 return Ok(public); ··· 208 238 let material = self.generate(); 209 239 let public = material.public_key(); 210 240 let mut staged = self.staged(); 211 - staged.insert(id.0, material.scalar.clone()); 241 + staged.insert(id, material.scalar.clone()); 212 242 self.commit_locked(&guard, staged)?; 213 243 Ok(public) 214 244 } ··· 225 255 let id = id.into(); 226 256 let guard = self.persist_guard(); 227 257 let mut staged = self.staged(); 228 - if staged.remove(id.as_str()).is_none() { 258 + if staged.remove(&id).is_none() { 229 259 return Ok(false); 230 260 } 231 261 self.commit_locked(&guard, staged)?; ··· 238 268 .unwrap_or_else(|poisoned| poisoned.into_inner()) 239 269 } 240 270 241 - fn staged(&self) -> BTreeMap<String, SecretScalar> { 271 + fn staged(&self) -> BTreeMap<SealedKeyId, SecretScalar> { 242 272 self.entries.read().expect("sealed store lock").clone() 243 273 } 244 274 245 275 fn commit_locked( 246 276 &self, 247 277 _guard: &MutexGuard<'_, ()>, 248 - staged: BTreeMap<String, SecretScalar>, 278 + staged: BTreeMap<SealedKeyId, SecretScalar>, 249 279 ) -> Result<(), SecretsError> { 250 280 let plaintext = encode_vault(&staged); 251 281 let sealed = seal(&self.enc_key, &plaintext, &*self.entropy); ··· 288 318 .map_err(|_| SecretsError::Decrypt) 289 319 } 290 320 291 - fn encode_vault(entries: &BTreeMap<String, SecretScalar>) -> Zeroizing<Vec<u8>> { 321 + fn encode_vault(entries: &BTreeMap<SealedKeyId, SecretScalar>) -> Zeroizing<Vec<u8>> { 292 322 let mut file = VaultFile { 293 323 version: VAULT_VERSION, 294 324 entries: entries 295 325 .iter() 296 - .map(|(did, scalar)| (did.clone(), STANDARD.encode(scalar.0.as_slice()))) 326 + .map(|(id, scalar)| { 327 + ( 328 + id.clone(), 329 + EncodedSecret::new(STANDARD.encode(scalar.0.as_slice())), 330 + ) 331 + }) 297 332 .collect(), 298 333 }; 299 334 let plaintext = Zeroizing::new(serde_json::to_vec(&file).expect("vault always serializes")); 300 - file.entries.values_mut().for_each(String::zeroize); 335 + file.entries 336 + .values_mut() 337 + .for_each(|encoded| encoded.0.zeroize()); 301 338 plaintext 302 339 } 303 340 304 - fn decode_vault(plaintext: &[u8]) -> Result<BTreeMap<String, SecretScalar>, SecretsError> { 341 + fn decode_vault(plaintext: &[u8]) -> Result<BTreeMap<SealedKeyId, SecretScalar>, SecretsError> { 305 342 let VaultFile { 306 343 version, 307 344 mut entries, 308 345 } = serde_json::from_slice(plaintext) 309 346 .map_err(|error| SecretsError::Malformed(error.to_string()))?; 310 347 if version != VAULT_VERSION { 311 - entries.values_mut().for_each(String::zeroize); 348 + entries.values_mut().for_each(|encoded| encoded.0.zeroize()); 312 349 return Err(SecretsError::Malformed(format!( 313 350 "unsupported vault version {version}" 314 351 ))); 315 352 } 316 353 entries 317 354 .into_iter() 318 - .map(|(did, encoded)| { 355 + .map(|(id, encoded)| { 356 + let EncodedSecret(encoded) = encoded; 319 357 let encoded = Zeroizing::new(encoded); 320 358 let bytes = Zeroizing::new( 321 359 STANDARD ··· 323 361 .map_err(|error| SecretsError::Malformed(error.to_string()))?, 324 362 ); 325 363 let scalar: [u8; SCALAR_LEN] = bytes.as_slice().try_into().map_err(|_| { 326 - SecretsError::Malformed(format!("scalar for {did} is not 32 bytes")) 364 + SecretsError::Malformed(format!("scalar for {} is not 32 bytes", id.as_str())) 327 365 })?; 328 366 k256::ecdsa::SigningKey::from_slice(&scalar).map_err(|_| { 329 - SecretsError::Malformed(format!("scalar for {did} is not a valid key")) 367 + SecretsError::Malformed(format!("scalar for {} is not a valid key", id.as_str())) 330 368 })?; 331 - Ok((did, SecretScalar(scalar))) 369 + Ok((id, SecretScalar(scalar))) 332 370 }) 333 371 .collect() 334 372 } ··· 411 449 Box::new(SeededEntropy::new(seed)) 412 450 } 413 451 414 - fn master() -> [u8; 32] { 415 - [7u8; 32] 452 + fn master() -> MasterKey { 453 + MasterKey::new([7u8; 32]).unwrap() 416 454 } 417 455 418 456 fn rid(value: &str) -> RepoDid { ··· 486 524 } 487 525 488 526 #[test] 489 - fn a_short_master_key_is_refused_at_open() { 490 - let dir = tempfile::tempdir().unwrap(); 527 + fn a_short_master_key_is_refused_at_construction() { 491 528 assert!(matches!( 492 - SealedStore::open(dir.path().join("k"), &[7u8; 31], entropy(1)), 529 + MasterKey::new([7u8; 31]), 493 530 Err(SecretsError::WeakMasterKey { len: 31 }) 494 531 )); 532 + } 533 + 534 + #[test] 535 + fn a_full_length_master_key_is_accepted() { 536 + assert!(MasterKey::new([7u8; 32]).is_ok()); 537 + } 538 + 539 + #[test] 540 + fn the_master_key_debug_redacts_its_bytes() { 541 + let rendered = format!("{:?}", MasterKey::new([7u8; 32]).unwrap()); 542 + assert_eq!(rendered, "MasterKey(<redacted>)"); 543 + assert!(!rendered.contains('7')); 495 544 } 496 545 497 546 #[test] ··· 503 552 .ensure(&rid("did:web:nel.pet")) 504 553 .unwrap(); 505 554 assert!(matches!( 506 - SealedStore::open(&path, &[9u8; 32], entropy(1)), 555 + SealedStore::open(&path, &MasterKey::new([9u8; 32]).unwrap(), entropy(1)), 507 556 Err(SecretsError::Decrypt) 508 557 )); 509 558 }
+10 -4
crates/knot-xrpc/src/forks.rs
··· 11 11 use knot_events::Reservation; 12 12 use knot_git::{Filter, GitError, RefUpdate, Repo, Staging}; 13 13 use knot_index::Resolved; 14 - use knot_pack::{FetchError, PackLimits, UpstreamRefs}; 14 + use knot_pack::{FetchError, HaveOids, PackLimits, UpstreamRefs, WantOids}; 15 15 use knot_postreceive::{Actor, Ci}; 16 16 use knot_runtime::{Clock, HttpTransport}; 17 17 use knot_types::{Oid, OwnerDid, RefName, RepoDid}; ··· 136 136 pub(crate) async fn upstream_pack<H: HttpTransport, C: Clock>( 137 137 state: &XrpcState<H, C>, 138 138 upstream: &Upstream, 139 - wants: Vec<Oid>, 140 - haves: Vec<Oid>, 139 + wants: WantOids, 140 + haves: HaveOids, 141 141 ) -> Result<Vec<u8>, XrpcError> { 142 142 let cap = state.fork_max_pack_bytes; 143 143 match upstream { ··· 330 330 let tip = refs 331 331 .find(branch) 332 332 .ok_or_else(|| XrpcError::not_found("upstream repository does not have that branch"))?; 333 - let pack = upstream_pack(state, &upstream, vec![tip], fork.haves.clone()).await?; 333 + let pack = upstream_pack( 334 + state, 335 + &upstream, 336 + WantOids::new(vec![tip]), 337 + HaveOids::new(fork.haves.clone()), 338 + ) 339 + .await?; 334 340 335 341 let layout = state.layout.clone(); 336 342 let opened = repo_did.clone();
+7 -1
crates/knot-xrpc/src/repos.rs
··· 342 342 "refs/tags/".to_string(), 343 343 ]; 344 344 let refs = crate::forks::upstream_refs(state, upstream, prefixes).await?; 345 - let pack = crate::forks::upstream_pack(state, upstream, refs.tips(), Vec::new()).await?; 345 + let pack = crate::forks::upstream_pack( 346 + state, 347 + upstream, 348 + knot_pack::WantOids::new(refs.tips()), 349 + knot_pack::HaveOids::default(), 350 + ) 351 + .await?; 346 352 let layout = state.layout.clone(); 347 353 let placed = repo_did.clone(); 348 354 let origin = raw.to_string();