This repository has no description
1use bobbin_types::edges::{ExtractError, Record};
2use bobbin_types::legacy::{
3 LegacyCollaborator, LegacyIssue, LegacyKnotMember, LegacyPublicKey, LegacyPull, LegacyRecord,
4 LegacyRefUpdate, LegacyRepo, LegacySource, LegacyStar, LegacyTarget,
5};
6use bobbin_types::sh_tangled::feed::star::{Repo as StarRepo, Star, StarString, StarSubject};
7use bobbin_types::sh_tangled::git::ref_update::RefUpdate;
8use bobbin_types::sh_tangled::knot::member::Member as KnotMember;
9use bobbin_types::sh_tangled::public_key::PublicKey;
10use bobbin_types::sh_tangled::repo::Repo;
11use bobbin_types::sh_tangled::repo::collaborator::Collaborator;
12use bobbin_types::sh_tangled::repo::issue::Issue;
13use bobbin_types::sh_tangled::repo::pull::{Pull, Round, Source, Target};
14use jacquard_common::types::did::Did;
15use jacquard_common::types::nsid::Nsid;
16use jacquard_common::types::string::AtUri;
17use jacquard_common::{BosStr, DefaultStr};
18
19use crate::normalize::{is_repo_at_uri, resolve_repo_uri};
20use crate::{RepoIdResolver, Resolution};
21use jacquard_common::IntoStatic;
22use jacquard_common::types::ident::AtIdentifier;
23use jacquard_common::types::recordkey::Rkey;
24
25#[derive(Debug)]
26pub enum DecodedRecord {
27 Canon(Record),
28 Legacy(LegacyRecord),
29}
30
31impl DecodedRecord {
32 pub fn try_decode<S: BosStr + AsRef<str>>(
33 nsid: &Nsid<S>,
34 bytes: &[u8],
35 ) -> Result<Self, ExtractError> {
36 match Record::from_json_bytes(nsid, bytes) {
37 Ok(record) => Ok(Self::Canon(record)),
38 Err(canon_err) => {
39 let normalized = normalize_record_fields(bytes);
40 let working: &[u8] = normalized.as_deref().unwrap_or(bytes);
41 if normalized.is_some()
42 && let Ok(record) = Record::from_json_bytes(nsid, working)
43 {
44 return Ok(Self::Canon(record));
45 }
46 if let Some(scrubbed) = scrub_record_bytes(nsid, working) {
47 if let Ok(record) = Record::from_json_bytes(nsid, &scrubbed) {
48 return Ok(Self::Canon(record));
49 }
50 if let Ok(legacy) = LegacyRecord::from_json_bytes(nsid, &scrubbed) {
51 return Ok(Self::Legacy(legacy));
52 }
53 }
54 match LegacyRecord::from_json_bytes(nsid, working) {
55 Ok(legacy) => Ok(Self::Legacy(legacy)),
56 Err(_) => Err(canon_err),
57 }
58 }
59 }
60 }
61}
62
63pub fn normalize_record_fields(bytes: &[u8]) -> Option<alloc::vec::Vec<u8>> {
64 let value: serde_json::Value = serde_json::from_slice(bytes).ok()?;
65 let reserialized = serde_json::to_vec(&value).ok()?;
66 (reserialized.as_slice() != bytes).then_some(reserialized)
67}
68
69pub fn synthesize_created_at(bytes: &[u8], fallback_rfc3339: &str) -> Option<alloc::vec::Vec<u8>> {
70 let mut value: serde_json::Value = serde_json::from_slice(bytes).ok()?;
71 let obj = value.as_object_mut()?;
72 let needs_fill = match obj.get("createdAt") {
73 None => true,
74 Some(serde_json::Value::String(s)) if s.is_empty() => true,
75 _ => false,
76 };
77 if !needs_fill {
78 return None;
79 }
80 obj.insert(
81 "createdAt".to_owned(),
82 serde_json::Value::String(fallback_rfc3339.to_owned()),
83 );
84 serde_json::to_vec(&value).ok()
85}
86
/// A repair applied to a single top-level field by `scrub_record_bytes`.
#[derive(Debug, Clone, Copy)]
enum FieldRule {
    /// Remove the field when its value is the empty string.
    DropIfEmptyString,
    /// Replace a JSON `null` value with an empty array.
    NullToEmptyArray,
}
92
93fn scrub_rules(nsid: &str) -> &'static [(&'static str, FieldRule)] {
94 match nsid {
95 "sh.tangled.actor.profile" => &[("preferredHandle", FieldRule::DropIfEmptyString)],
96 "sh.tangled.label.op" => &[
97 ("add", FieldRule::NullToEmptyArray),
98 ("delete", FieldRule::NullToEmptyArray),
99 ],
100 "sh.tangled.repo.pull" => &[("rounds", FieldRule::NullToEmptyArray)],
101 _ => &[],
102 }
103}
104
105pub fn scrub_record_bytes<S: BosStr + AsRef<str>>(
106 nsid: &Nsid<S>,
107 bytes: &[u8],
108) -> Option<alloc::vec::Vec<u8>> {
109 let rules = scrub_rules(nsid.as_ref());
110 if rules.is_empty() {
111 return None;
112 }
113 let value: serde_json::Value = serde_json::from_slice(bytes).ok()?;
114 let mut obj = value.as_object()?.clone();
115 let touched: alloc::vec::Vec<(&str, FieldRule)> = rules
116 .iter()
117 .filter_map(|(field, rule)| match (rule, obj.get(*field)) {
118 (FieldRule::DropIfEmptyString, Some(serde_json::Value::String(s))) if s.is_empty() => {
119 Some((*field, *rule))
120 }
121 (FieldRule::NullToEmptyArray, Some(serde_json::Value::Null)) => Some((*field, *rule)),
122 _ => None,
123 })
124 .collect();
125 if touched.is_empty() {
126 return None;
127 }
128 touched.iter().for_each(|(field, rule)| match rule {
129 FieldRule::DropIfEmptyString => {
130 obj.remove(*field);
131 }
132 FieldRule::NullToEmptyArray => {
133 obj.insert(
134 (*field).to_owned(),
135 serde_json::Value::Array(alloc::vec::Vec::new()),
136 );
137 }
138 });
139 tracing::debug!(nsid = %nsid.as_ref(), ?touched, "scrubbing fields before record retry");
140 serde_json::to_vec(&serde_json::Value::Object(obj)).ok()
141}
142
143async fn upgrade_repo_did(
144 resolver: &RepoIdResolver,
145 at_uri: Option<AtUri<DefaultStr>>,
146 explicit_did: Option<Did<DefaultStr>>,
147) -> Option<Did<DefaultStr>> {
148 if let Some(d) = explicit_did {
149 return Some(d);
150 }
151 let uri = at_uri?;
152 resolve_repo_uri(resolver, &uri).await
153}
154
155pub async fn upgrade_wire_bytes<S: BosStr + AsRef<str>>(
156 nsid: &Nsid<S>,
157 bytes: &[u8],
158 resolver: &RepoIdResolver,
159) -> Result<alloc::vec::Vec<u8>, ExtractError> {
160 let legacy = LegacyRecord::from_json_bytes(nsid, bytes)?;
161 let canon = upgrade(legacy, resolver)
162 .await
163 .ok_or_else(|| upgrade_failed(nsid))?;
164 serialize_canon_variant(&canon).map_err(ExtractError::DecodeJson)
165}
166
167fn upgrade_failed<S: BosStr + AsRef<str>>(nsid: &Nsid<S>) -> ExtractError {
168 ExtractError::UnknownCollection(alloc::format!("{}: legacy upgrade failed", nsid.as_ref()))
169}
170
171pub async fn decode_canon_or_upgrade<S: BosStr + AsRef<str>>(
172 nsid: &Nsid<S>,
173 bytes: &[u8],
174 resolver: &RepoIdResolver,
175) -> Result<Record, ExtractError> {
176 match DecodedRecord::try_decode(nsid, bytes)? {
177 DecodedRecord::Canon(r) => Ok(r),
178 DecodedRecord::Legacy(legacy) => upgrade(legacy, resolver)
179 .await
180 .ok_or_else(|| upgrade_failed(nsid)),
181 }
182}
183
184pub async fn decode_canon_or_upgrade_bytes<'a, S: BosStr + AsRef<str>>(
185 nsid: &Nsid<S>,
186 bytes: &'a [u8],
187 resolver: &RepoIdResolver,
188) -> Result<(Record, alloc::borrow::Cow<'a, [u8]>), ExtractError> {
189 let decoded = DecodedRecord::try_decode(nsid, bytes)?;
190 match decoded {
191 DecodedRecord::Canon(r) => Ok((r, alloc::borrow::Cow::Borrowed(bytes))),
192 DecodedRecord::Legacy(legacy) => {
193 let canon = upgrade(legacy, resolver)
194 .await
195 .ok_or_else(|| upgrade_failed(nsid))?;
196 let canon_bytes = serialize_canon_variant(&canon).map_err(ExtractError::DecodeJson)?;
197 Ok((canon, alloc::borrow::Cow::Owned(canon_bytes)))
198 }
199 }
200}
201
202fn serialize_canon_variant(record: &Record) -> Result<alloc::vec::Vec<u8>, serde_json::Error> {
203 match record {
204 Record::Issue(r) => serde_json::to_vec(r),
205 Record::Pull(r) => serde_json::to_vec(r),
206 Record::Collaborator(r) => serde_json::to_vec(r),
207 Record::RefUpdate(r) => serde_json::to_vec(r),
208 Record::Star(r) => serde_json::to_vec(r),
209 Record::PublicKey(r) => serde_json::to_vec(r),
210 Record::Repo(r) => serde_json::to_vec(r),
211 Record::KnotMember(r) => serde_json::to_vec(r),
212 _ => unreachable!(
213 "upgrade only produces Issue/Pull/Collaborator/RefUpdate/Star/PublicKey/Repo/KnotMember"
214 ),
215 }
216}
217
218pub async fn upgrade(legacy: LegacyRecord, resolver: &RepoIdResolver) -> Option<Record> {
219 match legacy {
220 LegacyRecord::Issue(l) => upgrade_issue(l, resolver).await.map(Record::Issue),
221 LegacyRecord::Pull(l) => upgrade_pull(l, resolver).await.map(Record::Pull),
222 LegacyRecord::Collaborator(l) => upgrade_collaborator(l, resolver)
223 .await
224 .map(Record::Collaborator),
225 LegacyRecord::RefUpdate(l) => Some(Record::RefUpdate(upgrade_ref_update(l))),
226 LegacyRecord::Star(l) => upgrade_star(l, resolver).await.map(Record::Star),
227 LegacyRecord::PublicKey(l) => Some(Record::PublicKey(upgrade_public_key(l))),
228 LegacyRecord::Repo(l) => Some(Record::Repo(upgrade_repo(l))),
229 LegacyRecord::KnotMember(l) => Some(Record::KnotMember(upgrade_knot_member(l))),
230 }
231}
232
233async fn upgrade_issue(
234 l: LegacyIssue<DefaultStr>,
235 resolver: &RepoIdResolver,
236) -> Option<Issue<DefaultStr>> {
237 let repo = upgrade_repo_did(resolver, l.repo, l.repo_did).await?;
238 Some(Issue {
239 created_at: l.created_at,
240 body: l.body,
241 mentions: l.mentions,
242 references: l.references,
243 repo,
244 title: l.title,
245 extra_data: l.extra_data,
246 })
247}
248
249async fn upgrade_target(
250 l: LegacyTarget<DefaultStr>,
251 resolver: &RepoIdResolver,
252) -> Option<Target<DefaultStr>> {
253 let repo = upgrade_repo_did(resolver, l.repo, l.repo_did).await?;
254 Some(Target {
255 branch: l.branch,
256 repo,
257 extra_data: None,
258 })
259}
260
261async fn upgrade_source(
262 l: LegacySource<DefaultStr>,
263 resolver: &RepoIdResolver,
264) -> Source<DefaultStr> {
265 let repo = upgrade_repo_did(resolver, l.repo, l.repo_did).await;
266 Source {
267 branch: l.branch,
268 repo,
269 extra_data: None,
270 }
271}
272
273async fn upgrade_pull(
274 l: LegacyPull<DefaultStr>,
275 resolver: &RepoIdResolver,
276) -> Option<Pull<DefaultStr>> {
277 let target = upgrade_target(l.target, resolver).await?;
278 let source = match l.source {
279 Some(s) => Some(upgrade_source(s, resolver).await),
280 None => None,
281 };
282 let rounds = if l.rounds.is_empty() {
283 l.patch_blob
284 .map(|patch_blob| {
285 alloc::vec![Round {
286 created_at: l.created_at.clone(),
287 patch_blob,
288 extra_data: None,
289 }]
290 })
291 .unwrap_or_default()
292 } else {
293 l.rounds
294 };
295 Some(Pull {
296 created_at: l.created_at,
297 body: l.body,
298 dependent_on: l.dependent_on,
299 mentions: l.mentions,
300 references: l.references,
301 rounds,
302 source,
303 target,
304 title: l.title,
305 extra_data: l.extra_data,
306 })
307}
308
309fn upgrade_public_key(l: LegacyPublicKey<DefaultStr>) -> PublicKey<DefaultStr> {
310 PublicKey {
311 created_at: l.created,
312 key: l.key,
313 name: l.name,
314 extra_data: l.extra_data,
315 }
316}
317
318fn upgrade_repo(l: LegacyRepo<DefaultStr>) -> Repo<DefaultStr> {
319 let _ = l.owner;
320 Repo {
321 created_at: l.added_at,
322 description: l.description,
323 knot: l.knot,
324 labels: None,
325 name: l.name,
326 repo_did: None,
327 source: None,
328 spindle: None,
329 topics: None,
330 website: None,
331 extra_data: l.extra_data,
332 }
333}
334
335fn upgrade_knot_member(l: LegacyKnotMember<DefaultStr>) -> KnotMember<DefaultStr> {
336 KnotMember {
337 created_at: l.added_at,
338 domain: l.domain,
339 subject: l.member,
340 extra_data: l.extra_data,
341 }
342}
343
344async fn upgrade_collaborator(
345 l: LegacyCollaborator<DefaultStr>,
346 resolver: &RepoIdResolver,
347) -> Option<Collaborator<DefaultStr>> {
348 let repo = upgrade_repo_did(resolver, l.repo, l.repo_did).await?;
349 Some(Collaborator {
350 created_at: l.created_at,
351 repo,
352 subject: l.subject,
353 extra_data: l.extra_data,
354 })
355}
356
357fn upgrade_ref_update(l: LegacyRefUpdate<DefaultStr>) -> RefUpdate<DefaultStr> {
358 RefUpdate {
359 committer_did: l.committer_did,
360 meta: l.meta,
361 new_sha: l.new_sha,
362 old_sha: l.old_sha,
363 owner_did: l.owner_did,
364 r#ref: l.r#ref,
365 repo: l.repo_did,
366 extra_data: l.extra_data,
367 }
368}
369
370async fn upgrade_star(
371 l: LegacyStar<DefaultStr>,
372 resolver: &RepoIdResolver,
373) -> Option<Star<DefaultStr>> {
374 let subject = if let Some(did) = l.subject_did {
375 StarSubject::Repo(alloc::boxed::Box::new(StarRepo {
376 did,
377 extra_data: None,
378 }))
379 } else {
380 let uri = l.subject?;
381 let resolved = if is_repo_at_uri(&uri) {
382 cached_repo_did(resolver, &uri).await
383 } else {
384 None
385 };
386 match resolved {
387 Some(did) => StarSubject::Repo(alloc::boxed::Box::new(StarRepo {
388 did,
389 extra_data: None,
390 })),
391 None => StarSubject::String(alloc::boxed::Box::new(StarString {
392 uri,
393 extra_data: None,
394 })),
395 }
396 };
397 Some(Star {
398 created_at: l.created_at,
399 subject,
400 extra_data: l.extra_data,
401 })
402}
403
404async fn cached_repo_did(
405 resolver: &RepoIdResolver,
406 uri: &jacquard_common::types::string::AtUri<DefaultStr>,
407) -> Option<Did<DefaultStr>> {
408 let owner = match uri.authority() {
409 AtIdentifier::Did(d) => d.clone().into_static(),
410 AtIdentifier::Handle(_) => return None,
411 };
412 let rkey: Rkey<DefaultStr> = uri.rkey()?.clone().into_static();
413 match resolver.cached_resolution(&owner, &rkey).await? {
414 Resolution::Mapped(did) => Some(did),
415 Resolution::NoRepoDid | Resolution::Unresolvable => None,
416 }
417}
418
419extern crate alloc;
420
421#[cfg(test)]
422mod tests {
423 use super::*;
424 use crate::RepoIdResolver;
425 use bobbin_runtime::RuntimeHasher;
426 use bobbin_types::edges::Record;
427 use jacquard_common::DefaultStr;
428 use jacquard_common::types::did::Did;
429 use jacquard_common::types::recordkey::Rkey;
430
431 fn did(s: &str) -> Did<DefaultStr> {
432 Did::new_owned(s).unwrap()
433 }
434
435 fn rkey(s: &str) -> Rkey<DefaultStr> {
436 Rkey::new_owned(s).unwrap()
437 }
438
439 fn nsid(s: &'static str) -> Nsid<DefaultStr> {
440 Nsid::new_static(s).unwrap()
441 }
442
443 #[test]
444 fn legacy_decode_routes_through_try_decode_for_known_nsids() {
445 let json = br#"{"$type":"sh.tangled.repo.issue","repoDid":"did:plc:squid","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#;
446 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo.issue"), json)
447 .expect("legacy issue must decode");
448 assert!(matches!(
449 decoded,
450 DecodedRecord::Legacy(LegacyRecord::Issue(_))
451 ));
452 }
453
454 #[test]
455 fn canon_decode_wins_when_wire_matches_new_shape() {
456 let json = br#"{"$type":"sh.tangled.repo.issue","repo":"did:plc:squid","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#;
457 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo.issue"), json)
458 .expect("canon issue must decode");
459 match decoded {
460 DecodedRecord::Canon(Record::Issue(i)) => {
461 assert_eq!(i.repo.as_ref(), "did:plc:squid")
462 }
463 other => panic!("expected canon issue, got {other:?}"),
464 }
465 }
466
467 #[test]
468 fn scrub_returns_none_for_unknown_nsid() {
469 let json = br#"{"$type":"sh.tangled.repo.issue","preferredHandle":""}"#;
470 assert!(scrub_record_bytes(&nsid("sh.tangled.repo.issue"), json).is_none());
471 }
472
473 #[test]
474 fn scrub_returns_none_when_target_field_is_non_empty() {
475 let json =
476 br#"{"$type":"sh.tangled.actor.profile","bluesky":true,"preferredHandle":"nel.pet"}"#;
477 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), json).is_none());
478 }
479
480 #[test]
481 fn scrub_returns_none_when_target_field_is_absent() {
482 let json = br#"{"$type":"sh.tangled.actor.profile","bluesky":true}"#;
483 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), json).is_none());
484 }
485
486 #[test]
487 fn scrub_returns_none_for_non_string_value() {
488 let json = br#"{"$type":"sh.tangled.actor.profile","bluesky":true,"preferredHandle":42}"#;
489 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), json).is_none());
490 }
491
492 #[test]
493 fn scrub_returns_none_for_non_object_json() {
494 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), b"[]").is_none());
495 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), b"null").is_none());
496 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), b"123").is_none());
497 }
498
499 #[test]
500 fn scrub_returns_none_for_invalid_json() {
501 assert!(scrub_record_bytes(&nsid("sh.tangled.actor.profile"), b"{not json").is_none());
502 }
503
504 #[test]
505 fn scrub_drops_empty_preferred_handle_and_preserves_other_fields() {
506 let json = br#"{"$type":"sh.tangled.actor.profile","bluesky":true,"preferredHandle":"","description":"hi"}"#;
507 let scrubbed = scrub_record_bytes(&nsid("sh.tangled.actor.profile"), json)
508 .expect("empty preferredHandle must trigger scrub");
509 let value: serde_json::Value = serde_json::from_slice(&scrubbed).expect("valid json");
510 let obj = value.as_object().expect("object");
511 assert!(!obj.contains_key("preferredHandle"));
512 assert_eq!(obj.get("bluesky"), Some(&serde_json::json!(true)));
513 assert_eq!(obj.get("description"), Some(&serde_json::json!("hi")));
514 assert_eq!(
515 obj.get("$type"),
516 Some(&serde_json::json!("sh.tangled.actor.profile"))
517 );
518 }
519
520 #[test]
521 fn scrub_replaces_null_arrays_with_empty_for_label_op() {
522 let json = br#"{"$type":"sh.tangled.label.op","add":[{"key":"at://did:plc:limpet/sh.tangled.label.definition/k","value":"v"}],"delete":null,"performedAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:limpet/sh.tangled.repo.issue/3aaa"}"#;
523 let scrubbed = scrub_record_bytes(&nsid("sh.tangled.label.op"), json)
524 .expect("null delete must trigger scrub");
525 let value: serde_json::Value = serde_json::from_slice(&scrubbed).expect("valid json");
526 let obj = value.as_object().expect("object");
527 assert_eq!(obj.get("delete"), Some(&serde_json::json!([])));
528 assert!(obj.get("add").is_some_and(|v| v.is_array()));
529 }
530
531 #[test]
532 fn scrub_passes_through_when_label_op_arrays_are_non_null() {
533 let json = br#"{"$type":"sh.tangled.label.op","add":[],"delete":[],"performedAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:limpet/sh.tangled.repo.issue/3aaa"}"#;
534 assert!(scrub_record_bytes(&nsid("sh.tangled.label.op"), json).is_none());
535 }
536
537 #[test]
538 fn try_decode_recovers_label_op_with_null_delete() {
539 let json = br#"{"$type":"sh.tangled.label.op","add":[{"key":"at://did:plc:limpet/sh.tangled.label.definition/k","value":"v"}],"delete":null,"performedAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:limpet/sh.tangled.repo.issue/3aaa"}"#;
540 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.label.op"), json)
541 .expect("label.op with null delete must scrub-recover");
542 assert!(matches!(decoded, DecodedRecord::Canon(Record::LabelOp(_))));
543 }
544
545 #[test]
546 fn try_decode_recovers_profile_with_empty_preferred_handle() {
547 let json = br#"{"$type":"sh.tangled.actor.profile","bluesky":true,"preferredHandle":"","description":"hi"}"#;
548 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.actor.profile"), json)
549 .expect("profile with empty preferredHandle must scrub-recover");
550 match decoded {
551 DecodedRecord::Canon(Record::Profile(p)) => {
552 assert!(p.preferred_handle.is_none());
553 assert_eq!(p.description.as_deref(), Some("hi"));
554 }
555 other => panic!("expected canon profile, got {other:?}"),
556 }
557 }
558
559 #[test]
560 fn legacy_decode_passes_through_for_unaffected_nsids() {
561 let json = br#"{"$type":"sh.tangled.graph.follow","subject":"did:plc:bailey","createdAt":"2026-05-01T00:00:00Z"}"#;
562 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.graph.follow"), json)
563 .expect("follow has no legacy form, must decode canon");
564 assert!(matches!(decoded, DecodedRecord::Canon(Record::Follow(_))));
565 }
566
567 #[test]
568 fn normalize_returns_none_for_invalid_json() {
569 assert!(normalize_record_fields(b"{not json").is_none());
570 }
571
572 #[test]
573 fn normalize_drops_repeated_dollar_type_key() {
574 let json =
575 br#"{"$type":"sh.tangled.repo.pull","title":"t","$type":"sh.tangled.repo.pull"}"#;
576 let normalized =
577 normalize_record_fields(json).expect("duplicate key must produce normalized output");
578 let value: serde_json::Value = serde_json::from_slice(&normalized).expect("valid json");
579 let obj = value.as_object().expect("object");
580 assert_eq!(obj.len(), 2);
581 assert_eq!(
582 obj.get("$type"),
583 Some(&serde_json::json!("sh.tangled.repo.pull"))
584 );
585 assert_eq!(obj.get("title"), Some(&serde_json::json!("t")));
586 }
587
588 #[test]
589 fn normalize_keeps_last_value_for_repeated_keys() {
590 let json = br#"{"$type":"sh.tangled.repo.issue","$type":"sh.tangled.repo.pull"}"#;
591 let normalized =
592 normalize_record_fields(json).expect("duplicate key must produce normalized output");
593 let value: serde_json::Value = serde_json::from_slice(&normalized).expect("valid json");
594 assert_eq!(
595 value.get("$type"),
596 Some(&serde_json::json!("sh.tangled.repo.pull")),
597 );
598 }
599
600 #[test]
601 fn synthesize_fills_empty_created_at_with_fallback() {
602 let json = br#"{"$type":"sh.tangled.repo.issue","title":"meow","createdAt":""}"#;
603 let patched = synthesize_created_at(json, "2026-05-01T00:00:00.000000Z")
604 .expect("empty createdAt must be filled");
605 let value: serde_json::Value = serde_json::from_slice(&patched).expect("valid json");
606 assert_eq!(
607 value.get("createdAt"),
608 Some(&serde_json::json!("2026-05-01T00:00:00.000000Z")),
609 );
610 }
611
612 #[test]
613 fn synthesize_fills_missing_created_at_with_fallback() {
614 let json = br#"{"$type":"sh.tangled.repo.issue","title":"meow"}"#;
615 let patched = synthesize_created_at(json, "2026-05-01T00:00:00.000000Z")
616 .expect("missing createdAt must be filled");
617 let value: serde_json::Value = serde_json::from_slice(&patched).expect("valid json");
618 assert_eq!(
619 value.get("createdAt"),
620 Some(&serde_json::json!("2026-05-01T00:00:00.000000Z")),
621 );
622 }
623
624 #[test]
625 fn synthesize_returns_none_when_created_at_already_set() {
626 let json = br#"{"$type":"sh.tangled.repo.issue","createdAt":"2026-05-01T00:00:00Z"}"#;
627 assert!(synthesize_created_at(json, "2026-04-01T00:00:00Z").is_none());
628 }
629
630 #[test]
631 fn synthesize_returns_none_for_non_string_created_at() {
632 let json = br#"{"$type":"sh.tangled.repo.issue","createdAt":null}"#;
633 assert!(synthesize_created_at(json, "2026-04-01T00:00:00Z").is_none());
634 }
635
636 #[tokio::test]
637 async fn try_decode_recovers_legacy_issue_after_synthesized_created_at() {
638 let json = br#"{"$type":"sh.tangled.repo.issue","body":"a bug","createdAt":"","repo":"at://did:plc:scallop/sh.tangled.repo/limpet","repoDid":"did:plc:scallop","title":"a bug"}"#;
639 let patched = synthesize_created_at(json, "2025-08-01T12:00:00.000000Z")
640 .expect("empty createdAt must be filled");
641 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo.issue"), &patched)
642 .expect("issue must legacy-decode after createdAt fill");
643 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
644 let canon = match decoded {
645 DecodedRecord::Canon(r) => r,
646 DecodedRecord::Legacy(l) => upgrade(l, &resolver).await.expect("upgrade"),
647 };
648 match canon {
649 Record::Issue(i) => {
650 assert_eq!(AsRef::<str>::as_ref(&i.title), "a bug");
651 assert_eq!(i.repo.as_ref(), "did:plc:scallop");
652 }
653 other => panic!("expected issue, got {other:?}"),
654 }
655 }
656
657 #[tokio::test]
658 async fn try_decode_recovers_canon_pull_with_duplicate_dollar_type() {
659 let json = br#"{"$type":"sh.tangled.repo.pull","createdAt":"2026-05-01T00:00:00Z","title":"meow","target":{"branch":"main","repo":"at://did:plc:scallop/sh.tangled.repo/limpet"},"rounds":[],"$type":"sh.tangled.repo.pull"}"#;
660 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo.pull"), json)
661 .expect("duplicate $type pull must normalize-recover");
662 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
663 resolver
664 .observe(
665 did("did:plc:scallop"),
666 rkey("limpet"),
667 Some(did("did:plc:scallop")),
668 )
669 .await;
670 let canon = match decoded {
671 DecodedRecord::Canon(r) => r,
672 DecodedRecord::Legacy(l) => upgrade(l, &resolver).await.expect("upgrade"),
673 };
674 match canon {
675 Record::Pull(p) => assert_eq!(AsRef::<str>::as_ref(&p.title), "meow"),
676 other => panic!("expected pull, got {other:?}"),
677 }
678 }
679
680 #[tokio::test]
681 async fn upgrade_issue_uses_repo_did_directly() {
682 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
683 let json = br#"{"$type":"sh.tangled.repo.issue","repoDid":"did:plc:scallop","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#;
684 let legacy =
685 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.issue"), json).expect("decode");
686 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
687 match canon {
688 Record::Issue(i) => assert_eq!(i.repo, did("did:plc:scallop")),
689 other => panic!("expected canon issue, got {other:?}"),
690 }
691 }
692
693 #[tokio::test]
694 async fn upgrade_issue_resolves_repo_uri_via_observed_resolver() {
695 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
696 let owner = did("did:plc:nel");
697 let key = rkey("abcabcabcabcz");
698 resolver
699 .observe(owner.clone(), key.clone(), Some(did("did:plc:scallop")))
700 .await;
701 let json = br#"{"$type":"sh.tangled.repo.issue","repo":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#;
702 let legacy =
703 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.issue"), json).expect("decode");
704 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
705 match canon {
706 Record::Issue(i) => assert_eq!(i.repo, did("did:plc:scallop")),
707 other => panic!("expected canon issue, got {other:?}"),
708 }
709 }
710
711 #[tokio::test]
712 async fn upgrade_issue_drops_when_resolver_cannot_map() {
713 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
714 let json = br#"{"$type":"sh.tangled.repo.issue","repo":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz","title":"t","createdAt":"2026-05-01T00:00:00Z"}"#;
715 let legacy =
716 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.issue"), json).expect("decode");
717 assert!(
718 upgrade(legacy, &resolver).await.is_none(),
719 "no resolver entry and no repoDid means the canon Did cannot be constructed",
720 );
721 }
722
723 #[tokio::test]
724 async fn upgrade_pull_propagates_target_resolution() {
725 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
726 let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","rounds":[],"target":{"branch":"main","repoDid":"did:plc:scallop"}}"#;
727 let legacy =
728 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode");
729 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
730 match canon {
731 Record::Pull(p) => {
732 assert_eq!(p.target.repo, did("did:plc:scallop"));
733 assert!(p.source.is_none());
734 }
735 other => panic!("expected canon pull, got {other:?}"),
736 }
737 }
738
739 #[tokio::test]
740 async fn upgrade_pull_pre_rounds_synthesizes_round_from_top_level_patch_blob() {
741 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
742 let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","target":{"branch":"main","repoDid":"did:plc:scallop"},"patchBlob":{"$type":"blob","mimeType":"application/gzip","ref":{"$link":"bafkreibpatvbeajtwzlr4jwr4s2hnwo5l7sgdbfnqu6n7ctd2bcbtluw4a"},"size":920}}"#;
743 let legacy =
744 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode");
745 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
746 match canon {
747 Record::Pull(p) => {
748 assert_eq!(
749 p.rounds.len(),
750 1,
751 "pre-rounds wire must yield exactly one synthesized round"
752 );
753 assert_eq!(
754 p.rounds[0].patch_blob.blob().mime_type.as_ref(),
755 "application/gzip"
756 );
757 assert_eq!(p.rounds[0].created_at, p.created_at);
758 }
759 other => panic!("expected canon pull, got {other:?}"),
760 }
761 }
762
763 #[tokio::test]
764 async fn upgrade_pull_omits_round_when_neither_rounds_nor_patch_blob_present() {
765 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
766 let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","target":{"branch":"main","repoDid":"did:plc:scallop"}}"#;
767 let legacy =
768 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode");
769 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
770 match canon {
771 Record::Pull(p) => assert!(p.rounds.is_empty()),
772 other => panic!("expected canon pull, got {other:?}"),
773 }
774 }
775
776 #[tokio::test]
777 async fn upgrade_public_key_renames_created_to_created_at() {
778 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
779 let json = br#"{"$type":"sh.tangled.publicKey","created":"2025-04-15T18:35:38Z","key":"ssh-ed25519 AAAA","name":"laptop"}"#;
780 let legacy =
781 LegacyRecord::from_json_bytes(&nsid("sh.tangled.publicKey"), json).expect("decode");
782 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
783 match canon {
784 Record::PublicKey(k) => {
785 assert_eq!(k.created_at.as_str(), "2025-04-15T18:35:38Z");
786 assert_eq!(k.key.as_str(), "ssh-ed25519 AAAA");
787 assert_eq!(k.name.as_str(), "laptop");
788 }
789 other => panic!("expected canon publicKey, got {other:?}"),
790 }
791 }
792
793 #[tokio::test]
794 async fn upgrade_repo_renames_added_at_and_drops_owner() {
795 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
796 let json = br#"{"$type":"sh.tangled.repo","addedAt":"2025-03-21T10:18:58Z","description":"hi","knot":"knot1.tangled.sh","name":"site","owner":"did:plc:nel"}"#;
797 let legacy = LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo"), json).expect("decode");
798 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
799 match canon {
800 Record::Repo(r) => {
801 assert_eq!(r.created_at.as_str(), "2025-03-21T10:18:58Z");
802 assert_eq!(r.description.as_deref(), Some("hi"));
803 assert_eq!(r.knot.as_str(), "knot1.tangled.sh");
804 assert_eq!(r.name.as_deref(), Some("site"));
805 assert!(r.repo_did.is_none(), "legacy repos have no repo_did");
806 }
807 other => panic!("expected canon repo, got {other:?}"),
808 }
809 }
810
811 #[tokio::test]
812 async fn upgrade_knot_member_renames_added_at_and_member() {
813 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
814 let json = br#"{"$type":"sh.tangled.knot.member","addedAt":"2025-03-31T05:14:09Z","domain":"knot.example","member":"did:plc:nel"}"#;
815 let legacy =
816 LegacyRecord::from_json_bytes(&nsid("sh.tangled.knot.member"), json).expect("decode");
817 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
818 match canon {
819 Record::KnotMember(m) => {
820 assert_eq!(m.created_at.as_str(), "2025-03-31T05:14:09Z");
821 assert_eq!(m.domain.as_str(), "knot.example");
822 assert_eq!(m.subject, did("did:plc:nel"));
823 }
824 other => panic!("expected canon knot.member, got {other:?}"),
825 }
826 }
827
828 #[tokio::test]
829 async fn legacy_pull_target_with_empty_repo_did_treats_as_none() {
830 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
831 resolver
832 .observe(
833 did("did:plc:nel"),
834 rkey("abcabcabcabcz"),
835 Some(did("did:plc:scallop")),
836 )
837 .await;
838 let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","rounds":[],"target":{"branch":"main","repo":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz","repoDid":""}}"#;
839 let legacy =
840 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode");
841 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
842 match canon {
843 Record::Pull(p) => assert_eq!(p.target.repo, did("did:plc:scallop")),
844 other => panic!("expected canon pull, got {other:?}"),
845 }
846 }
847
848 #[tokio::test]
849 async fn try_decode_recovers_publickey_with_created_field() {
850 let json = br#"{"$type":"sh.tangled.publicKey","created":"2025-04-15T18:35:38Z","key":"k","name":"n"}"#;
851 let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.publicKey"), json)
852 .expect("legacy publicKey must decode");
853 assert!(matches!(
854 decoded,
855 DecodedRecord::Legacy(LegacyRecord::PublicKey(_))
856 ));
857 }
858
/// A `sh.tangled.repo` payload that carries `addedAt`, `knot` and `owner`
/// must be accepted by `DecodedRecord::try_decode` and surface as the legacy
/// `Repo` variant.
///
/// `try_decode` is a synchronous function, so this is a plain `#[test]`; no
/// async runtime is spun up for it.
#[test]
fn try_decode_recovers_repo_with_added_at_field() {
    let json = br#"{"$type":"sh.tangled.repo","addedAt":"2025-03-21T10:18:58Z","knot":"knot1.tangled.sh","owner":"did:plc:nel"}"#;
    let decoded = DecodedRecord::try_decode(&nsid("sh.tangled.repo"), json)
        .expect("legacy repo must decode");
    // The pattern binds nothing, so `decoded` is still available for the
    // failure message; print it so a wrong variant is visible in test output.
    assert!(
        matches!(decoded, DecodedRecord::Legacy(LegacyRecord::Repo(_))),
        "expected legacy repo, got {decoded:?}",
    );
}
869
870 #[tokio::test]
871 async fn upgrade_pull_source_repo_resolution_is_independent_of_target() {
872 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
873 let json = br#"{"$type":"sh.tangled.repo.pull","title":"t","createdAt":"2026-05-01T00:00:00Z","rounds":[],"target":{"branch":"main","repoDid":"did:plc:scallop"},"source":{"branch":"feat","repo":"at://did:plc:nel/sh.tangled.repo/missing"}}"#;
874 let legacy =
875 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.pull"), json).expect("decode");
876 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
877 match canon {
878 Record::Pull(p) => {
879 assert_eq!(p.target.repo, did("did:plc:scallop"));
880 let source = p.source.expect("source struct retained");
881 assert_eq!(source.branch.as_str(), "feat");
882 assert!(
883 source.repo.is_none(),
884 "unresolvable source repo at-uri leaves the source.repo None rather than dropping the whole pull",
885 );
886 }
887 other => panic!("expected canon pull, got {other:?}"),
888 }
889 }
890
891 #[tokio::test]
892 async fn upgrade_ref_update_renames_repo_did_to_repo() {
893 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
894 let json = br#"{"$type":"sh.tangled.git.refUpdate","ref":"refs/heads/main","committerDid":"did:plc:olaren","repoDid":"did:plc:scallop","oldSha":"0000000000000000000000000000000000000000","newSha":"1111111111111111111111111111111111111111","meta":{"isDefaultRef":true,"commitCount":{}}}"#;
895 let legacy =
896 LegacyRecord::from_json_bytes(&nsid("sh.tangled.git.refUpdate"), json).expect("decode");
897 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
898 match canon {
899 Record::RefUpdate(r) => assert_eq!(r.repo, did("did:plc:scallop")),
900 other => panic!("expected canon ref update, got {other:?}"),
901 }
902 }
903
904 #[tokio::test]
905 async fn upgrade_star_prefers_subject_did_over_subject_uri() {
906 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
907 let json = br#"{"$type":"sh.tangled.feed.star","createdAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:nel/sh.tangled.string/k1","subjectDid":"did:plc:scallop"}"#;
908 let legacy =
909 LegacyRecord::from_json_bytes(&nsid("sh.tangled.feed.star"), json).expect("decode");
910 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
911 match canon {
912 Record::Star(s) => match s.subject {
913 StarSubject::Repo(r) => assert_eq!(r.did, did("did:plc:scallop")),
914 StarSubject::String(_) => panic!("subjectDid must win"),
915 },
916 other => panic!("expected canon star, got {other:?}"),
917 }
918 }
919
920 #[tokio::test]
921 async fn upgrade_star_falls_back_to_string_when_repo_uri_not_in_cache() {
922 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
923 let json = br#"{"$type":"sh.tangled.feed.star","createdAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"}"#;
924 let legacy =
925 LegacyRecord::from_json_bytes(&nsid("sh.tangled.feed.star"), json).expect("decode");
926 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
927 match canon {
928 Record::Star(s) => match s.subject {
929 StarSubject::String(s) => assert_eq!(
930 s.uri.as_ref(),
931 "at://did:plc:nel/sh.tangled.repo/abcabcabcabcz",
932 "cache-miss on repo uri preserves the uri under the #string variant for later normalization",
933 ),
934 StarSubject::Repo(_) => panic!("cold cache must not upgrade to Repo variant"),
935 },
936 other => panic!("expected canon star, got {other:?}"),
937 }
938 }
939
940 #[tokio::test]
941 async fn upgrade_star_uses_cached_repo_did_when_observed() {
942 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
943 let owner = did("did:plc:nel");
944 let key = rkey("abcabcabcabcz");
945 resolver
946 .observe(owner.clone(), key.clone(), Some(did("did:plc:scallop")))
947 .await;
948 let json = br#"{"$type":"sh.tangled.feed.star","createdAt":"2026-05-01T00:00:00Z","subject":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"}"#;
949 let legacy =
950 LegacyRecord::from_json_bytes(&nsid("sh.tangled.feed.star"), json).expect("decode");
951 let canon = upgrade(legacy, &resolver).await.expect("upgrade");
952 match canon {
953 Record::Star(s) => match s.subject {
954 StarSubject::Repo(r) => assert_eq!(r.did, did("did:plc:scallop")),
955 StarSubject::String(_) => panic!("observed cache must upgrade to Repo variant"),
956 },
957 other => panic!("expected canon star, got {other:?}"),
958 }
959 }
960
961 #[tokio::test]
962 async fn upgrade_collaborator_requires_repo_did() {
963 let resolver = RepoIdResolver::detached(RuntimeHasher::default());
964 let with_did = br#"{"$type":"sh.tangled.repo.collaborator","createdAt":"2026-05-01T00:00:00Z","subject":"did:plc:lyna","repoDid":"did:plc:scallop"}"#;
965 let canon = upgrade(
966 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.collaborator"), with_did)
967 .expect("decode"),
968 &resolver,
969 )
970 .await
971 .expect("upgrade");
972 match canon {
973 Record::Collaborator(c) => assert_eq!(c.repo, did("did:plc:scallop")),
974 other => panic!("expected canon collaborator, got {other:?}"),
975 }
976
977 let no_resolution = br#"{"$type":"sh.tangled.repo.collaborator","createdAt":"2026-05-01T00:00:00Z","subject":"did:plc:lyna","repo":"at://did:plc:nel/sh.tangled.repo/abcabcabcabcz"}"#;
978 let legacy =
979 LegacyRecord::from_json_bytes(&nsid("sh.tangled.repo.collaborator"), no_resolution)
980 .expect("decode");
981 assert!(upgrade(legacy, &resolver).await.is_none());
982 }
983}