This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 41 kB View raw
1use bobbin_types::edges::{ExtractError, Record}; 2use bobbin_types::legacy::{ 3 LegacyCollaborator, LegacyIssue, LegacyKnotMember, LegacyPublicKey, LegacyPull, LegacyRecord, 4 LegacyRefUpdate, LegacyRepo, LegacySource, LegacyStar, LegacyTarget, 5}; 6use bobbin_types::sh_tangled::feed::star::{Repo as StarRepo, Star, StarString, StarSubject}; 7use bobbin_types::sh_tangled::git::ref_update::RefUpdate; 8use bobbin_types::sh_tangled::knot::member::Member as KnotMember; 9use bobbin_types::sh_tangled::public_key::PublicKey; 10use bobbin_types::sh_tangled::repo::Repo; 11use bobbin_types::sh_tangled::repo::collaborator::Collaborator; 12use bobbin_types::sh_tangled::repo::issue::Issue; 13use bobbin_types::sh_tangled::repo::pull::{Pull, Round, Source, Target}; 14use jacquard_common::types::did::Did; 15use jacquard_common::types::nsid::Nsid; 16use jacquard_common::types::string::AtUri; 17use jacquard_common::{BosStr, DefaultStr}; 18 19use crate::normalize::{is_repo_at_uri, resolve_repo_uri}; 20use crate::{RepoIdResolver, Resolution}; 21use jacquard_common::IntoStatic; 22use jacquard_common::types::ident::AtIdentifier; 23use jacquard_common::types::recordkey::Rkey; 24 25#[derive(Debug)] 26pub enum DecodedRecord { 27 Canon(Record), 28 Legacy(LegacyRecord), 29} 30 31impl DecodedRecord { 32 pub fn try_decode<S: BosStr + AsRef<str>>( 33 nsid: &Nsid<S>, 34 bytes: &[u8], 35 ) -> Result<Self, ExtractError> { 36 match Record::from_json_bytes(nsid, bytes) { 37 Ok(record) => Ok(Self::Canon(record)), 38 Err(canon_err) => { 39 let normalized = normalize_record_fields(bytes); 40 let working: &[u8] = normalized.as_deref().unwrap_or(bytes); 41 if normalized.is_some() 42 && let Ok(record) = Record::from_json_bytes(nsid, working) 43 { 44 return Ok(Self::Canon(record)); 45 } 46 if let Some(scrubbed) = scrub_record_bytes(nsid, working) { 47 if let Ok(record) = Record::from_json_bytes(nsid, &scrubbed) { 48 return Ok(Self::Canon(record)); 49 } 50 if let Ok(legacy) = 
LegacyRecord::from_json_bytes(nsid, &scrubbed) { 51 return Ok(Self::Legacy(legacy)); 52 } 53 } 54 match LegacyRecord::from_json_bytes(nsid, working) { 55 Ok(legacy) => Ok(Self::Legacy(legacy)), 56 Err(_) => Err(canon_err), 57 } 58 } 59 } 60 } 61} 62 63pub fn normalize_record_fields(bytes: &[u8]) -> Option<alloc::vec::Vec<u8>> { 64 let value: serde_json::Value = serde_json::from_slice(bytes).ok()?; 65 let reserialized = serde_json::to_vec(&value).ok()?; 66 (reserialized.as_slice() != bytes).then_some(reserialized) 67} 68 69pub fn synthesize_created_at(bytes: &[u8], fallback_rfc3339: &str) -> Option<alloc::vec::Vec<u8>> { 70 let mut value: serde_json::Value = serde_json::from_slice(bytes).ok()?; 71 let obj = value.as_object_mut()?; 72 let needs_fill = match obj.get("createdAt") { 73 None => true, 74 Some(serde_json::Value::String(s)) if s.is_empty() => true, 75 _ => false, 76 }; 77 if !needs_fill { 78 return None; 79 } 80 obj.insert( 81 "createdAt".to_owned(), 82 serde_json::Value::String(fallback_rfc3339.to_owned()), 83 ); 84 serde_json::to_vec(&value).ok() 85} 86 87#[derive(Clone, Copy, Debug)] 88enum FieldRule { 89 DropIfEmptyString, 90 NullToEmptyArray, 91} 92 93fn scrub_rules(nsid: &str) -> &'static [(&'static str, FieldRule)] { 94 match nsid { 95 "sh.tangled.actor.profile" => &[("preferredHandle", FieldRule::DropIfEmptyString)], 96 "sh.tangled.label.op" => &[ 97 ("add", FieldRule::NullToEmptyArray), 98 ("delete", FieldRule::NullToEmptyArray), 99 ], 100 "sh.tangled.repo.pull" => &[("rounds", FieldRule::NullToEmptyArray)], 101 _ => &[], 102 } 103} 104 105pub fn scrub_record_bytes<S: BosStr + AsRef<str>>( 106 nsid: &Nsid<S>, 107 bytes: &[u8], 108) -> Option<alloc::vec::Vec<u8>> { 109 let rules = scrub_rules(nsid.as_ref()); 110 if rules.is_empty() { 111 return None; 112 } 113 let value: serde_json::Value = serde_json::from_slice(bytes).ok()?; 114 let mut obj = value.as_object()?.clone(); 115 let touched: alloc::vec::Vec<(&str, FieldRule)> = rules 116 .iter() 117 
.filter_map(|(field, rule)| match (rule, obj.get(*field)) { 118 (FieldRule::DropIfEmptyString, Some(serde_json::Value::String(s))) if s.is_empty() => { 119 Some((*field, *rule)) 120 } 121 (FieldRule::NullToEmptyArray, Some(serde_json::Value::Null)) => Some((*field, *rule)), 122 _ => None, 123 }) 124 .collect(); 125 if touched.is_empty() { 126 return None; 127 } 128 touched.iter().for_each(|(field, rule)| match rule { 129 FieldRule::DropIfEmptyString => { 130 obj.remove(*field); 131 } 132 FieldRule::NullToEmptyArray => { 133 obj.insert( 134 (*field).to_owned(), 135 serde_json::Value::Array(alloc::vec::Vec::new()), 136 ); 137 } 138 }); 139 tracing::debug!(nsid = %nsid.as_ref(), ?touched, "scrubbing fields before record retry"); 140 serde_json::to_vec(&serde_json::Value::Object(obj)).ok() 141} 142 143async fn upgrade_repo_did( 144 resolver: &RepoIdResolver, 145 at_uri: Option<AtUri<DefaultStr>>, 146 explicit_did: Option<Did<DefaultStr>>, 147) -> Option<Did<DefaultStr>> { 148 if let Some(d) = explicit_did { 149 return Some(d); 150 } 151 let uri = at_uri?; 152 resolve_repo_uri(resolver, &uri).await 153} 154 155pub async fn upgrade_wire_bytes<S: BosStr + AsRef<str>>( 156 nsid: &Nsid<S>, 157 bytes: &[u8], 158 resolver: &RepoIdResolver, 159) -> Result<alloc::vec::Vec<u8>, ExtractError> { 160 let legacy = LegacyRecord::from_json_bytes(nsid, bytes)?; 161 let canon = upgrade(legacy, resolver) 162 .await 163 .ok_or_else(|| upgrade_failed(nsid))?; 164 serialize_canon_variant(&canon).map_err(ExtractError::DecodeJson) 165} 166 167fn upgrade_failed<S: BosStr + AsRef<str>>(nsid: &Nsid<S>) -> ExtractError { 168 ExtractError::UnknownCollection(alloc::format!("{}: legacy upgrade failed", nsid.as_ref())) 169} 170 171pub async fn decode_canon_or_upgrade<S: BosStr + AsRef<str>>( 172 nsid: &Nsid<S>, 173 bytes: &[u8], 174 resolver: &RepoIdResolver, 175) -> Result<Record, ExtractError> { 176 match DecodedRecord::try_decode(nsid, bytes)? 
{ 177 DecodedRecord::Canon(r) => Ok(r), 178 DecodedRecord::Legacy(legacy) => upgrade(legacy, resolver) 179 .await 180 .ok_or_else(|| upgrade_failed(nsid)), 181 } 182} 183 184pub async fn decode_canon_or_upgrade_bytes<'a, S: BosStr + AsRef<str>>( 185 nsid: &Nsid<S>, 186 bytes: &'a [u8], 187 resolver: &RepoIdResolver, 188) -> Result<(Record, alloc::borrow::Cow<'a, [u8]>), ExtractError> { 189 let decoded = DecodedRecord::try_decode(nsid, bytes)?; 190 match decoded { 191 DecodedRecord::Canon(r) => Ok((r, alloc::borrow::Cow::Borrowed(bytes))), 192 DecodedRecord::Legacy(legacy) => { 193 let canon = upgrade(legacy, resolver) 194 .await 195 .ok_or_else(|| upgrade_failed(nsid))?; 196 let canon_bytes = serialize_canon_variant(&canon).map_err(ExtractError::DecodeJson)?; 197 Ok((canon, alloc::borrow::Cow::Owned(canon_bytes))) 198 } 199 } 200} 201 202fn serialize_canon_variant(record: &Record) -> Result<alloc::vec::Vec<u8>, serde_json::Error> { 203 match record { 204 Record::Issue(r) => serde_json::to_vec(r), 205 Record::Pull(r) => serde_json::to_vec(r), 206 Record::Collaborator(r) => serde_json::to_vec(r), 207 Record::RefUpdate(r) => serde_json::to_vec(r), 208 Record::Star(r) => serde_json::to_vec(r), 209 Record::PublicKey(r) => serde_json::to_vec(r), 210 Record::Repo(r) => serde_json::to_vec(r), 211 Record::KnotMember(r) => serde_json::to_vec(r), 212 _ => unreachable!( 213 "upgrade only produces Issue/Pull/Collaborator/RefUpdate/Star/PublicKey/Repo/KnotMember" 214 ), 215 } 216} 217 218pub async fn upgrade(legacy: LegacyRecord, resolver: &RepoIdResolver) -> Option<Record> { 219 match legacy { 220 LegacyRecord::Issue(l) => upgrade_issue(l, resolver).await.map(Record::Issue), 221 LegacyRecord::Pull(l) => upgrade_pull(l, resolver).await.map(Record::Pull), 222 LegacyRecord::Collaborator(l) => upgrade_collaborator(l, resolver) 223 .await 224 .map(Record::Collaborator), 225 LegacyRecord::RefUpdate(l) => Some(Record::RefUpdate(upgrade_ref_update(l))), 226 LegacyRecord::Star(l) => 
upgrade_star(l, resolver).await.map(Record::Star), 227 LegacyRecord::PublicKey(l) => Some(Record::PublicKey(upgrade_public_key(l))), 228 LegacyRecord::Repo(l) => Some(Record::Repo(upgrade_repo(l))), 229 LegacyRecord::KnotMember(l) => Some(Record::KnotMember(upgrade_knot_member(l))), 230 } 231} 232 233async fn upgrade_issue( 234 l: LegacyIssue<DefaultStr>, 235 resolver: &RepoIdResolver, 236) -> Option<Issue<DefaultStr>> { 237 let repo = upgrade_repo_did(resolver, l.repo, l.repo_did).await?; 238 Some(Issue { 239 created_at: l.created_at, 240 body: l.body, 241 mentions: l.mentions, 242 references: l.references, 243 repo, 244 title: l.title, 245 extra_data: l.extra_data, 246 }) 247} 248 249async fn upgrade_target( 250 l: LegacyTarget<DefaultStr>, 251 resolver: &RepoIdResolver, 252) -> Option<Target<DefaultStr>> { 253 let repo = upgrade_repo_did(resolver, l.repo, l.repo_did).await?; 254 Some(Target { 255 branch: l.branch, 256 repo, 257 extra_data: None, 258 }) 259} 260 261async fn upgrade_source( 262 l: LegacySource<DefaultStr>, 263 resolver: &RepoIdResolver, 264) -> Source<DefaultStr> { 265 let repo = upgrade_repo_did(resolver, l.repo, l.repo_did).await; 266 Source { 267 branch: l.branch, 268 repo, 269 extra_data: None, 270 } 271} 272 273async fn upgrade_pull( 274 l: LegacyPull<DefaultStr>, 275 resolver: &RepoIdResolver, 276) -> Option<Pull<DefaultStr>> { 277 let target = upgrade_target(l.target, resolver).await?; 278 let source = match l.source { 279 Some(s) => Some(upgrade_source(s, resolver).await), 280 None => None, 281 }; 282 let rounds = if l.rounds.is_empty() { 283 l.patch_blob 284 .map(|patch_blob| { 285 alloc::vec![Round { 286 created_at: l.created_at.clone(), 287 patch_blob, 288 extra_data: None, 289 }] 290 }) 291 .unwrap_or_default() 292 } else { 293 l.rounds 294 }; 295 Some(Pull { 296 created_at: l.created_at, 297 body: l.body, 298 dependent_on: l.dependent_on, 299 mentions: l.mentions, 300 references: l.references, 301 rounds, 302 source, 303 target, 304 
title: l.title, 305 extra_data: l.extra_data, 306 }) 307} 308 309fn upgrade_public_key(l: LegacyPublicKey<DefaultStr>) -> PublicKey<DefaultStr> { 310 PublicKey { 311 created_at: l.created, 312 key: l.key, 313 name: l.name, 314 extra_data: l.extra_data, 315 } 316} 317 318fn upgrade_repo(l: LegacyRepo<DefaultStr>) -> Repo<DefaultStr> { 319 let _ = l.owner; 320 Repo { 321 created_at: l.added_at, 322 description: l.description, 323 knot: l.knot, 324 labels: None, 325 name: l.name, 326 repo_did: None, 327 source: None, 328 spindle: None, 329 topics: None, 330 website: None, 331 extra_data: l.extra_data, 332 } 333} 334 335fn upgrade_knot_member(l: LegacyKnotMember<DefaultStr>) -> KnotMember<DefaultStr> { 336 KnotMember { 337 created_at: l.added_at, 338 domain: l.domain, 339 subject: l.member, 340 extra_data: l.extra_data, 341 } 342} 343 344async fn upgrade_collaborator( 345 l: LegacyCollaborator<DefaultStr>, 346 resolver: &RepoIdResolver, 347) -> Option<Collaborator<DefaultStr>> { 348 let repo = upgrade_repo_did(resolver, l.repo, l.repo_did).await?; 349 Some(Collaborator { 350 created_at: l.created_at, 351 repo, 352 subject: l.subject, 353 extra_data: l.extra_data, 354 }) 355} 356 357fn upgrade_ref_update(l: LegacyRefUpdate<DefaultStr>) -> RefUpdate<DefaultStr> { 358 RefUpdate { 359 committer_did: l.committer_did, 360 meta: l.meta, 361 new_sha: l.new_sha, 362 old_sha: l.old_sha, 363 owner_did: l.owner_did, 364 r#ref: l.r#ref, 365 repo: l.repo_did, 366 extra_data: l.extra_data, 367 } 368} 369 370async fn upgrade_star( 371 l: LegacyStar<DefaultStr>, 372 resolver: &RepoIdResolver, 373) -> Option<Star<DefaultStr>> { 374 let subject = if let Some(did) = l.subject_did { 375 StarSubject::Repo(alloc::boxed::Box::new(StarRepo { 376 did, 377 extra_data: None, 378 })) 379 } else { 380 let uri = l.subject?; 381 let resolved = if is_repo_at_uri(&uri) { 382 cached_repo_did(resolver, &uri).await 383 } else { 384 None 385 }; 386 match resolved { 387 Some(did) => 
StarSubject::Repo(alloc::boxed::Box::new(StarRepo { 388 did, 389 extra_data: None, 390 })), 391 None => StarSubject::String(alloc::boxed::Box::new(StarString { 392 uri, 393 extra_data: None, 394 })), 395 } 396 }; 397 Some(Star { 398 created_at: l.created_at, 399 subject, 400 extra_data: l.extra_data, 401 }) 402} 403 404async fn cached_repo_did( 405 resolver: &RepoIdResolver, 406 uri: &jacquard_common::types::string::AtUri<DefaultStr>, 407) -> Option<Did<DefaultStr>> { 408 let owner = match uri.authority() { 409 AtIdentifier::Did(d) => d.clone().into_static(), 410 AtIdentifier::Handle(_) => return None, 411 }; 412 let rkey: Rkey<DefaultStr> = uri.rkey()?.clone().into_static(); 413 match resolver.cached_resolution(&owner, &rkey).await? { 414 Resolution::Mapped(did) => Some(did), 415 Resolution::NoRepoDid | Resolution::Unresolvable => None, 416 } 417} 418 419extern crate alloc; 420 421#[cfg(test)] 422mod tests { 423 use super::*; 424 use crate::RepoIdResolver; 425 use bobbin_runtime::RuntimeHasher; 426 use bobbin_types::edges::Record; 427 use jacquard_common::DefaultStr; 428 use jacquard_common::types::did::Did; 429 use jacquard_common::types::recordkey::Rkey; 430 431 fn did(s: &str) -> Did<DefaultStr> { 432 Did::new_owned(s).unwrap() 433 } 434 435 fn rkey(s: &str) -> Rkey<DefaultStr> { 436 Rkey::new_owned(s).unwrap() 437 } 438 439 fn nsid(s: &'static str) -> Nsid<DefaultStr> { 440 Nsid::new_static(s).unwrap() 441 } 442 443 #[test] 444 fn legacy_decode_routes_through_try_decode_for_known_nsids() { 445 let json = br#"{"$type":"sh.tangled.repo.issue","repoDid":"did:plc:squid","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#; 446 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo.issue"), json) 447 .expect("legacy issue must decode"); 448 assert!(matches!( 449 decoded, 450 DecodedRecord::Legacy(LegacyRecord::Issue(_)) 451 )); 452 } 453 454 #[test] 455 fn canon_decode_wins_when_wire_matches_new_shape() { 456 let json = 
br#"{"$type":"sh.tangled.repo.issue","repo":"did:plc:squid","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#; 457 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo.issue"), json) 458 .expect("canon issue must decode"); 459 match decoded { 460 DecodedRecord::Canon(Record::Issue(i)) => { 461 assert_eq!(i.repo.as_ref(), "did:plc:squid") 462 } 463 other => panic!("expected canon issue, got {other:?}"), 464 } 465 } 466 467 #[test] 468 fn scrub_returns_none_for_unknown_nsid() { 469 let json = br#"{"$type":"sh.tangled.repo.issue","preferredHandle":""}"#; 470 assert!(scrub_record_bytes(&nsid("sh.tangled.repo.issue"), json).is_none()); 471 } 472 473 #[test] 474 fn scrub_returns_none_when_target_field_is_non_empty() { 475 let json = 476 br#"{"$type":"sh.tangled.actor.profile","bluesky":true,"preferredHandle":"nel.pet"}"#; 477 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), json).is_none()); 478 } 479 480 #[test] 481 fn scrub_returns_none_when_target_field_is_absent() { 482 let json = br#"{"$type":"sh.tangled.actor.profile","bluesky":true}"#; 483 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), json).is_none()); 484 } 485 486 #[test] 487 fn scrub_returns_none_for_non_string_value() { 488 let json = br#"{"$type":"sh.tangled.actor.profile","bluesky":true,"preferredHandle":42}"#; 489 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), json).is_none()); 490 } 491 492 #[test] 493 fn scrub_returns_none_for_non_object_json() { 494 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), b"[]").is_none()); 495 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), b"null").is_none()); 496 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), b"123").is_none()); 497 } 498 499 #[test] 500 fn scrub_returns_none_for_invalid_json() { 501 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), b"{not json").is_none()); 502 } 503 504 #[test] 505 fn scrub_drops_empty_preferred_handle_and_preserves_other_fields() { 
506 let json = br#"{"$type":"sh.tangled.actor.profile","bluesky":true,"preferredHandle":"","description":"hi"}"#; 507 let scrubbed = scrub_record_bytes(&nsid("sh.tangled.actor.profile"), json) 508 .expect("empty preferredHandle must trigger scrub"); 509 let value: serde_json::Value = serde_json::from_slice(&scrubbed).expect("valid json"); 510 let obj = value.as_object().expect("object"); 511 assert!(!obj.contains_key("preferredHandle")); 512 assert_eq!(obj.get("bluesky"), Some(&serde_json::json!(true))); 513 assert_eq!(obj.get("description"), Some(&serde_json::json!("hi"))); 514 assert_eq!( 515 obj.get("$type"), 516 Some(&serde_json::json!("sh.tangled.actor.profile")) 517 ); 518 } 519 520 #[test] 521 fn scrub_replaces_null_arrays_with_empty_for_label_op() { 522 let json = br#"{"$type":"sh.tangled.label.op","add":[{"key":"at://did:plc:limpet/sh.tangled.label.definition/k","value":"v"}],"delete":null,"performedAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:limpet/sh.tangled.repo.issue/3aaa"}"#; 523 let scrubbed = scrub_record_bytes(&nsid("sh.tangled.label.op"), json) 524 .expect("null delete must trigger scrub"); 525 let value: serde_json::Value = serde_json::from_slice(&scrubbed).expect("valid json"); 526 let obj = value.as_object().expect("object"); 527 assert_eq!(obj.get("delete"), Some(&serde_json::json!([]))); 528 assert!(obj.get("add").is_some_and(|v| v.is_array())); 529 } 530 531 #[test] 532 fn scrub_passes_through_when_label_op_arrays_are_non_null() { 533 let json = br#"{"$type":"sh.tangled.label.op","add":[],"delete":[],"performedAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:limpet/sh.tangled.repo.issue/3aaa"}"#; 534 assert!(scrub_record_bytes(&nsid("sh.tangled.label.op"), json).is_none()); 535 } 536 537 #[test] 538 fn try_decode_recovers_label_op_with_null_delete() { 539 let json = 
br#"{"$type":"sh.tangled.label.op","add":[{"key":"at://did:plc:limpet/sh.tangled.label.definition/k","value":"v"}],"delete":null,"performedAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:limpet/sh.tangled.repo.issue/3aaa"}"#; 540 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.label.op"), json) 541 .expect("label.op with null delete must scrub-recover"); 542 assert!(matches!(decoded, DecodedRecord::Canon(Record::LabelOp(_)))); 543 } 544 545 #[test] 546 fn try_decode_recovers_profile_with_empty_preferred_handle() { 547 let json = br#"{"$type":"sh.tangled.actor.profile","bluesky":true,"preferredHandle":"","description":"hi"}"#; 548 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.actor.profile"), json) 549 .expect("profile with empty preferredHandle must scrub-recover"); 550 match decoded { 551 DecodedRecord::Canon(Record::Profile(p)) => { 552 assert!(p.preferred_handle.is_none()); 553 assert_eq!(p.description.as_deref(), Some("hi")); 554 } 555 other => panic!("expected canon profile, got {other:?}"), 556 } 557 } 558 559 #[test] 560 fn legacy_decode_passes_through_for_unaffected_nsids() { 561 let json = br#"{"$type":"sh.tangled.graph.follow","subject":"did:plc:bailey","createdAt":"2026-05-01T00:00:00Z"}"#; 562 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.graph.follow"), json) 563 .expect("follow has no legacy form, must decode canon"); 564 assert!(matches!(decoded, DecodedRecord::Canon(Record::Follow(_)))); 565 } 566 567 #[test] 568 fn normalize_returns_none_for_invalid_json() { 569 assert!(normalize_record_fields(b"{not json").is_none()); 570 } 571 572 #[test] 573 fn normalize_drops_repeated_dollar_type_key() { 574 let json = 575 br#"{"$type":"sh.tangled.repo.pull","title":"t","$type":"sh.tangled.repo.pull"}"#; 576 let normalized = 577 normalize_record_fields(json).expect("duplicate key must produce normalized output"); 578 let value: serde_json::Value = serde_json::from_slice(&normalized).expect("valid json"); 579 let obj = 
value.as_object().expect("object"); 580 assert_eq!(obj.len(), 2); 581 assert_eq!( 582 obj.get("$type"), 583 Some(&serde_json::json!("sh.tangled.repo.pull")) 584 ); 585 assert_eq!(obj.get("title"), Some(&serde_json::json!("t"))); 586 } 587 588 #[test] 589 fn normalize_keeps_last_value_for_repeated_keys() { 590 let json = br#"{"$type":"sh.tangled.repo.issue","$type":"sh.tangled.repo.pull"}"#; 591 let normalized = 592 normalize_record_fields(json).expect("duplicate key must produce normalized output"); 593 let value: serde_json::Value = serde_json::from_slice(&normalized).expect("valid json"); 594 assert_eq!( 595 value.get("$type"), 596 Some(&serde_json::json!("sh.tangled.repo.pull")), 597 ); 598 } 599 600 #[test] 601 fn synthesize_fills_empty_created_at_with_fallback() { 602 let json = br#"{"$type":"sh.tangled.repo.issue","title":"meow","createdAt":""}"#; 603 let patched = synthesize_created_at(json, "2026-05-01T00:00:00.000000Z") 604 .expect("empty createdAt must be filled"); 605 let value: serde_json::Value = serde_json::from_slice(&patched).expect("valid json"); 606 assert_eq!( 607 value.get("createdAt"), 608 Some(&serde_json::json!("2026-05-01T00:00:00.000000Z")), 609 ); 610 } 611 612 #[test] 613 fn synthesize_fills_missing_created_at_with_fallback() { 614 let json = br#"{"$type":"sh.tangled.repo.issue","title":"meow"}"#; 615 let patched = synthesize_created_at(json, "2026-05-01T00:00:00.000000Z") 616 .expect("missing createdAt must be filled"); 617 let value: serde_json::Value = serde_json::from_slice(&patched).expect("valid json"); 618 assert_eq!( 619 value.get("createdAt"), 620 Some(&serde_json::json!("2026-05-01T00:00:00.000000Z")), 621 ); 622 } 623 624 #[test] 625 fn synthesize_returns_none_when_created_at_already_set() { 626 let json = br#"{"$type":"sh.tangled.repo.issue","createdAt":"2026-05-01T00:00:00Z"}"#; 627 assert!(synthesize_created_at(json, "2026-04-01T00:00:00Z").is_none()); 628 } 629 630 #[test] 631 fn 
synthesize_returns_none_for_non_string_created_at() { 632 let json = br#"{"$type":"sh.tangled.repo.issue","createdAt":null}"#; 633 assert!(synthesize_created_at(json, "2026-04-01T00:00:00Z").is_none()); 634 } 635 636 #[tokio::test] 637 async fn try_decode_recovers_legacy_issue_after_synthesized_created_at() { 638 let json = br#"{"$type":"sh.tangled.repo.issue","body":"a bug","createdAt":"","repo":"at://did:plc:scallop/sh.tangled.repo/limpet","repoDid":"did:plc:scallop","title":"a bug"}"#; 639 let patched = synthesize_created_at(json, "2025-08-01T12:00:00.000000Z") 640 .expect("empty createdAt must be filled"); 641 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo.issue"), &patched) 642 .expect("issue must legacy-decode after createdAt fill"); 643 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 644 let canon = match decoded { 645 DecodedRecord::Canon(r) => r, 646 DecodedRecord::Legacy(l) => upgrade(l, &resolver).await.expect("upgrade"), 647 }; 648 match canon { 649 Record::Issue(i) => { 650 assert_eq!(AsRef::<str>::as_ref(&i.title), "a bug"); 651 assert_eq!(i.repo.as_ref(), "did:plc:scallop"); 652 } 653 other => panic!("expected issue, got {other:?}"), 654 } 655 } 656 657 #[tokio::test] 658 async fn try_decode_recovers_canon_pull_with_duplicate_dollar_type() { 659 let json = br#"{"$type":"sh.tangled.repo.pull","createdAt":"2026-05-01T00:00:00Z","title":"meow","target":{"branch":"main","repo":"at://did:plc:scallop/sh.tangled.repo/limpet"},"rounds":[],"$type":"sh.tangled.repo.pull"}"#; 660 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo.pull"), json) 661 .expect("duplicate $type pull must normalize-recover"); 662 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 663 resolver 664 .observe( 665 did("did:plc:scallop"), 666 rkey("limpet"), 667 Some(did("did:plc:scallop")), 668 ) 669 .await; 670 let canon = match decoded { 671 DecodedRecord::Canon(r) => r, 672 DecodedRecord::Legacy(l) => upgrade(l, 
&resolver).await.expect("upgrade"), 673 }; 674 match canon { 675 Record::Pull(p) => assert_eq!(AsRef::<str>::as_ref(&p.title), "meow"), 676 other => panic!("expected pull, got {other:?}"), 677 } 678 } 679 680 #[tokio::test] 681 async fn upgrade_issue_uses_repo_did_directly() { 682 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 683 let json = br#"{"$type":"sh.tangled.repo.issue","repoDid":"did:plc:scallop","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#; 684 let legacy = 685 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.issue"), json).expect("decode"); 686 let canon = upgrade(legacy, &resolver).await.expect("upgrade"); 687 match canon { 688 Record::Issue(i) => assert_eq!(i.repo, did("did:plc:scallop")), 689 other => panic!("expected canon issue, got {other:?}"), 690 } 691 } 692 693 #[tokio::test] 694 async fn upgrade_issue_resolves_repo_uri_via_observed_resolver() { 695 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 696 let owner = did("did:plc:nel"); 697 let key = rkey("abcabcabcabcz"); 698 resolver 699 .observe(owner.clone(), key.clone(), Some(did("did:plc:scallop"))) 700 .await; 701 let json = br#"{"$type":"sh.tangled.repo.issue","repo":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#; 702 let legacy = 703 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.issue"), json).expect("decode"); 704 let canon = upgrade(legacy, &resolver).await.expect("upgrade"); 705 match canon { 706 Record::Issue(i) => assert_eq!(i.repo, did("did:plc:scallop")), 707 other => panic!("expected canon issue, got {other:?}"), 708 } 709 } 710 711 #[tokio::test] 712 async fn upgrade_issue_drops_when_resolver_cannot_map() { 713 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 714 let json = br#"{"$type":"sh.tangled.repo.issue","repo":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#; 715 let legacy = 716 
LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.issue"), json).expect("decode"); 717 assert!( 718 upgrade(legacy, &resolver).await.is_none(), 719 "no resolver entry and no repoDid means the canon Did cannot be constructed", 720 ); 721 } 722 723 #[tokio::test] 724 async fn upgrade_pull_propagates_target_resolution() { 725 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 726 let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","rounds":[],"target":{"branch":"main","repoDid":"did:plc:scallop"}}"#; 727 let legacy = 728 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode"); 729 let canon = upgrade(legacy, &resolver).await.expect("upgrade"); 730 match canon { 731 Record::Pull(p) => { 732 assert_eq!(p.target.repo, did("did:plc:scallop")); 733 assert!(p.source.is_none()); 734 } 735 other => panic!("expected canon pull, got {other:?}"), 736 } 737 } 738 739 #[tokio::test] 740 async fn upgrade_pull_pre_rounds_synthesizes_round_from_top_level_patch_blob() { 741 let resolver = RepoIdResolver::detached(RuntimeHasher::default()); 742 let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","target":{"branch":"main","repoDid":"did:plc:scallop"},"patchBlob":{"$type":"blob","mimeType":"application/gzip","ref":{"$link":"bafkreibpatvbeajtwzlr4jwr4s2hnwo5l7sgdbfnqu6n7ctd2bcbtluw4a"},"size":920}}"#; 743 let legacy = 744 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode"); 745 let canon = upgrade(legacy, &resolver).await.expect("upgrade"); 746 match canon { 747 Record::Pull(p) => { 748 assert_eq!( 749 p.rounds.len(), 750 1, 751 "pre-rounds wire must yield exactly one synthesized round" 752 ); 753 assert_eq!( 754 p.rounds[0].patch_blob.blob().mime_type.as_ref(), 755 "application/gzip" 756 ); 757 assert_eq!(p.rounds[0].created_at, p.created_at); 758 } 759 other => panic!("expected canon pull, got {other:?}"), 760 } 761 } 762 763 
/// A legacy pull whose JSON carries only `$type`, `title`, `createdAt` and
/// `target` (no `rounds` key) must upgrade to a canon pull with empty `rounds`.
#[tokio::test]
async fn upgrade_pull_omits_round_when_neither_rounds_nor_patch_blob_present() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","target":{"branch":"main","repoDid":"did:plc:scallop"}}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::Pull(p) => assert!(p.rounds.is_empty()),
        other => panic!("expected canon pull, got {other:?}"),
    }
}

/// The legacy `created` timestamp must surface as `created_at` on the canon
/// public key, with `key` and `name` carried over unchanged.
#[tokio::test]
async fn upgrade_public_key_renames_created_to_created_at() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let json = br#"{"$type":"sh.tangled.publicKey","created":"2025-04-15T18:35:38Z","key":"ssh-ed25519 AAAA","name":"laptop"}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.publicKey"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::PublicKey(k) => {
            assert_eq!(k.created_at.as_str(), "2025-04-15T18:35:38Z");
            assert_eq!(k.key.as_str(), "ssh-ed25519 AAAA");
            assert_eq!(k.name.as_str(), "laptop");
        }
        other => panic!("expected canon publicKey, got {other:?}"),
    }
}

/// The legacy `addedAt` timestamp must surface as `created_at` on the canon
/// repo; `description`, `knot` and `name` are preserved and `repo_did` stays
/// `None` even though the fixture carries an `owner`.
#[tokio::test]
async fn upgrade_repo_renames_added_at_and_drops_owner() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let json = br#"{"$type":"sh.tangled.repo","addedAt":"2025-03-21T10:18:58Z","description":"hi","knot":"knot1.tangled.sh","name":"site","owner":"did:plc:nel"}"#;
    let legacy = LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::Repo(r) => {
            assert_eq!(r.created_at.as_str(), "2025-03-21T10:18:58Z");
            assert_eq!(r.description.as_deref(), Some("hi"));
            assert_eq!(r.knot.as_str(), "knot1.tangled.sh");
            assert_eq!(r.name.as_deref(), Some("site"));
            assert!(r.repo_did.is_none(), "legacy repos have no repo_did");
        }
        other => panic!("expected canon repo, got {other:?}"),
    }
}

/// The legacy `addedAt` / `member` fields must surface as `created_at` /
/// `subject` on the canon knot member, with `domain` preserved.
#[tokio::test]
async fn upgrade_knot_member_renames_added_at_and_member() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let json = br#"{"$type":"sh.tangled.knot.member","addedAt":"2025-03-31T05:14:09Z","domain":"knot.example","member":"did:plc:nel"}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.knot.member"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::KnotMember(m) => {
            assert_eq!(m.created_at.as_str(), "2025-03-31T05:14:09Z");
            assert_eq!(m.domain.as_str(), "knot.example");
            assert_eq!(m.subject, did("did:plc:nel"));
        }
        other => panic!("expected canon knot.member, got {other:?}"),
    }
}

/// An empty-string `repoDid` on a legacy pull target must not be used as the
/// repo: the canon target repo is the DID previously observed for the
/// target's `repo` at-uri.
#[tokio::test]
async fn legacy_pull_target_with_empty_repo_did_treats_as_none() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    // Seed the resolver so the at-uri in the fixture maps to did:plc:scallop.
    resolver
        .observe(
            did("did:plc:nel"),
            rkey("abcabcabcabcz"),
            Some(did("did:plc:scallop")),
        )
        .await;
    let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","rounds":[],"target":{"branch":"main","repo":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz","repoDid":""}}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::Pull(p) => assert_eq!(p.target.repo, did("did:plc:scallop")),
        other => panic!("expected canon pull, got {other:?}"),
    }
}

/// A publicKey record using the `created` field must decode through
/// `DecodedRecord::try_decode` as the legacy public-key variant.
///
/// `try_decode` is synchronous, so this is a plain `#[test]` with no runtime.
#[test]
fn try_decode_recovers_publickey_with_created_field() {
    let json = br#"{"$type":"sh.tangled.publicKey","created":"2025-04-15T18:35:38Z","key":"k","name":"n"}"#;
    let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.publicKey"), json)
        .expect("legacy publicKey must decode");
    assert!(matches!(
        decoded,
        DecodedRecord::Legacy(LegacyRecord::PublicKey(_))
    ));
}

/// A repo record using the `addedAt` / `owner` fields must decode through
/// `DecodedRecord::try_decode` as the legacy repo variant.
///
/// `try_decode` is synchronous, so this is a plain `#[test]` with no runtime.
#[test]
fn try_decode_recovers_repo_with_added_at_field() {
    let json = br#"{"$type":"sh.tangled.repo","addedAt":"2025-03-21T10:18:58Z","knot":"knot1.tangled.sh","owner":"did:plc:nel"}"#;
    let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo"), json)
        .expect("legacy repo must decode");
    assert!(matches!(
        decoded,
        DecodedRecord::Legacy(LegacyRecord::Repo(_))
    ));
}

/// With nothing observed by the resolver, an unresolvable `source.repo`
/// at-uri must leave `source.repo` as `None` while the pull itself, its
/// target repo and the source branch are still produced.
#[tokio::test]
async fn upgrade_pull_source_repo_resolution_is_independent_of_target() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","rounds":[],"target":{"branch":"main","repoDid":"did:plc:scallop"},"source":{"branch":"feat","repo":"at://did:plc:nel/sh.tangled.repo/missing"}}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::Pull(p) => {
            assert_eq!(p.target.repo, did("did:plc:scallop"));
            let source = p.source.expect("source struct retained");
            assert_eq!(source.branch.as_str(), "feat");
            assert!(
                source.repo.is_none(),
                "unresolvable source repo at-uri leaves the source.repo None rather than dropping the whole pull",
            );
        }
        other => panic!("expected canon pull, got {other:?}"),
    }
}

/// The legacy `repoDid` field of a ref update must surface as `repo` on the
/// canon record.
#[tokio::test]
async fn upgrade_ref_update_renames_repo_did_to_repo() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let json = br#"{"$type":"sh.tangled.git.refUpdate","ref":"refs/heads/main","committerDid":"did:plc:olaren","repoDid":"did:plc:scallop","oldSha":"0000000000000000000000000000000000000000","newSha":"1111111111111111111111111111111111111111","meta":{"isDefaultRef":true,"commitCount":{}}}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.git.refUpdate"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::RefUpdate(r) => assert_eq!(r.repo, did("did:plc:scallop")),
        other => panic!("expected canon ref update, got {other:?}"),
    }
}

/// When a legacy star carries both a `subject` at-uri and a `subjectDid`,
/// the canon subject must be the repo variant built from `subjectDid`.
#[tokio::test]
async fn upgrade_star_prefers_subject_did_over_subject_uri() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let json = br#"{"$type":"sh.tangled.feed.star","createdAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:nel/sh.tangled.string/k1","subjectDid":"did:plc:scallop"}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.feed.star"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::Star(s) => match s.subject {
            StarSubject::Repo(r) => assert_eq!(r.did, did("did:plc:scallop")),
            StarSubject::String(_) => panic!("subjectDid must win"),
        },
        other => panic!("expected canon star, got {other:?}"),
    }
}

/// With nothing observed by the resolver, a star whose `subject` is a repo
/// at-uri must keep that uri verbatim under the string variant.
#[tokio::test]
async fn upgrade_star_falls_back_to_string_when_repo_uri_not_in_cache() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let json = br#"{"$type":"sh.tangled.feed.star","createdAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.feed.star"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::Star(s) => match s.subject {
            StarSubject::String(s) => assert_eq!(
                s.uri.as_ref(),
                "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz",
                "cache-miss on repo uri preserves the uri under the #string variant for later normalization",
            ),
            StarSubject::Repo(_) => panic!("cold cache must not upgrade to Repo variant"),
        },
        other => panic!("expected canon star, got {other:?}"),
    }
}

/// Once the resolver has observed the repo behind the star's `subject`
/// at-uri, the canon subject must be the repo variant with the observed DID.
#[tokio::test]
async fn upgrade_star_uses_cached_repo_did_when_observed() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());
    let owner = did("did:plc:nel");
    let key = rkey("abcabcabcabcz");
    // `owner` and `key` are not needed afterwards, so they are moved rather
    // than cloned.
    resolver
        .observe(owner, key, Some(did("did:plc:scallop")))
        .await;
    let json = br#"{"$type":"sh.tangled.feed.star","createdAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.feed.star"), json).expect("decode");
    let canon = upgrade(legacy, &resolver).await.expect("upgrade");
    match canon {
        Record::Star(s) => match s.subject {
            StarSubject::Repo(r) => assert_eq!(r.did, did("did:plc:scallop")),
            StarSubject::String(_) => panic!("observed cache must upgrade to Repo variant"),
        },
        other => panic!("expected canon star, got {other:?}"),
    }
}

/// A legacy collaborator with an explicit `repoDid` upgrades to a canon
/// collaborator for that repo; one that only carries a `repo` at-uri the
/// resolver has never observed yields `None` from `upgrade`.
#[tokio::test]
async fn upgrade_collaborator_requires_repo_did() {
    let resolver = RepoIdResolver::detached(RuntimeHasher::default());

    // Case 1: `repoDid` is present, so the upgrade succeeds.
    let with_did = br#"{"$type":"sh.tangled.repo.collaborator","createdAt":"2026-05-01T00:00:00Z","subject":"did:plc:lyna","repoDid":"did:plc:scallop"}"#;
    let canon = upgrade(
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.collaborator"), with_did)
            .expect("decode"),
        &resolver,
    )
    .await
    .expect("upgrade");
    match canon {
        Record::Collaborator(c) => assert_eq!(c.repo, did("did:plc:scallop")),
        other => panic!("expected canon collaborator, got {other:?}"),
    }

    // Case 2: only an at-uri the resolver has not observed, so no canon record.
    let no_resolution = br#"{"$type":"sh.tangled.repo.collaborator","createdAt":"2026-05-01T00:00:00Z","subject":"did:plc:lyna","repo":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"}"#;
    let legacy =
        LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.collaborator"), no_resolution)
            .expect("decode");
    assert!(upgrade(legacy, &resolver).await.is_none());
}
// NOTE(review): the brace below closes a block opened before this chunk,
// presumably the enclosing test module. Confirm against the full file.
}