Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/mentions" 31 "tangled.org/core/appview/models" 32 "tangled.org/core/appview/notify" 33 "tangled.org/core/appview/repoverify" 34 "tangled.org/core/appview/serververify" 35 "tangled.org/core/appview/validator" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/orm" 38 "tangled.org/core/rbac" 39) 40 41type Ingester struct { 42 Ctx context.Context 43 Db *db.DB 44 Enforcer *rbac.Enforcer 45 IdResolver *idresolver.Resolver 46 Cache *cache.Cache 47 Config *config.Config 48 Logger *slog.Logger 49 Validator *validator.Validator 50 MentionsResolver *mentions.Resolver 51 Notifier notify.Notifier 52 Verifier repoverify.Verifier 53} 54 55type processFunc func(ctx context.Context, e *jmodels.Event) error 56 57func (i *Ingester) Ingest() processFunc { 58 return func(ctx context.Context, e *jmodels.Event) error { 59 var err error 60 61 l := i.Logger.With("kind", e.Kind) 62 switch e.Kind { 63 case jmodels.EventKindAccount: 64 // TODO: sync account state to db 65 if e.Account.Active { 66 break 67 } 68 // TODO: revoke sessions by DID 69 if *e.Account.Status == "deactivated" { 70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 71 } 72 case jmodels.EventKindIdentity: 73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 74 case jmodels.EventKindCommit: 75 l = l.With( 76 "nsid", e.Commit.Collection, 77 "did", e.Did, 78 "rkey", e.Commit.RKey, 79 "op", e.Commit.Operation, 80 ) 81 switch e.Commit.Collection { 82 case tangled.GraphFollowNSID: 83 err = i.ingestFollow(e, l) 84 case tangled.GraphVouchNSID: 85 err = i.ingestVouch(ctx, e, l) 86 case tangled.FeedStarNSID: 87 err = i.ingestStar(ctx, e, l) 88 case tangled.FeedReactionNSID: 89 err = i.ingestReaction(e, l) 90 case tangled.PublicKeyNSID: 91 err = i.ingestPublicKey(e, l) 92 case tangled.RepoArtifactNSID: 93 err = i.ingestArtifact(ctx, e, l) 94 case tangled.ActorProfileNSID: 95 err = i.ingestProfile(ctx, e, l) 96 case tangled.SpindleMemberNSID: 97 err = i.ingestSpindleMember(ctx, e, l) 98 case tangled.SpindleNSID: 99 err = i.ingestSpindle(ctx, e, l) 100 case tangled.KnotMemberNSID: 101 err = i.ingestKnotMember(ctx, e, l) 102 case tangled.KnotNSID: 103 err = i.ingestKnot(ctx, e, l) 104 case tangled.StringNSID: 105 err = i.ingestString(e, l) 106 case tangled.RepoIssueNSID: 107 err = i.ingestIssue(ctx, e, l) 108 case tangled.RepoPullNSID: 109 err = i.ingestPull(ctx, e, l) 110 case tangled.FeedCommentNSID: 111 err = i.ingestComment(e, l) 112 case tangled.RepoIssueCommentNSID: 113 err = i.ingestIssueComment(e, l) 114 case tangled.RepoPullCommentNSID: 115 err = i.ingestPullComment(e, l) 116 case tangled.LabelDefinitionNSID: 117 err = i.ingestLabelDefinition(e, l) 118 case tangled.LabelOpNSID: 119 err = i.ingestLabelOp(e, l) 120 case tangled.RepoNSID: 121 err = i.ingestRepo(ctx, e, l) 122 } 123 } 124 125 if err != nil { 126 l.Warn("failed to ingest record, skipping", "err", err) 127 } 128 129 lastTimeUs := e.TimeUS + 1 130 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 131 l.Error("failed to save cursor", "err", saveErr) 132 } 133 134 return nil 135 } 136} 137 138func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 139 if strings.HasPrefix(ref, "did:") { 140 return db.GetRepoByDid(i.Db, ref) 141 } 142 return db.GetRepoByAtUri(i.Db, ref) 143} 144 145func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 146 var legacy struct { 147 Subject *string `json:"subject"` 148 SubjectDid *string `json:"subjectDid"` 149 } 150 if err := json.Unmarshal(raw, &legacy); err != nil { 151 return false, err 152 } 153 154 switch { 155 case legacy.SubjectDid != nil: 156 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 157 if err != nil { 158 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 159 return false, nil 160 } 161 star.SubjectType = models.StarSubjectRepo 162 star.Subject = repo.RepoDid 163 return true, nil 164 165 case legacy.Subject != nil: 166 uri, err := syntax.ParseATURI(*legacy.Subject) 167 if err != nil { 168 return false, fmt.Errorf("invalid old-format star subject: %w", err) 169 } 170 switch uri.Collection().String() { 171 case tangled.RepoNSID: 172 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 173 if err != nil { 174 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 175 return false, nil 176 } 177 star.SubjectType = models.StarSubjectRepo 178 star.Subject = repo.RepoDid 179 return true, nil 180 default: 181 star.SubjectType = models.StarSubjectString 182 star.Subject = *legacy.Subject 183 return true, nil 184 } 185 186 default: 187 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 188 } 189} 190 191func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 192 var err error 193 did := e.Did 194 195 l = l.With("handler", "ingestStar") 196 197 switch e.Commit.Operation { 198 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 199 raw := json.RawMessage(e.Commit.Record) 200 record := tangled.FeedStar{} 201 unmarshalErr := json.Unmarshal(raw, &record) 202 203 star := &models.Star{ 204 Did: did, 205 Rkey: e.Commit.RKey, 206 } 207 208 switch { 209 case unmarshalErr != nil: 210 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 211 if resolveErr != nil { 212 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 213 return unmarshalErr 214 } 215 if !resolved { 216 return nil 217 } 218 219 case record.Subject == nil: 220 return fmt.Errorf("star record has nil subject") 221 222 case record.Subject.FeedStar_Repo != nil: 223 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 224 if repoErr != nil { 225 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 226 return nil 227 } 228 star.SubjectType = models.StarSubjectRepo 229 star.Subject = repo.RepoDid 230 231 case record.Subject.FeedStar_String != nil: 232 star.SubjectType = models.StarSubjectString 233 star.Subject = record.Subject.FeedStar_String.Uri 234 235 default: 236 return fmt.Errorf("star record has empty subject union") 237 } 238 239 err = db.AddStar(i.Db, star) 240 case jmodels.CommitOperationDelete: 241 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 242 } 243 244 if err != nil { 245 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 246 } 247 248 l.Info("ingested record") 249 return nil 250} 251 252func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error { 253 var err error 254 did := e.Did 255 256 l = l.With("handler", "ingestFollow") 257 258 switch e.Commit.Operation { 259 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 260 raw := json.RawMessage(e.Commit.Record) 261 record := tangled.GraphFollow{} 262 err = json.Unmarshal(raw, &record) 263 if err != nil { 264 l.Error("invalid record", "err", err) 265 return err 266 } 267 268 err = db.AddFollow(i.Db, &models.Follow{ 269 UserDid: did, 270 SubjectDid: record.Subject, 271 Rkey: e.Commit.RKey, 272 }) 273 case jmodels.CommitOperationDelete: 274 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 275 } 276 277 if err != nil { 278 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 279 } 280 281 l.Info("ingested record") 282 return nil 283} 284 285func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 286 var err error 287 did := e.Did 288 289 l = l.With("handler", "ingestVouch") 290 291 switch e.Commit.Operation { 292 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 293 raw := json.RawMessage(e.Commit.Record) 294 record := tangled.GraphVouch{} 295 err = json.Unmarshal(raw, &record) 296 if err != nil { 297 l.Error("invalid record", "err", err) 298 return err 299 } 300 301 // rkey is the subject_did being vouched for/denounced 302 subjectDID := e.Commit.RKey 303 304 _, err = syntax.ParseDID(subjectDID) 305 if err != nil { 306 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 307 return fmt.Errorf("invalid subject_did: %w", err) 308 } 309 310 if did == subjectDID { 311 l.Warn("attempted self-vouch", "did", did) 312 return fmt.Errorf("cannot vouch for self") 313 } 314 315 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 316 if err != nil { 317 return err 318 } 319 320 if subjectId.Handle.IsInvalidHandle() { 321 return err 322 } 323 324 kind, err := models.ParseVouchKind(record.Kind) 325 if err != nil { 326 l.Error("invalid kind", "kind", kind) 327 return fmt.Errorf("invalid kind: %s", kind) 328 } 329 330 recordCid, err := cid.Parse(e.Commit.CID) 331 if err != nil { 332 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 333 return fmt.Errorf("invalid cid: %w", err) 334 } 335 336 var evidences []syntax.ATURI 337 for _, raw := range record.Evidences { 338 uri, parseErr := syntax.ParseATURI(raw) 339 if parseErr != nil { 340 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 341 continue 342 } 343 evidences = append(evidences, uri) 344 } 345 346 tx, txErr := i.Db.Begin() 347 if txErr != nil { 348 return fmt.Errorf("failed to start transaction: %w", txErr) 349 } 350 351 addErr := db.AddVouch(tx, &models.Vouch{ 352 Did: syntax.DID(did), 353 SubjectDid: subjectId.DID, 354 Cid: recordCid, 355 Kind: kind, 356 Reason: record.Reason, 357 Evidences: evidences, 358 }) 359 if addErr != nil { 360 tx.Rollback() 361 err = addErr 362 } else { 363 err = tx.Commit() 364 } 365 366 case jmodels.CommitOperationDelete: 367 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 368 } 369 370 if err != nil { 371 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 372 } 373 374 l.Info("ingested record") 375 return nil 376} 377 378func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error { 379 did := e.Did 380 var err error 381 382 l = l.With("handler", "ingestPublicKey") 383 384 switch e.Commit.Operation { 385 case jmodels.CommitOperationCreate: 386 l.Debug("processing add of pubkey") 387 raw := json.RawMessage(e.Commit.Record) 388 record := tangled.PublicKey{} 389 err = json.Unmarshal(raw, &record) 390 if err != nil { 391 l.Error("invalid record", "err", err) 392 return err 393 } 394 395 name := record.Name 396 key := record.Key 397 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 398 case jmodels.CommitOperationUpdate: 399 l.Debug("processing update of pubkey") 400 raw := json.RawMessage(e.Commit.Record) 401 record := tangled.PublicKey{} 402 err = json.Unmarshal(raw, &record) 403 if err != nil { 404 l.Error("invalid record", "err", err) 405 return err 406 } 407 408 name := record.Name 409 key := record.Key 410 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 411 case jmodels.CommitOperationDelete: 412 l.Debug("processing delete of pubkey") 413 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 414 } 415 416 if err != nil { 417 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 418 } 419 420 l.Info("ingested record") 421 return nil 422} 423 424func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 425 did := e.Did 426 var err error 427 428 l = l.With("handler", "ingestArtifact") 429 430 switch e.Commit.Operation { 431 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 432 raw := json.RawMessage(e.Commit.Record) 433 record := tangled.RepoArtifact{} 434 err = json.Unmarshal(raw, &record) 435 if err != nil { 436 l.Error("invalid record", "err", err) 437 return err 438 } 439 440 var repo *models.Repo 441 if record.RepoDid != nil && *record.RepoDid != "" { 442 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 443 if err != nil && !errors.Is(err, sql.ErrNoRows) { 444 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 445 } 446 } 447 if repo == nil && record.Repo != nil { 448 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 449 if parseErr != nil { 450 return parseErr 451 } 452 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 453 if err != nil { 454 return err 455 } 456 } 457 if repo == nil { 458 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 459 } 460 461 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 462 if err != nil || !ok { 463 return err 464 } 465 466 repoDid := repo.RepoDid 467 if repoDid == "" && record.RepoDid != nil { 468 repoDid = *record.RepoDid 469 } 470 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 471 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 472 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 473 } 474 } 475 476 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 477 if err != nil { 478 createdAt = time.Now() 479 } 480 481 artifact := models.Artifact{ 482 Did: did, 483 Rkey: e.Commit.RKey, 484 RepoDid: syntax.DID(repo.RepoDid), 485 Tag: plumbing.Hash(record.Tag), 486 CreatedAt: createdAt, 487 BlobCid: cid.Cid(record.Artifact.Ref), 488 Name: record.Name, 489 Size: uint64(record.Artifact.Size), 490 MimeType: record.Artifact.MimeType, 491 } 492 493 err = db.AddArtifact(i.Db, artifact) 494 case jmodels.CommitOperationDelete: 495 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 496 } 497 498 if err != nil { 499 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 500 } 501 502 l.Info("ingested record") 503 return nil 504} 505 506func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 507 did := e.Did 508 var err error 509 510 l = l.With("handler", "ingestProfile") 511 512 if e.Commit.RKey != "self" { 513 return fmt.Errorf("ingestProfile only ingests `self` record") 514 } 515 516 switch e.Commit.Operation { 517 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 518 raw := json.RawMessage(e.Commit.Record) 519 record := tangled.ActorProfile{} 520 err = json.Unmarshal(raw, &record) 521 if err != nil { 522 l.Error("invalid record", "err", err) 523 return err 524 } 525 526 avatar := "" 527 if record.Avatar != nil { 528 avatar = record.Avatar.Ref.String() 529 } 530 531 description := "" 532 if record.Description != nil { 533 description = *record.Description 534 } 535 536 includeBluesky := record.Bluesky 537 538 pronouns := "" 539 if record.Pronouns != nil { 540 pronouns = *record.Pronouns 541 } 542 543 location := "" 544 if record.Location != nil { 545 location = *record.Location 546 } 547 548 var links [5]string 549 for i, l := range record.Links { 550 if i < 5 { 551 links[i] = l 552 } 553 } 554 555 var stats [2]models.VanityStat 556 for i, s := range record.Stats { 557 if i < 2 { 558 stats[i].Kind = models.ParseVanityStatKind(s) 559 } 560 } 561 562 var pinned [6]string 563 for i, r := range record.PinnedRepositories { 564 if i < 6 { 565 pinned[i] = r 566 } 567 } 568 569 var preferredHandle syntax.Handle 570 if record.PreferredHandle != nil { 571 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 572 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 573 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 574 preferredHandle = h 575 } 576 } 577 } 578 579 profile := models.Profile{ 580 Did: did, 581 Avatar: avatar, 582 Description: description, 583 IncludeBluesky: includeBluesky, 584 Location: location, 585 Links: links, 586 Stats: stats, 587 PinnedRepos: pinned, 588 Pronouns: pronouns, 589 PreferredHandle: preferredHandle, 590 } 591 592 tx, err := i.Db.Begin() 593 if err != nil { 594 return fmt.Errorf("failed to start transaction: %w", err) 595 } 596 597 err = db.ValidateProfile(tx, &profile) 598 if err != nil { 599 return fmt.Errorf("invalid profile record") 600 } 601 602 err = db.UpsertProfile(tx, &profile) 603 if err == nil && i.Cache != nil { 604 pipe := i.Cache.Pipeline() 605 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 606 if preferredHandle != "" { 607 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 608 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 609 } else { 610 pipe.Del(ctx, didKey) 611 } 612 if _, execErr := pipe.Exec(ctx); execErr != nil { 613 l.Warn("failed to update preferred handle cache", "err", execErr) 614 } 615 } 616 case jmodels.CommitOperationDelete: 617 tx, beginErr := i.Db.Begin() 618 if beginErr != nil { 619 return fmt.Errorf("failed to start transaction: %w", beginErr) 620 } 621 622 priorHandle, phErr := db.GetPreferredHandle(tx, did) 623 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 624 l.Warn("failed to read prior preferred handle", "err", phErr) 625 } 626 627 err = db.DeleteProfile(tx, did) 628 if err == nil && i.Cache != nil { 629 pipe := i.Cache.Pipeline() 630 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 631 if priorHandle != "" { 632 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 633 } 634 if _, execErr := pipe.Exec(ctx); execErr != nil { 635 l.Warn("failed to evict preferred handle cache", "err", execErr) 636 } 637 } 638 } 639 640 if err != nil { 641 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 642 } 643 644 l.Info("ingested record") 645 return nil 646} 647 648func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 649 did := e.Did 650 var err error 651 652 l = l.With("handler", "ingestSpindleMember") 653 654 switch e.Commit.Operation { 655 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 656 raw := json.RawMessage(e.Commit.Record) 657 record := tangled.SpindleMember{} 658 err = json.Unmarshal(raw, &record) 659 if err != nil { 660 l.Error("invalid record", "err", err) 661 return err 662 } 663 664 // only spindle owner can invite to spindles 665 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 666 if err != nil { 667 return fmt.Errorf("failed to check invite permission: %w", err) 668 } 669 if !ok { 670 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 671 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 672 } 673 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 674 if err != nil { 675 return fmt.Errorf("failed to re-check invite permission: %w", err) 676 } 677 if !ok { 678 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 679 } 680 } 681 682 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 683 if err != nil { 684 return err 685 } 686 687 if memberId.Handle.IsInvalidHandle() { 688 return fmt.Errorf("invalid handle for member %s", record.Subject) 689 } 690 691 existing, err := db.GetSpindleMembers(i.Db, 692 orm.FilterEq("did", did), 693 orm.FilterEq("rkey", e.Commit.RKey), 694 ) 695 if err != nil { 696 return fmt.Errorf("failed to look up existing member: %w", err) 697 } 698 if len(existing) > 1 { 699 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 700 } 701 702 tx, err := i.Db.Begin() 703 if err != nil { 704 return fmt.Errorf("failed to start txn: %w", err) 705 } 706 committed := false 707 defer func() { 708 if committed { 709 return 710 } 711 tx.Rollback() 712 i.Enforcer.E.LoadPolicy() 713 }() 714 715 if len(existing) == 1 { 716 prev := existing[0] 717 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 718 if err = db.RemoveSpindleMember(tx, 719 orm.FilterEq("did", did), 720 orm.FilterEq("rkey", e.Commit.RKey), 721 ); err != nil { 722 return fmt.Errorf("failed to remove stale row: %w", err) 723 } 724 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 725 return fmt.Errorf("failed to remove stale ACL: %w", err) 726 } 727 } 728 } 729 730 if err = db.AddSpindleMember(tx, models.SpindleMember{ 731 Did: syntax.DID(did), 732 Rkey: e.Commit.RKey, 733 Instance: record.Instance, 734 Subject: memberId.DID, 735 }); err != nil { 736 return fmt.Errorf("failed to add to db: %w", err) 737 } 738 739 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 740 return fmt.Errorf("failed to update ACLs: %w", err) 741 } 742 743 if err = tx.Commit(); err != nil { 744 return fmt.Errorf("failed to commit txn: %w", err) 745 } 746 747 if err = i.Enforcer.E.SavePolicy(); err != nil { 748 return fmt.Errorf("failed to save ACLs: %w", err) 749 } 750 committed = true 751 752 l.Info("upserted spindle member") 753 case jmodels.CommitOperationDelete: 754 rkey := e.Commit.RKey 755 756 // get record from db first 757 members, err := db.GetSpindleMembers( 758 i.Db, 759 orm.FilterEq("did", did), 760 orm.FilterEq("rkey", rkey), 761 ) 762 if err != nil || len(members) != 1 { 763 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 764 } 765 member := members[0] 766 767 tx, err := i.Db.Begin() 768 if err != nil { 769 return fmt.Errorf("failed to start txn: %w", err) 770 } 771 committed := false 772 defer func() { 773 if committed { 774 return 775 } 776 tx.Rollback() 777 i.Enforcer.E.LoadPolicy() 778 }() 779 780 // remove record by rkey && update enforcer 781 if err = db.RemoveSpindleMember( 782 tx, 783 orm.FilterEq("did", did), 784 orm.FilterEq("rkey", rkey), 785 ); err != nil { 786 return fmt.Errorf("failed to remove from db: %w", err) 787 } 788 789 // update enforcer 790 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 791 if err != nil { 792 return fmt.Errorf("failed to update ACLs: %w", err) 793 } 794 795 if err = tx.Commit(); err != nil { 796 return fmt.Errorf("failed to commit txn: %w", err) 797 } 798 799 if err = i.Enforcer.E.SavePolicy(); err != nil { 800 return fmt.Errorf("failed to save ACLs: %w", err) 801 } 802 committed = true 803 804 l.Info("removed spindle member") 805 } 806 807 return nil 808} 809 810func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 811 did := e.Did 812 var err error 813 814 l = l.With("handler", "ingestSpindle") 815 816 switch e.Commit.Operation { 817 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 818 raw := json.RawMessage(e.Commit.Record) 819 record := tangled.Spindle{} 820 err = json.Unmarshal(raw, &record) 821 if err != nil { 822 l.Error("invalid record", "err", err) 823 return err 824 } 825 826 instance := e.Commit.RKey 827 828 err := db.AddSpindle(i.Db, models.Spindle{ 829 Owner: syntax.DID(did), 830 Instance: instance, 831 }) 832 if err != nil { 833 l.Error("failed to add spindle to db", "err", err, "instance", instance) 834 return err 835 } 836 837 if err := i.verifySpindle(ctx, instance, did); err != nil { 838 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 839 } 840 841 l.Info("ingested record", "instance", instance) 842 return nil 843 844 case jmodels.CommitOperationDelete: 845 instance := e.Commit.RKey 846 847 // get record from db first 848 spindles, err := db.GetSpindles( 849 ctx, 850 i.Db, 851 orm.FilterEq("owner", did), 852 orm.FilterEq("instance", instance), 853 ) 854 if err != nil || len(spindles) != 1 { 855 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 856 } 857 spindle := spindles[0] 858 859 tx, err := i.Db.Begin() 860 if err != nil { 861 return fmt.Errorf("failed to start txn: %w", err) 862 } 863 defer func() { 864 tx.Rollback() 865 i.Enforcer.E.LoadPolicy() 866 }() 867 868 // remove spindle members first 869 err = db.RemoveSpindleMember( 870 tx, 871 orm.FilterEq("owner", did), 872 orm.FilterEq("instance", instance), 873 ) 874 if err != nil { 875 return fmt.Errorf("failed to remove spindle members: %w", err) 876 } 877 878 err = db.DeleteSpindle( 879 tx, 880 orm.FilterEq("owner", did), 881 orm.FilterEq("instance", instance), 882 ) 883 if err != nil { 884 return fmt.Errorf("failed to delete spindle: %w", err) 885 } 886 887 if spindle.Verified != nil { 888 err = i.Enforcer.RemoveSpindle(instance) 889 if err != nil { 890 return fmt.Errorf("failed to remove spindle from enforcer: %w", err) 891 } 892 } 893 894 err = tx.Commit() 895 if err != nil { 896 return fmt.Errorf("failed to commit txn: %w", err) 897 } 898 899 err = i.Enforcer.E.SavePolicy() 900 if err != nil { 901 return fmt.Errorf("failed to save ACLs: %w", err) 902 } 903 904 l.Info("ingested record", "instance", instance) 905 } 906 907 return nil 908} 909 910func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error { 911 did := e.Did 912 rkey := e.Commit.RKey 913 914 var err error 915 916 l = l.With("handler", "ingestString") 917 918 switch e.Commit.Operation { 919 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 920 raw := json.RawMessage(e.Commit.Record) 921 record := tangled.String{} 922 err = json.Unmarshal(raw, &record) 923 if err != nil { 924 l.Error("invalid record", "err", err) 925 return err 926 } 927 928 string := models.StringFromRecord(did, rkey, record) 929 930 if err = i.Validator.ValidateString(&string); err != nil { 931 l.Error("invalid record", "err", err) 932 return err 933 } 934 935 if err = db.AddString(i.Db, string); err != nil { 936 l.Error("failed to add string", "err", err) 937 return err 938 } 939 940 l.Info("ingested record") 941 return nil 942 943 case jmodels.CommitOperationDelete: 944 if err := db.DeleteString( 945 i.Db, 946 orm.FilterEq("did", did), 947 orm.FilterEq("rkey", rkey), 948 ); err != nil { 949 l.Error("failed to delete", "err", err) 950 return fmt.Errorf("failed to delete string record: %w", err) 951 } 952 953 l.Info("ingested record") 954 return nil 955 } 956 957 return nil 958} 959 960func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 961 did := e.Did 962 var err error 963 964 l = l.With("handler", "ingestKnotMember") 965 966 switch e.Commit.Operation { 967 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 968 raw := json.RawMessage(e.Commit.Record) 969 record := tangled.KnotMember{} 970 err = json.Unmarshal(raw, &record) 971 if err != nil { 972 l.Error("invalid record", "err", err) 973 return err 974 } 975 976 // only knot owner can invite to knots 977 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 978 if err != nil { 979 return fmt.Errorf("failed to check invite permission: %w", err) 980 } 981 if !ok { 982 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 983 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 984 } 985 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 986 if err != nil { 987 return fmt.Errorf("failed to re-check invite permission: %w", err) 988 } 989 if !ok { 990 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 991 } 992 } 993 994 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 995 if err != nil { 996 return err 997 } 998 999 if memberId.Handle.IsInvalidHandle() { 1000 return fmt.Errorf("invalid handle for member %s", record.Subject) 1001 } 1002 1003 existing, err := db.GetKnotMembers(i.Db, 1004 orm.FilterEq("did", did), 1005 orm.FilterEq("rkey", e.Commit.RKey), 1006 ) 1007 if err != nil { 1008 return fmt.Errorf("failed to look up existing member: %w", err) 1009 } 1010 if len(existing) > 1 { 1011 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1012 } 1013 1014 tx, err := i.Db.Begin() 1015 if err != nil { 1016 return fmt.Errorf("failed to start txn: %w", err) 1017 } 1018 committed := false 1019 defer func() { 1020 if committed { 1021 return 1022 } 1023 tx.Rollback() 1024 i.Enforcer.E.LoadPolicy() 1025 }() 1026 1027 if len(existing) == 1 { 1028 prev := existing[0] 1029 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1030 if err = db.RemoveKnotMember(tx, 1031 orm.FilterEq("did", did), 1032 orm.FilterEq("rkey", e.Commit.RKey), 1033 ); err != nil { 1034 return fmt.Errorf("failed to remove stale row: %w", err) 1035 } 1036 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1037 return fmt.Errorf("failed to remove stale ACL: %w", err) 1038 } 1039 } 1040 } 1041 1042 if err = db.AddKnotMember(tx, models.KnotMember{ 1043 Did: syntax.DID(did), 1044 Rkey: e.Commit.RKey, 1045 Domain: record.Domain, 1046 Subject: memberId.DID, 1047 }); err != nil { 1048 return fmt.Errorf("failed to add to db: %w", err) 1049 } 1050 1051 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1052 return fmt.Errorf("failed to update ACLs: %w", err) 1053 } 1054 1055 if err = tx.Commit(); err != nil { 1056 return fmt.Errorf("failed to commit txn: %w", err) 1057 } 1058 1059 if err = i.Enforcer.E.SavePolicy(); err != nil { 1060 return fmt.Errorf("failed to save ACLs: %w", err) 1061 } 1062 committed = true 1063 1064 l.Info("upserted knot member") 1065 case jmodels.CommitOperationDelete: 1066 rkey := e.Commit.RKey 1067 1068 members, err := db.GetKnotMembers( 1069 i.Db, 1070 orm.FilterEq("did", did), 1071 orm.FilterEq("rkey", rkey), 1072 ) 1073 if err != nil { 1074 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1075 } 1076 if len(members) == 0 { 1077 l.Info("knot member already removed", "rkey", rkey) 1078 return nil 1079 } 1080 if len(members) > 1 { 1081 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1082 } 1083 member := members[0] 1084 1085 tx, err := i.Db.Begin() 1086 if err != nil { 1087 return fmt.Errorf("failed to start txn: %w", err) 1088 } 1089 committed := false 1090 defer func() { 1091 if committed { 1092 return 1093 } 1094 tx.Rollback() 1095 i.Enforcer.E.LoadPolicy() 1096 }() 1097 1098 if err = db.RemoveKnotMember( 1099 tx, 1100 orm.FilterEq("did", did), 1101 orm.FilterEq("rkey", rkey), 1102 ); err != nil { 1103 return fmt.Errorf("failed to remove from db: %w", err) 1104 } 1105 1106 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1107 return fmt.Errorf("failed to update ACLs: %w", err) 1108 } 1109 1110 if err = tx.Commit(); err != nil { 1111 return fmt.Errorf("failed to commit txn: %w", err) 1112 } 1113 1114 if err = i.Enforcer.E.SavePolicy(); err != nil { 1115 return fmt.Errorf("failed to save ACLs: %w", err) 1116 } 1117 committed = true 1118 1119 l.Info("removed knot member") 1120 } 1121 1122 return nil 1123} 1124 1125func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1126 did := e.Did 1127 var err error 1128 1129 l = l.With("handler", "ingestKnot") 1130 1131 switch e.Commit.Operation { 1132 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1133 raw := json.RawMessage(e.Commit.Record) 1134 record := tangled.Knot{} 1135 err = json.Unmarshal(raw, &record) 1136 if err != nil { 1137 l.Error("invalid record", "err", err) 1138 return err 1139 } 1140 1141 domain := e.Commit.RKey 1142 1143 err := db.AddKnot(i.Db, domain, did) 1144 if err != nil { 1145 l.Error("failed to add knot to db", "err", err, "domain", domain) 1146 return err 1147 } 1148 1149 if err := i.verifyKnot(ctx, domain, did); err != nil { 1150 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 1151 } 1152 1153 l.Info("ingested record", "domain", domain) 1154 return nil 1155 1156 case jmodels.CommitOperationDelete: 1157 domain := e.Commit.RKey 1158 1159 // get record from db first 1160 registrations, err := db.GetRegistrations( 1161 i.Db, 1162 orm.FilterEq("domain", domain), 1163 orm.FilterEq("did", did), 1164 ) 1165 if err != nil { 1166 return fmt.Errorf("failed to get registration: %w", err) 1167 } 1168 if len(registrations) != 1 { 1169 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 1170 } 1171 registration := registrations[0] 1172 1173 tx, err := i.Db.Begin() 1174 if err != nil { 1175 return fmt.Errorf("failed to start txn: %w", err) 1176 } 1177 defer func() { 1178 tx.Rollback() 1179 i.Enforcer.E.LoadPolicy() 1180 }() 1181 1182 err = db.RemoveKnotMember( 1183 tx, 1184 orm.FilterEq("did", did), 1185 orm.FilterEq("domain", domain), 1186 ) 1187 if err != nil { 1188 return fmt.Errorf("failed to remove knot members: %w", err) 1189 } 1190 1191 err = db.DeleteKnot( 1192 tx, 1193 orm.FilterEq("did", did), 1194 orm.FilterEq("domain", domain), 1195 ) 1196 if err != nil { 1197 return fmt.Errorf("failed to delete knot: %w", err) 1198 } 1199 1200 err = db.RemoveReposByKnot(tx, domain) 1201 if err != nil { 1202 return fmt.Errorf("failed to remove repos by knot: %w", err) 1203 } 1204 1205 if registration.Registered != nil { 1206 err = i.Enforcer.RemoveKnot(domain) 1207 if err != nil { 1208 return fmt.Errorf("failed to remove knot from enforcer: %w", err) 1209 } 1210 } 1211 1212 err = tx.Commit() 1213 if err != nil { 1214 return fmt.Errorf("failed to commit txn: %w", err) 1215 } 1216 1217 err = i.Enforcer.E.SavePolicy() 1218 if err != nil { 1219 return fmt.Errorf("failed to save ACLs: %w", err) 1220 } 1221 1222 l.Info("ingested record", "domain", domain) 1223 } 1224 1225 return nil 1226} 1227 1228const ( 1229 verifyAttempts = 4 1230 verifyMinDelay = 1 * time.Second 1231 verifyMaxDelay = 5 * time.Second 1232) 1233 1234func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1235 regs, err := db.GetRegistrations(i.Db, 1236 orm.FilterEq("domain", domain), 1237 orm.FilterEq("did", did), 1238 ) 1239 if err != nil { 1240 return fmt.Errorf("look up registration: %w", err) 1241 } 1242 if len(regs) != 1 { 1243 return fmt.Errorf("no registration for %s by %s", domain, did) 1244 } 1245 if regs[0].Registered != nil { 1246 return nil 1247 } 1248 1249 err = retry.Do( 1250 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1251 retry.Context(ctx), 1252 retry.Attempts(verifyAttempts), 1253 retry.Delay(verifyMinDelay), 1254 retry.MaxDelay(verifyMaxDelay), 1255 retry.DelayType(retry.BackOffDelay), 1256 retry.LastErrorOnly(true), 1257 ) 1258 if err != nil { 1259 return fmt.Errorf("verify: %w", err) 1260 } 1261 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1262} 1263 1264func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1265 spindles, err := db.GetSpindles(ctx, i.Db, 1266 orm.FilterEq("instance", instance), 1267 orm.FilterEq("owner", did), 1268 ) 1269 if err != nil { 1270 return fmt.Errorf("look up spindle: %w", err) 1271 } 1272 if len(spindles) != 1 { 1273 return fmt.Errorf("no spindle for %s by %s", instance, did) 1274 } 1275 if spindles[0].Verified != nil { 1276 return nil 1277 } 1278 1279 err = retry.Do( 1280 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1281 retry.Context(ctx), 1282 retry.Attempts(verifyAttempts), 1283 retry.Delay(verifyMinDelay), 1284 retry.MaxDelay(verifyMaxDelay), 1285 retry.DelayType(retry.BackOffDelay), 1286 retry.LastErrorOnly(true), 1287 ) 1288 if err != nil { 1289 return fmt.Errorf("verify: %w", err) 1290 } 1291 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1292 return err 1293} 1294 1295const sweepConcurrency = 4 1296 1297func (i *Ingester) SweepPendingVerifications() { 1298 l := i.Logger.With("handler", "SweepPendingVerifications") 1299 1300 var g errgroup.Group 1301 g.SetLimit(sweepConcurrency) 1302 1303 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1304 if err != nil { 1305 l.Error("failed to list unverified knots", "err", err) 1306 } else { 1307 for _, reg := range regs { 1308 g.Go(func() error { 1309 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1310 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1311 } 1312 return nil 1313 }) 1314 } 1315 } 1316 1317 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1318 if err != nil { 1319 l.Error("failed to list unverified spindles", "err", err) 1320 g.Wait() 1321 return 1322 } 1323 for _, s := range spindles { 1324 g.Go(func() error { 1325 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1326 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1327 } 1328 return nil 1329 }) 1330 } 1331 g.Wait() 1332} 1333 1334func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1335 did := e.Did 1336 rkey := e.Commit.RKey 1337 1338 var err error 1339 1340 l = l.With("handler", "ingestIssue") 1341 1342 switch e.Commit.Operation { 1343 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1344 raw := json.RawMessage(e.Commit.Record) 1345 record := tangled.RepoIssue{} 1346 err = json.Unmarshal(raw, &record) 1347 if err != nil { 1348 l.Error("invalid record", "err", err) 1349 return err 1350 } 1351 1352 issue := models.IssueFromRecord(did, rkey, record) 1353 1354 if issue.RepoDid == "" { 1355 return fmt.Errorf("issue record has no repo field") 1356 } 1357 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1358 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1359 } 1360 1361 if err := i.Validator.ValidateIssue(&issue); err != nil { 1362 return fmt.Errorf("failed to validate issue: %w", err) 1363 } 1364 1365 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1366 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1367 if repoErr == nil && repo.RepoDid != "" { 1368 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1369 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1370 } 1371 } 1372 } 1373 1374 tx, err := i.Db.BeginTx(ctx, nil) 1375 if err != nil { 1376 l.Error("failed to begin transaction", "err", err) 1377 return err 1378 } 1379 defer tx.Rollback() 1380 1381 err = db.PutIssue(tx, &issue) 1382 if err != nil { 1383 l.Error("failed to create issue", "err", err) 1384 return err 1385 } 1386 1387 err = tx.Commit() 1388 if err != nil { 1389 l.Error("failed to commit txn", "err", err) 1390 return err 1391 } 1392 1393 l.Info("ingested record") 1394 return nil 1395 1396 case jmodels.CommitOperationDelete: 1397 tx, err := i.Db.BeginTx(ctx, nil) 1398 if err != nil { 1399 l.Error("failed to begin transaction", "err", err) 1400 return err 1401 } 1402 defer tx.Rollback() 1403 1404 if err := db.DeleteIssues( 1405 tx, 1406 did, 1407 rkey, 1408 ); err != nil { 1409 l.Error("failed to delete", "err", err) 1410 return fmt.Errorf("failed to delete issue record: %w", err) 1411 } 1412 if err := tx.Commit(); err != nil { 1413 l.Error("failed to commit txn", "err", err) 1414 return err 1415 } 1416 1417 l.Info("ingested record") 1418 return nil 1419 } 1420 1421 return nil 1422} 1423 1424func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1425 did := e.Did 1426 rkey := e.Commit.RKey 1427 1428 var err error 1429 1430 l = l.With("handler", "ingestPull") 1431 1432 switch e.Commit.Operation { 1433 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1434 raw := json.RawMessage(e.Commit.Record) 1435 record := tangled.RepoPull{} 1436 err = json.Unmarshal(raw, &record) 1437 if err != nil { 1438 l.Error("invalid record", "err", err) 1439 return err 1440 } 1441 1442 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1443 if err != nil { 1444 l.Error("failed to resolve did", "err", err) 1445 return err 1446 } 1447 1448 // go through and fetch all blobs in parallel 1449 readers := make([]*io.ReadCloser, len(record.Rounds)) 1450 var mu sync.Mutex 1451 1452 g, gctx := errgroup.WithContext(ctx) 1453 1454 for idx, b := range record.Rounds { 1455 g.Go(func() error { 1456 // for some reason, a blob is empty 1457 if b.PatchBlob == nil { 1458 return fmt.Errorf("missing patchBlob in round %d", idx) 1459 } 1460 1461 ownerPds := ownerId.PDSEndpoint() 1462 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1463 q := url.Query() 1464 q.Set("cid", b.PatchBlob.Ref.String()) 1465 q.Set("did", did) 1466 url.RawQuery = q.Encode() 1467 1468 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1469 if err != nil { 1470 l.Error("failed to create request") 1471 return err 1472 } 1473 req.Header.Set("Content-Type", "application/json") 1474 1475 resp, err := http.DefaultClient.Do(req) 1476 if err != nil { 1477 l.Error("failed to make request") 1478 return err 1479 } 1480 1481 mu.Lock() 1482 readers[idx] = &resp.Body 1483 mu.Unlock() 1484 1485 return nil 1486 }) 1487 } 1488 1489 if err := g.Wait(); err != nil { 1490 for _, r := range readers { 1491 if r != nil && *r != nil { 1492 (*r).Close() 1493 } 1494 } 1495 return err 1496 } 1497 1498 defer func() { 1499 for _, r := range readers { 1500 if r != nil && *r != nil { 1501 (*r).Close() 1502 } 1503 } 1504 }() 1505 1506 pull, err := models.PullFromRecord(did, rkey, record, readers) 1507 if err != nil { 1508 return fmt.Errorf("failed to parse pull from record: %w", err) 1509 } 1510 if err := i.Validator.ValidatePull(pull); err != nil { 1511 return fmt.Errorf("failed to validate pull: %w", err) 1512 } 1513 1514 tx, err := i.Db.BeginTx(ctx, nil) 1515 if err != nil { 1516 l.Error("failed to begin transaction", "err", err) 1517 return err 1518 } 1519 defer tx.Rollback() 1520 1521 err = db.PutPull(tx, pull) 1522 if err != nil { 1523 l.Error("failed to create pull", "err", err) 1524 return err 1525 } 1526 1527 err = tx.Commit() 1528 if err != nil { 1529 l.Error("failed to commit txn", "err", err) 1530 return err 1531 } 1532 1533 l.Info("ingested record") 1534 return nil 1535 1536 case jmodels.CommitOperationDelete: 1537 tx, err := i.Db.BeginTx(ctx, nil) 1538 if err != nil { 1539 l.Error("failed to begin transaction", "err", err) 1540 return err 1541 } 1542 defer tx.Rollback() 1543 1544 if err := db.AbandonPulls( 1545 tx, 1546 orm.FilterEq("owner_did", did), 1547 orm.FilterEq("rkey", rkey), 1548 ); err != nil { 1549 l.Error("failed to abandon", "err", err) 1550 return fmt.Errorf("failed to abandon pull record: %w", err) 1551 } 1552 if err := tx.Commit(); err != nil { 1553 l.Error("failed to commit txn", "err", err) 1554 return err 1555 } 1556 1557 l.Info("ingested record") 1558 return nil 1559 } 1560 1561 return nil 1562} 1563 1564// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1565func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error { 1566 l = l.With("handler", "ingestIssueComment") 1567 1568 switch e.Commit.Operation { 1569 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1570 // no-op. sh.tangled.repo.issue.comment is deprecated 1571 1572 case jmodels.CommitOperationDelete: 1573 if err := db.PurgeComments( 1574 i.Db, 1575 orm.FilterEq("did", e.Did), 1576 orm.FilterEq("collection", e.Commit.Collection), 1577 orm.FilterEq("rkey", e.Commit.RKey), 1578 ); err != nil { 1579 return fmt.Errorf("failed to delete comment record: %w", err) 1580 } 1581 } 1582 1583 l.Info("ingested record") 1584 return nil 1585} 1586 1587// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1588func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error { 1589 l = l.With("handler", "ingestPullComment") 1590 1591 switch e.Commit.Operation { 1592 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1593 // no-op. sh.tangled.repo.pull.comment is deprecated 1594 1595 case jmodels.CommitOperationDelete: 1596 if err := db.PurgeComments( 1597 i.Db, 1598 orm.FilterEq("did", e.Did), 1599 orm.FilterEq("collection", e.Commit.Collection), 1600 orm.FilterEq("rkey", e.Commit.RKey), 1601 ); err != nil { 1602 return fmt.Errorf("failed to delete comment record: %w", err) 1603 } 1604 } 1605 1606 l.Info("ingested record") 1607 return nil 1608} 1609 1610func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error { 1611 did := e.Did 1612 rkey := e.Commit.RKey 1613 cid := e.Commit.CID 1614 1615 var err error 1616 1617 l = l.With("handler", "ingestComment") 1618 1619 ctx := context.Background() 1620 1621 switch e.Commit.Operation { 1622 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1623 raw := json.RawMessage(e.Commit.Record) 1624 record := tangled.FeedComment{} 1625 err = json.Unmarshal(raw, &record) 1626 if err != nil { 1627 return fmt.Errorf("invalid record: %w", err) 1628 } 1629 1630 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1631 if err != nil { 1632 return fmt.Errorf("failed to parse comment from record: %w", err) 1633 } 1634 1635 if err := comment.Validate(); err != nil { 1636 return fmt.Errorf("failed to validate comment: %w", err) 1637 } 1638 1639 var references []syntax.ATURI 1640 if comment.Body.Original != nil { 1641 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1642 } 1643 1644 tx, err := i.Db.Begin() 1645 if err != nil { 1646 return fmt.Errorf("failed to start transaction: %w", err) 1647 } 1648 defer tx.Rollback() 1649 1650 _, err = db.PutComment(tx, comment, references) 1651 if err != nil { 1652 return fmt.Errorf("failed to create comment: %w", err) 1653 } 1654 1655 if err := tx.Commit(); err != nil { 1656 return err 1657 } 1658 1659 case jmodels.CommitOperationDelete: 1660 if err := db.DeleteComments( 1661 i.Db, 1662 orm.FilterEq("did", did), 1663 orm.FilterEq("collection", e.Commit.Collection), 1664 orm.FilterEq("rkey", rkey), 1665 ); err != nil { 1666 return fmt.Errorf("failed to delete comment record: %w", err) 1667 } 1668 } 1669 1670 l.Info("ingested record") 1671 return nil 1672} 1673 1674func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error { 1675 did := e.Did 1676 rkey := e.Commit.RKey 1677 1678 l = l.With("handler", "ingestReaction") 1679 1680 switch e.Commit.Operation { 1681 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1682 raw := json.RawMessage(e.Commit.Record) 1683 record := tangled.FeedReaction{} 1684 if err := json.Unmarshal(raw, &record); err != nil { 1685 return fmt.Errorf("invalid record: %w", err) 1686 } 1687 1688 subjectUri, err := syntax.ParseATURI(record.Subject) 1689 if err != nil { 1690 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err) 1691 } 1692 subjectUri = models.NormalizeReactionSubject(subjectUri) 1693 1694 kind, ok := models.ParseReactionKind(record.Reaction) 1695 if !ok { 1696 return fmt.Errorf("invalid reaction kind: %q", record.Reaction) 1697 } 1698 1699 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 1700 if parseErr != nil { 1701 created = time.Now() 1702 } 1703 1704 tx, err := i.Db.Begin() 1705 if err != nil { 1706 return fmt.Errorf("failed to start transaction: %w", err) 1707 } 1708 defer tx.Rollback() 1709 1710 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil { 1711 return fmt.Errorf("failed to clear existing reaction: %w", err) 1712 } 1713 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil { 1714 return fmt.Errorf("failed to add reaction: %w", err) 1715 } 1716 1717 if err := tx.Commit(); err != nil { 1718 return err 1719 } 1720 1721 case jmodels.CommitOperationDelete: 1722 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil { 1723 return fmt.Errorf("failed to delete reaction record: %w", err) 1724 } 1725 } 1726 1727 l.Info("ingested record") 1728 return nil 1729} 1730 1731func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error { 1732 did := e.Did 1733 rkey := e.Commit.RKey 1734 1735 var err error 1736 1737 l = l.With("handler", "ingestLabelDefinition") 1738 1739 switch e.Commit.Operation { 1740 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1741 raw := json.RawMessage(e.Commit.Record) 1742 record := tangled.LabelDefinition{} 1743 err = json.Unmarshal(raw, &record) 1744 if err != nil { 1745 return fmt.Errorf("invalid record: %w", err) 1746 } 1747 1748 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1749 if err != nil { 1750 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1751 } 1752 1753 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1754 return fmt.Errorf("failed to validate labeldef: %w", err) 1755 } 1756 1757 _, err = db.AddLabelDefinition(i.Db, def) 1758 if err != nil { 1759 return fmt.Errorf("failed to create labeldef: %w", err) 1760 } 1761 1762 l.Info("ingested record") 1763 return nil 1764 1765 case jmodels.CommitOperationDelete: 1766 if err := db.DeleteLabelDefinition( 1767 i.Db, 1768 orm.FilterEq("did", did), 1769 orm.FilterEq("rkey", rkey), 1770 ); err != nil { 1771 return fmt.Errorf("failed to delete labeldef record: %w", err) 1772 } 1773 1774 l.Info("ingested record") 1775 return nil 1776 } 1777 1778 return nil 1779} 1780 1781func (i *Ingester) ingestLabelOp(e *jmodels.Event, l *slog.Logger) error { 1782 did := e.Did 1783 rkey := e.Commit.RKey 1784 1785 var err error 1786 1787 l = l.With("handler", "ingestLabelOp") 1788 1789 switch e.Commit.Operation { 1790 case jmodels.CommitOperationCreate: 1791 raw := json.RawMessage(e.Commit.Record) 1792 record := tangled.LabelOp{} 1793 err = json.Unmarshal(raw, &record) 1794 if err != nil { 1795 return fmt.Errorf("invalid record: %w", err) 1796 } 1797 1798 subject := syntax.ATURI(record.Subject) 1799 collection := subject.Collection() 1800 1801 var repo *models.Repo 1802 switch collection { 1803 case tangled.RepoIssueNSID: 1804 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1805 if err != nil || len(i) != 1 { 1806 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1807 } 1808 repo = i[0].Repo 1809 case tangled.RepoPullNSID: 1810 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject)) 1811 if err != nil || len(p) != 1 { 1812 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p)) 1813 } 1814 repo = p[0].Repo 1815 default: 1816 return fmt.Errorf("unsupported label subject: %s", collection) 1817 } 1818 1819 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1820 if err != nil { 1821 return fmt.Errorf("failed to build label application ctx: %w", err) 1822 } 1823 1824 ops := models.LabelOpsFromRecord(did, rkey, record) 1825 1826 for _, o := range ops { 1827 def, ok := actx.Defs[o.OperandKey] 1828 if !ok { 1829 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1830 } 1831 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1832 return fmt.Errorf("failed to validate labelop: %w", err) 1833 } 1834 } 1835 1836 tx, err := i.Db.Begin() 1837 if err != nil { 1838 return err 1839 } 1840 defer tx.Rollback() 1841 1842 for _, o := range ops { 1843 _, err = db.AddLabelOp(tx, &o) 1844 if err != nil { 1845 return fmt.Errorf("failed to add labelop: %w", err) 1846 } 1847 } 1848 1849 if err = tx.Commit(); err != nil { 1850 return err 1851 } 1852 1853 l.Info("ingested record") 1854 } 1855 1856 return nil 1857}