Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/mentions" 31 "tangled.org/core/appview/models" 32 "tangled.org/core/appview/notify" 33 "tangled.org/core/appview/repoverify" 34 "tangled.org/core/appview/serververify" 35 "tangled.org/core/appview/validator" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/orm" 38 "tangled.org/core/rbac" 39) 40 41type Ingester struct { 42 Ctx context.Context 43 Db *db.DB 44 Enforcer *rbac.Enforcer 45 IdResolver *idresolver.Resolver 46 Cache *cache.Cache 47 Config *config.Config 48 Logger *slog.Logger 49 Validator *validator.Validator 50 MentionsResolver *mentions.Resolver 51 Notifier notify.Notifier 52 Verifier repoverify.Verifier 53} 54 55type processFunc func(ctx context.Context, e *jmodels.Event) error 56 57func (i *Ingester) Ingest() processFunc { 58 return func(ctx context.Context, e *jmodels.Event) error { 59 var err error 60 61 l := i.Logger.With("kind", e.Kind) 62 switch e.Kind { 63 case jmodels.EventKindAccount: 64 // TODO: sync account state to db 65 if e.Account.Active { 66 break 67 } 68 // TODO: revoke sessions by DID 69 if *e.Account.Status == "deactivated" { 70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 71 } 72 case jmodels.EventKindIdentity: 73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 74 case jmodels.EventKindCommit: 75 switch e.Commit.Collection { 76 case tangled.GraphFollowNSID: 77 err = i.ingestFollow(e) 78 case tangled.GraphVouchNSID: 79 err = i.ingestVouch(ctx, e) 80 case tangled.FeedStarNSID: 81 err = i.ingestStar(ctx, e) 82 case tangled.FeedReactionNSID: 83 err = i.ingestReaction(e) 84 case tangled.PublicKeyNSID: 85 err = i.ingestPublicKey(e) 86 case tangled.RepoArtifactNSID: 87 err = i.ingestArtifact(ctx, e) 88 case tangled.ActorProfileNSID: 89 err = i.ingestProfile(ctx, e) 90 case tangled.SpindleMemberNSID: 91 err = i.ingestSpindleMember(ctx, e) 92 case tangled.SpindleNSID: 93 err = i.ingestSpindle(ctx, e) 94 case tangled.KnotMemberNSID: 95 err = i.ingestKnotMember(ctx, e) 96 case tangled.KnotNSID: 97 err = i.ingestKnot(ctx, e) 98 case tangled.StringNSID: 99 err = i.ingestString(e) 100 case tangled.RepoIssueNSID: 101 err = i.ingestIssue(ctx, e) 102 case tangled.RepoPullNSID: 103 err = i.ingestPull(ctx, e) 104 case tangled.FeedCommentNSID: 105 err = i.ingestComment(e) 106 case tangled.RepoIssueCommentNSID: 107 err = i.ingestIssueComment(e) 108 case tangled.RepoPullCommentNSID: 109 err = i.ingestPullComment(e) 110 case tangled.LabelDefinitionNSID: 111 err = i.ingestLabelDefinition(e) 112 case tangled.LabelOpNSID: 113 err = i.ingestLabelOp(e) 114 case tangled.RepoNSID: 115 err = i.ingestRepo(ctx, e) 116 } 117 l = i.Logger.With("nsid", e.Commit.Collection) 118 } 119 120 if err != nil { 121 l.Warn("failed to ingest record, skipping", "err", err) 122 } 123 124 lastTimeUs := e.TimeUS + 1 125 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 126 l.Error("failed to save cursor", "err", saveErr) 127 } 128 129 return nil 130 } 131} 132 133func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 134 if strings.HasPrefix(ref, "did:") { 135 return db.GetRepoByDid(i.Db, ref) 136 } 137 return db.GetRepoByAtUri(i.Db, ref) 138} 139 140func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 141 var legacy struct { 142 Subject *string `json:"subject"` 143 SubjectDid *string `json:"subjectDid"` 144 } 145 if err := json.Unmarshal(raw, &legacy); err != nil { 146 return false, err 147 } 148 149 switch { 150 case legacy.SubjectDid != nil: 151 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 152 if err != nil { 153 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 154 return false, nil 155 } 156 star.SubjectType = models.StarSubjectRepo 157 star.Subject = repo.RepoDid 158 return true, nil 159 160 case legacy.Subject != nil: 161 uri, err := syntax.ParseATURI(*legacy.Subject) 162 if err != nil { 163 return false, fmt.Errorf("invalid old-format star subject: %w", err) 164 } 165 switch uri.Collection().String() { 166 case tangled.RepoNSID: 167 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 168 if err != nil { 169 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 170 return false, nil 171 } 172 star.SubjectType = models.StarSubjectRepo 173 star.Subject = repo.RepoDid 174 return true, nil 175 default: 176 star.SubjectType = models.StarSubjectString 177 star.Subject = *legacy.Subject 178 return true, nil 179 } 180 181 default: 182 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 183 } 184} 185 186func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 187 var err error 188 did := e.Did 189 190 l := i.Logger.With("handler", "ingestStar") 191 l = l.With("nsid", e.Commit.Collection) 192 193 switch e.Commit.Operation { 194 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 195 raw := json.RawMessage(e.Commit.Record) 196 record := tangled.FeedStar{} 197 unmarshalErr := json.Unmarshal(raw, &record) 198 199 star := &models.Star{ 200 Did: did, 201 Rkey: e.Commit.RKey, 202 } 203 204 switch { 205 case unmarshalErr != nil: 206 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 207 if resolveErr != nil { 208 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 209 return unmarshalErr 210 } 211 if !resolved { 212 return nil 213 } 214 215 case record.Subject == nil: 216 return fmt.Errorf("star record has nil subject") 217 218 case record.Subject.FeedStar_Repo != nil: 219 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 220 if repoErr != nil { 221 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 222 return nil 223 } 224 star.SubjectType = models.StarSubjectRepo 225 star.Subject = repo.RepoDid 226 227 case record.Subject.FeedStar_String != nil: 228 star.SubjectType = models.StarSubjectString 229 star.Subject = record.Subject.FeedStar_String.Uri 230 231 default: 232 return fmt.Errorf("star record has empty subject union") 233 } 234 235 err = db.AddStar(i.Db, star) 236 case jmodels.CommitOperationDelete: 237 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 238 } 239 240 if err != nil { 241 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 242 } 243 244 return nil 245} 246 247func (i *Ingester) ingestFollow(e *jmodels.Event) error { 248 var err error 249 did := e.Did 250 251 l := i.Logger.With("handler", "ingestFollow") 252 l = l.With("nsid", e.Commit.Collection) 253 254 switch e.Commit.Operation { 255 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 256 raw := json.RawMessage(e.Commit.Record) 257 record := tangled.GraphFollow{} 258 err = json.Unmarshal(raw, &record) 259 if err != nil { 260 l.Error("invalid record", "err", err) 261 return err 262 } 263 264 err = db.AddFollow(i.Db, &models.Follow{ 265 UserDid: did, 266 SubjectDid: record.Subject, 267 Rkey: e.Commit.RKey, 268 }) 269 case jmodels.CommitOperationDelete: 270 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 271 } 272 273 if err != nil { 274 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 275 } 276 277 return nil 278} 279 280func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 281 var err error 282 did := e.Did 283 284 l := i.Logger.With("handler", "ingestVouch") 285 l = l.With("nsid", e.Commit.Collection) 286 l.Info("ingesting vouch") 287 288 switch e.Commit.Operation { 289 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 290 raw := json.RawMessage(e.Commit.Record) 291 record := tangled.GraphVouch{} 292 err = json.Unmarshal(raw, &record) 293 if err != nil { 294 l.Error("invalid record", "err", err) 295 return err 296 } 297 298 // rkey is the subject_did being vouched for/denounced 299 subjectDID := e.Commit.RKey 300 301 _, err = syntax.ParseDID(subjectDID) 302 if err != nil { 303 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 304 return fmt.Errorf("invalid subject_did: %w", err) 305 } 306 307 if did == subjectDID { 308 l.Warn("attempted self-vouch", "did", did) 309 return fmt.Errorf("cannot vouch for self") 310 } 311 312 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 313 if err != nil { 314 return err 315 } 316 317 if subjectId.Handle.IsInvalidHandle() { 318 return err 319 } 320 321 kind, err := models.ParseVouchKind(record.Kind) 322 if err != nil { 323 l.Error("invalid kind", "kind", kind) 324 return fmt.Errorf("invalid kind: %s", kind) 325 } 326 327 recordCid, err := cid.Parse(e.Commit.CID) 328 if err != nil { 329 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 330 return fmt.Errorf("invalid cid: %w", err) 331 } 332 333 var evidences []syntax.ATURI 334 for _, raw := range record.Evidences { 335 uri, parseErr := syntax.ParseATURI(raw) 336 if parseErr != nil { 337 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 338 continue 339 } 340 evidences = append(evidences, uri) 341 } 342 343 tx, txErr := i.Db.Begin() 344 if txErr != nil { 345 return fmt.Errorf("failed to start transaction: %w", txErr) 346 } 347 348 addErr := db.AddVouch(tx, &models.Vouch{ 349 Did: syntax.DID(did), 350 SubjectDid: subjectId.DID, 351 Cid: recordCid, 352 Kind: kind, 353 Reason: record.Reason, 354 Evidences: evidences, 355 }) 356 if addErr != nil { 357 tx.Rollback() 358 err = addErr 359 } else { 360 err = tx.Commit() 361 } 362 363 case jmodels.CommitOperationDelete: 364 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 365 } 366 367 if err != nil { 368 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 369 } 370 371 return nil 372} 373 374func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 375 did := e.Did 376 var err error 377 378 l := i.Logger.With("handler", "ingestPublicKey") 379 l = l.With("nsid", e.Commit.Collection) 380 381 switch e.Commit.Operation { 382 case jmodels.CommitOperationCreate: 383 l.Debug("processing add of pubkey") 384 raw := json.RawMessage(e.Commit.Record) 385 record := tangled.PublicKey{} 386 err = json.Unmarshal(raw, &record) 387 if err != nil { 388 l.Error("invalid record", "err", err) 389 return err 390 } 391 392 name := record.Name 393 key := record.Key 394 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 395 case jmodels.CommitOperationUpdate: 396 l.Debug("processing update of pubkey") 397 raw := json.RawMessage(e.Commit.Record) 398 record := tangled.PublicKey{} 399 err = json.Unmarshal(raw, &record) 400 if err != nil { 401 l.Error("invalid record", "err", err) 402 return err 403 } 404 405 name := record.Name 406 key := record.Key 407 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 408 case jmodels.CommitOperationDelete: 409 l.Debug("processing delete of pubkey") 410 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 411 } 412 413 if err != nil { 414 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 415 } 416 417 return nil 418} 419 420func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 421 did := e.Did 422 var err error 423 424 l := i.Logger.With("handler", "ingestArtifact") 425 l = l.With("nsid", e.Commit.Collection) 426 427 switch e.Commit.Operation { 428 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 429 raw := json.RawMessage(e.Commit.Record) 430 record := tangled.RepoArtifact{} 431 err = json.Unmarshal(raw, &record) 432 if err != nil { 433 l.Error("invalid record", "err", err) 434 return err 435 } 436 437 var repo *models.Repo 438 if record.RepoDid != nil && *record.RepoDid != "" { 439 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 440 if err != nil && !errors.Is(err, sql.ErrNoRows) { 441 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 442 } 443 } 444 if repo == nil && record.Repo != nil { 445 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 446 if parseErr != nil { 447 return parseErr 448 } 449 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 450 if err != nil { 451 return err 452 } 453 } 454 if repo == nil { 455 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 456 } 457 458 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 459 if err != nil || !ok { 460 return err 461 } 462 463 repoDid := repo.RepoDid 464 if repoDid == "" && record.RepoDid != nil { 465 repoDid = *record.RepoDid 466 } 467 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 468 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 469 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 470 } 471 } 472 473 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 474 if err != nil { 475 createdAt = time.Now() 476 } 477 478 artifact := models.Artifact{ 479 Did: did, 480 Rkey: e.Commit.RKey, 481 RepoDid: syntax.DID(repo.RepoDid), 482 Tag: plumbing.Hash(record.Tag), 483 CreatedAt: createdAt, 484 BlobCid: cid.Cid(record.Artifact.Ref), 485 Name: record.Name, 486 Size: uint64(record.Artifact.Size), 487 MimeType: record.Artifact.MimeType, 488 } 489 490 err = db.AddArtifact(i.Db, artifact) 491 case jmodels.CommitOperationDelete: 492 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 493 } 494 495 if err != nil { 496 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 497 } 498 499 return nil 500} 501 502func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 503 did := e.Did 504 var err error 505 506 l := i.Logger.With("handler", "ingestProfile") 507 l = l.With("nsid", e.Commit.Collection) 508 509 if e.Commit.RKey != "self" { 510 return fmt.Errorf("ingestProfile only ingests `self` record") 511 } 512 513 switch e.Commit.Operation { 514 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 515 raw := json.RawMessage(e.Commit.Record) 516 record := tangled.ActorProfile{} 517 err = json.Unmarshal(raw, &record) 518 if err != nil { 519 l.Error("invalid record", "err", err) 520 return err 521 } 522 523 avatar := "" 524 if record.Avatar != nil { 525 avatar = record.Avatar.Ref.String() 526 } 527 528 description := "" 529 if record.Description != nil { 530 description = *record.Description 531 } 532 533 includeBluesky := record.Bluesky 534 535 pronouns := "" 536 if record.Pronouns != nil { 537 pronouns = *record.Pronouns 538 } 539 540 location := "" 541 if record.Location != nil { 542 location = *record.Location 543 } 544 545 var links [5]string 546 for i, l := range record.Links { 547 if i < 5 { 548 links[i] = l 549 } 550 } 551 552 var stats [2]models.VanityStat 553 for i, s := range record.Stats { 554 if i < 2 { 555 stats[i].Kind = models.ParseVanityStatKind(s) 556 } 557 } 558 559 var pinned [6]string 560 for i, r := range record.PinnedRepositories { 561 if i < 6 { 562 pinned[i] = r 563 } 564 } 565 566 var preferredHandle syntax.Handle 567 if record.PreferredHandle != nil { 568 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 569 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 570 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 571 preferredHandle = h 572 } 573 } 574 } 575 576 profile := models.Profile{ 577 Did: did, 578 Avatar: avatar, 579 Description: description, 580 IncludeBluesky: includeBluesky, 581 Location: location, 582 Links: links, 583 Stats: stats, 584 PinnedRepos: pinned, 585 Pronouns: pronouns, 586 PreferredHandle: preferredHandle, 587 } 588 589 tx, err := i.Db.Begin() 590 if err != nil { 591 return fmt.Errorf("failed to start transaction: %w", err) 592 } 593 594 err = db.ValidateProfile(tx, &profile) 595 if err != nil { 596 return fmt.Errorf("invalid profile record") 597 } 598 599 err = db.UpsertProfile(tx, &profile) 600 if err == nil && i.Cache != nil { 601 pipe := i.Cache.Pipeline() 602 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 603 if preferredHandle != "" { 604 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 605 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 606 } else { 607 pipe.Del(ctx, didKey) 608 } 609 if _, execErr := pipe.Exec(ctx); execErr != nil { 610 l.Warn("failed to update preferred handle cache", "err", execErr) 611 } 612 } 613 case jmodels.CommitOperationDelete: 614 tx, beginErr := i.Db.Begin() 615 if beginErr != nil { 616 return fmt.Errorf("failed to start transaction: %w", beginErr) 617 } 618 619 priorHandle, phErr := db.GetPreferredHandle(tx, did) 620 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 621 l.Warn("failed to read prior preferred handle", "err", phErr) 622 } 623 624 err = db.DeleteProfile(tx, did) 625 if err == nil && i.Cache != nil { 626 pipe := i.Cache.Pipeline() 627 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 628 if priorHandle != "" { 629 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 630 } 631 if _, execErr := pipe.Exec(ctx); execErr != nil { 632 l.Warn("failed to evict preferred handle cache", "err", execErr) 633 } 634 } 635 } 636 637 if err != nil { 638 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 639 } 640 641 return nil 642} 643 644func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 645 did := e.Did 646 var err error 647 648 l := i.Logger.With("handler", "ingestSpindleMember") 649 l = l.With("nsid", e.Commit.Collection) 650 651 switch e.Commit.Operation { 652 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 653 raw := json.RawMessage(e.Commit.Record) 654 record := tangled.SpindleMember{} 655 err = json.Unmarshal(raw, &record) 656 if err != nil { 657 l.Error("invalid record", "err", err) 658 return err 659 } 660 661 // only spindle owner can invite to spindles 662 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 663 if err != nil { 664 return fmt.Errorf("failed to check invite permission: %w", err) 665 } 666 if !ok { 667 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 668 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 669 } 670 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 671 if err != nil { 672 return fmt.Errorf("failed to re-check invite permission: %w", err) 673 } 674 if !ok { 675 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 676 } 677 } 678 679 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 680 if err != nil { 681 return err 682 } 683 684 if memberId.Handle.IsInvalidHandle() { 685 return fmt.Errorf("invalid handle for member %s", record.Subject) 686 } 687 688 existing, err := db.GetSpindleMembers(i.Db, 689 orm.FilterEq("did", did), 690 orm.FilterEq("rkey", e.Commit.RKey), 691 ) 692 if err != nil { 693 return fmt.Errorf("failed to look up existing member: %w", err) 694 } 695 if len(existing) > 1 { 696 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 697 } 698 699 tx, err := i.Db.Begin() 700 if err != nil { 701 return fmt.Errorf("failed to start txn: %w", err) 702 } 703 committed := false 704 defer func() { 705 if committed { 706 return 707 } 708 tx.Rollback() 709 i.Enforcer.E.LoadPolicy() 710 }() 711 712 if len(existing) == 1 { 713 prev := existing[0] 714 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 715 if err = db.RemoveSpindleMember(tx, 716 orm.FilterEq("did", did), 717 orm.FilterEq("rkey", e.Commit.RKey), 718 ); err != nil { 719 return fmt.Errorf("failed to remove stale row: %w", err) 720 } 721 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 722 return fmt.Errorf("failed to remove stale ACL: %w", err) 723 } 724 } 725 } 726 727 if err = db.AddSpindleMember(tx, models.SpindleMember{ 728 Did: syntax.DID(did), 729 Rkey: e.Commit.RKey, 730 Instance: record.Instance, 731 Subject: memberId.DID, 732 }); err != nil { 733 return fmt.Errorf("failed to add to db: %w", err) 734 } 735 736 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 737 return fmt.Errorf("failed to update ACLs: %w", err) 738 } 739 740 if err = tx.Commit(); err != nil { 741 return fmt.Errorf("failed to commit txn: %w", err) 742 } 743 744 if err = i.Enforcer.E.SavePolicy(); err != nil { 745 return fmt.Errorf("failed to save ACLs: %w", err) 746 } 747 committed = true 748 749 l.Info("upserted spindle member") 750 case jmodels.CommitOperationDelete: 751 rkey := e.Commit.RKey 752 753 // get record from db first 754 members, err := db.GetSpindleMembers( 755 i.Db, 756 orm.FilterEq("did", did), 757 orm.FilterEq("rkey", rkey), 758 ) 759 if err != nil || len(members) != 1 { 760 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 761 } 762 member := members[0] 763 764 tx, err := i.Db.Begin() 765 if err != nil { 766 return fmt.Errorf("failed to start txn: %w", err) 767 } 768 committed := false 769 defer func() { 770 if committed { 771 return 772 } 773 tx.Rollback() 774 i.Enforcer.E.LoadPolicy() 775 }() 776 777 // remove record by rkey && update enforcer 778 if err = db.RemoveSpindleMember( 779 tx, 780 orm.FilterEq("did", did), 781 orm.FilterEq("rkey", rkey), 782 ); err != nil { 783 return fmt.Errorf("failed to remove from db: %w", err) 784 } 785 786 // update enforcer 787 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 788 if err != nil { 789 return fmt.Errorf("failed to update ACLs: %w", err) 790 } 791 792 if err = tx.Commit(); err != nil { 793 return fmt.Errorf("failed to commit txn: %w", err) 794 } 795 796 if err = i.Enforcer.E.SavePolicy(); err != nil { 797 return fmt.Errorf("failed to save ACLs: %w", err) 798 } 799 committed = true 800 801 l.Info("removed spindle member") 802 } 803 804 return nil 805} 806 807func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 808 did := e.Did 809 var err error 810 811 l := i.Logger.With("handler", "ingestSpindle") 812 l = l.With("nsid", e.Commit.Collection) 813 814 switch e.Commit.Operation { 815 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 816 raw := json.RawMessage(e.Commit.Record) 817 record := tangled.Spindle{} 818 err = json.Unmarshal(raw, &record) 819 if err != nil { 820 l.Error("invalid record", "err", err) 821 return err 822 } 823 824 instance := e.Commit.RKey 825 826 err := db.AddSpindle(i.Db, models.Spindle{ 827 Owner: syntax.DID(did), 828 Instance: instance, 829 }) 830 if err != nil { 831 l.Error("failed to add spindle to db", "err", err, "instance", instance) 832 return err 833 } 834 835 if err := i.verifySpindle(ctx, instance, did); err != nil { 836 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 837 } 838 839 return nil 840 841 case jmodels.CommitOperationDelete: 842 instance := e.Commit.RKey 843 844 // get record from db first 845 spindles, err := db.GetSpindles( 846 ctx, 847 i.Db, 848 orm.FilterEq("owner", did), 849 orm.FilterEq("instance", instance), 850 ) 851 if err != nil || len(spindles) != 1 { 852 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 853 } 854 spindle := spindles[0] 855 856 tx, err := i.Db.Begin() 857 if err != nil { 858 return err 859 } 860 defer func() { 861 tx.Rollback() 862 i.Enforcer.E.LoadPolicy() 863 }() 864 865 // remove spindle members first 866 err = db.RemoveSpindleMember( 867 tx, 868 orm.FilterEq("owner", did), 869 orm.FilterEq("instance", instance), 870 ) 871 if err != nil { 872 return err 873 } 874 875 err = db.DeleteSpindle( 876 tx, 877 orm.FilterEq("owner", did), 878 orm.FilterEq("instance", instance), 879 ) 880 if err != nil { 881 return err 882 } 883 884 if spindle.Verified != nil { 885 err = i.Enforcer.RemoveSpindle(instance) 886 if err != nil { 887 return err 888 } 889 } 890 891 err = tx.Commit() 892 if err != nil { 893 return err 894 } 895 896 err = i.Enforcer.E.SavePolicy() 897 if err != nil { 898 return err 899 } 900 } 901 902 return nil 903} 904 905func (i *Ingester) ingestString(e *jmodels.Event) error { 906 did := e.Did 907 rkey := e.Commit.RKey 908 909 var err error 910 911 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 912 l.Info("ingesting record") 913 914 switch e.Commit.Operation { 915 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 916 raw := json.RawMessage(e.Commit.Record) 917 record := tangled.String{} 918 err = json.Unmarshal(raw, &record) 919 if err != nil { 920 l.Error("invalid record", "err", err) 921 return err 922 } 923 924 string := models.StringFromRecord(did, rkey, record) 925 926 if err = i.Validator.ValidateString(&string); err != nil { 927 l.Error("invalid record", "err", err) 928 return err 929 } 930 931 if err = db.AddString(i.Db, string); err != nil { 932 l.Error("failed to add string", "err", err) 933 return err 934 } 935 936 return nil 937 938 case jmodels.CommitOperationDelete: 939 if err := db.DeleteString( 940 i.Db, 941 orm.FilterEq("did", did), 942 orm.FilterEq("rkey", rkey), 943 ); err != nil { 944 l.Error("failed to delete", "err", err) 945 return fmt.Errorf("failed to delete string record: %w", err) 946 } 947 948 return nil 949 } 950 951 return nil 952} 953 954func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error { 955 did := e.Did 956 var err error 957 958 l := i.Logger.With("handler", "ingestKnotMember") 959 l = l.With("nsid", e.Commit.Collection) 960 961 switch e.Commit.Operation { 962 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 963 raw := json.RawMessage(e.Commit.Record) 964 record := tangled.KnotMember{} 965 err = json.Unmarshal(raw, &record) 966 if err != nil { 967 l.Error("invalid record", "err", err) 968 return err 969 } 970 971 // only knot owner can invite to knots 972 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 973 if err != nil { 974 return fmt.Errorf("failed to check invite permission: %w", err) 975 } 976 if !ok { 977 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 978 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 979 } 980 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 981 if err != nil { 982 return fmt.Errorf("failed to re-check invite permission: %w", err) 983 } 984 if !ok { 985 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 986 } 987 } 988 989 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 990 if err != nil { 991 return err 992 } 993 994 if memberId.Handle.IsInvalidHandle() { 995 return fmt.Errorf("invalid handle for member %s", record.Subject) 996 } 997 998 existing, err := db.GetKnotMembers(i.Db, 999 orm.FilterEq("did", did), 1000 orm.FilterEq("rkey", e.Commit.RKey), 1001 ) 1002 if err != nil { 1003 return fmt.Errorf("failed to look up existing member: %w", err) 1004 } 1005 if len(existing) > 1 { 1006 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1007 } 1008 1009 tx, err := i.Db.Begin() 1010 if err != nil { 1011 return fmt.Errorf("failed to start txn: %w", err) 1012 } 1013 committed := false 1014 defer func() { 1015 if committed { 1016 return 1017 } 1018 tx.Rollback() 1019 i.Enforcer.E.LoadPolicy() 1020 }() 1021 1022 if len(existing) == 1 { 1023 prev := existing[0] 1024 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1025 if err = db.RemoveKnotMember(tx, 1026 orm.FilterEq("did", did), 1027 orm.FilterEq("rkey", e.Commit.RKey), 1028 ); err != nil { 1029 return fmt.Errorf("failed to remove stale row: %w", err) 1030 } 1031 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1032 return fmt.Errorf("failed to remove stale ACL: %w", err) 1033 } 1034 } 1035 } 1036 1037 if err = db.AddKnotMember(tx, models.KnotMember{ 1038 Did: syntax.DID(did), 1039 Rkey: e.Commit.RKey, 1040 Domain: record.Domain, 1041 Subject: memberId.DID, 1042 }); err != nil { 1043 return fmt.Errorf("failed to add to db: %w", err) 1044 } 1045 1046 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1047 return fmt.Errorf("failed to update ACLs: %w", err) 1048 } 1049 1050 if err = tx.Commit(); err != nil { 1051 return fmt.Errorf("failed to commit txn: %w", err) 1052 } 1053 1054 if err = i.Enforcer.E.SavePolicy(); err != nil { 1055 return fmt.Errorf("failed to save ACLs: %w", err) 1056 } 1057 committed = true 1058 1059 l.Info("upserted knot member") 1060 case jmodels.CommitOperationDelete: 1061 rkey := e.Commit.RKey 1062 1063 members, err := db.GetKnotMembers( 1064 i.Db, 1065 orm.FilterEq("did", did), 1066 orm.FilterEq("rkey", rkey), 1067 ) 1068 if err != nil { 1069 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1070 } 1071 if len(members) == 0 { 1072 l.Info("knot member already removed", "rkey", rkey) 1073 return nil 1074 } 1075 if len(members) > 1 { 1076 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1077 } 1078 member := members[0] 1079 1080 tx, err := i.Db.Begin() 1081 if err != nil { 1082 return fmt.Errorf("failed to start txn: %w", err) 1083 } 1084 committed := false 1085 defer func() { 1086 if committed { 1087 return 1088 } 1089 tx.Rollback() 1090 i.Enforcer.E.LoadPolicy() 1091 }() 1092 1093 if err = db.RemoveKnotMember( 1094 tx, 1095 orm.FilterEq("did", did), 1096 orm.FilterEq("rkey", rkey), 1097 ); err != nil { 1098 return fmt.Errorf("failed to remove from db: %w", err) 1099 } 1100 1101 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1102 return fmt.Errorf("failed to update ACLs: %w", err) 1103 } 1104 1105 if err = tx.Commit(); err != nil { 1106 return fmt.Errorf("failed to commit txn: %w", err) 1107 } 1108 1109 if err = i.Enforcer.E.SavePolicy(); err != nil { 1110 return fmt.Errorf("failed to save ACLs: %w", err) 1111 } 1112 committed = true 1113 1114 l.Info("removed knot member") 1115 } 1116 1117 return nil 1118} 1119 1120func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error { 1121 did := e.Did 1122 var err error 1123 1124 l := i.Logger.With("handler", "ingestKnot") 1125 l = l.With("nsid", e.Commit.Collection) 1126 1127 switch e.Commit.Operation { 1128 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1129 raw := json.RawMessage(e.Commit.Record) 1130 record := tangled.Knot{} 1131 err = json.Unmarshal(raw, &record) 1132 if err != nil { 1133 l.Error("invalid record", "err", err) 1134 return err 1135 } 1136 1137 domain := e.Commit.RKey 1138 1139 err := db.AddKnot(i.Db, domain, did) 1140 if err != nil { 1141 l.Error("failed to add knot to db", "err", err, "domain", domain) 1142 return err 1143 } 1144 1145 if err := i.verifyKnot(ctx, domain, did); err != nil { 1146 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 1147 } 1148 1149 return nil 1150 1151 case jmodels.CommitOperationDelete: 1152 domain := e.Commit.RKey 1153 1154 // get record from db first 1155 registrations, err := db.GetRegistrations( 1156 i.Db, 1157 orm.FilterEq("domain", domain), 1158 orm.FilterEq("did", did), 1159 ) 1160 if err != nil { 1161 return fmt.Errorf("failed to get registration: %w", err) 1162 } 1163 if len(registrations) != 1 { 1164 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 1165 } 1166 registration := registrations[0] 1167 1168 tx, err := i.Db.Begin() 1169 if err != nil { 1170 return err 1171 } 1172 defer func() { 1173 tx.Rollback() 1174 i.Enforcer.E.LoadPolicy() 1175 }() 1176 1177 err = db.RemoveKnotMember( 1178 tx, 1179 orm.FilterEq("did", did), 1180 orm.FilterEq("domain", domain), 1181 ) 1182 if err != nil { 1183 return err 1184 } 1185 1186 err = db.DeleteKnot( 1187 tx, 1188 orm.FilterEq("did", did), 1189 orm.FilterEq("domain", domain), 1190 ) 1191 if err != nil { 1192 return err 1193 } 1194 1195 err = db.RemoveReposByKnot(tx, domain) 1196 if err != nil { 1197 return err 1198 } 1199 1200 if registration.Registered != nil { 1201 err = i.Enforcer.RemoveKnot(domain) 1202 if err != nil { 1203 return err 1204 } 1205 } 1206 1207 err = tx.Commit() 1208 if err != nil { 1209 return err 1210 } 1211 1212 err = i.Enforcer.E.SavePolicy() 1213 if err != nil { 1214 return err 1215 } 1216 } 1217 1218 return nil 1219} 1220 1221const ( 1222 verifyAttempts = 4 1223 verifyMinDelay = 1 * time.Second 1224 verifyMaxDelay = 5 * time.Second 1225) 1226 1227func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1228 regs, err := db.GetRegistrations(i.Db, 1229 orm.FilterEq("domain", domain), 1230 orm.FilterEq("did", did), 1231 ) 1232 if err != nil { 1233 return fmt.Errorf("look up registration: %w", err) 1234 } 1235 if len(regs) != 1 { 1236 return fmt.Errorf("no registration for %s by %s", domain, did) 1237 } 1238 if regs[0].Registered != nil { 1239 return nil 1240 } 1241 1242 err = retry.Do( 1243 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1244 retry.Context(ctx), 1245 retry.Attempts(verifyAttempts), 1246 retry.Delay(verifyMinDelay), 1247 retry.MaxDelay(verifyMaxDelay), 1248 retry.DelayType(retry.BackOffDelay), 1249 retry.LastErrorOnly(true), 1250 ) 1251 if err != nil { 1252 return fmt.Errorf("verify: %w", err) 1253 } 1254 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1255} 1256 1257func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1258 spindles, err := db.GetSpindles(ctx, i.Db, 1259 orm.FilterEq("instance", instance), 1260 orm.FilterEq("owner", did), 1261 ) 1262 if err != nil { 1263 return fmt.Errorf("look up spindle: %w", err) 1264 } 1265 if len(spindles) != 1 { 1266 return fmt.Errorf("no spindle for %s by %s", instance, did) 1267 } 1268 if spindles[0].Verified != nil { 1269 return nil 1270 } 1271 1272 err = retry.Do( 1273 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1274 retry.Context(ctx), 1275 retry.Attempts(verifyAttempts), 1276 retry.Delay(verifyMinDelay), 1277 retry.MaxDelay(verifyMaxDelay), 1278 retry.DelayType(retry.BackOffDelay), 1279 retry.LastErrorOnly(true), 1280 ) 1281 if err != nil { 1282 return fmt.Errorf("verify: %w", err) 1283 } 1284 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1285 return err 1286} 1287 1288const sweepConcurrency = 4 1289 1290func (i *Ingester) SweepPendingVerifications() { 1291 l := i.Logger.With("handler", "SweepPendingVerifications") 1292 1293 var g errgroup.Group 1294 g.SetLimit(sweepConcurrency) 1295 1296 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1297 if err != nil { 1298 l.Error("failed to list unverified knots", "err", err) 1299 } else { 1300 for _, reg := range regs { 1301 g.Go(func() error { 1302 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1303 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1304 } 1305 return nil 1306 }) 1307 } 1308 } 1309 1310 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1311 if err != nil { 1312 l.Error("failed to list unverified spindles", "err", err) 1313 g.Wait() 1314 return 1315 } 1316 for _, s := range spindles { 1317 g.Go(func() error { 1318 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1319 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1320 } 1321 return nil 1322 }) 1323 } 1324 g.Wait() 1325} 1326 1327func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 1328 did := e.Did 1329 rkey := e.Commit.RKey 1330 1331 var err error 1332 1333 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1334 l.Info("ingesting record") 1335 1336 switch e.Commit.Operation { 1337 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1338 raw := json.RawMessage(e.Commit.Record) 1339 record := tangled.RepoIssue{} 1340 err = json.Unmarshal(raw, &record) 1341 if err != nil { 1342 l.Error("invalid record", "err", err) 1343 return err 1344 } 1345 1346 issue := models.IssueFromRecord(did, rkey, record) 1347 1348 if issue.RepoDid == "" { 1349 return fmt.Errorf("issue record has no repo field") 1350 } 1351 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1352 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1353 } 1354 1355 if err := i.Validator.ValidateIssue(&issue); err != nil { 1356 return fmt.Errorf("failed to validate issue: %w", err) 1357 } 1358 1359 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1360 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1361 if repoErr == nil && repo.RepoDid != "" { 1362 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1363 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1364 } 1365 } 1366 } 1367 1368 tx, err := i.Db.BeginTx(ctx, nil) 1369 if err != nil { 1370 l.Error("failed to begin transaction", "err", err) 1371 return err 1372 } 1373 defer tx.Rollback() 1374 1375 err = db.PutIssue(tx, &issue) 1376 if err != nil { 1377 l.Error("failed to create issue", "err", err) 1378 return err 1379 } 1380 1381 err = tx.Commit() 1382 if err != nil { 1383 l.Error("failed to commit txn", "err", err) 1384 return err 1385 } 1386 1387 return nil 1388 1389 case jmodels.CommitOperationDelete: 1390 tx, err := i.Db.BeginTx(ctx, nil) 1391 if err != nil { 1392 l.Error("failed to begin transaction", "err", err) 1393 return err 1394 } 1395 defer tx.Rollback() 1396 1397 if err := db.DeleteIssues( 1398 tx, 1399 did, 1400 rkey, 1401 ); err != nil { 1402 l.Error("failed to delete", "err", err) 1403 return fmt.Errorf("failed to delete issue record: %w", err) 1404 } 1405 if err := tx.Commit(); err != nil { 1406 l.Error("failed to commit txn", "err", err) 1407 return err 1408 } 1409 1410 return nil 1411 } 1412 1413 return nil 1414} 1415 1416func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 1417 did := e.Did 1418 rkey := e.Commit.RKey 1419 1420 var err error 1421 1422 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1423 l.Info("ingesting record") 1424 1425 switch e.Commit.Operation { 1426 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1427 raw := json.RawMessage(e.Commit.Record) 1428 record := tangled.RepoPull{} 1429 err = json.Unmarshal(raw, &record) 1430 if err != nil { 1431 l.Error("invalid record", "err", err) 1432 return err 1433 } 1434 1435 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1436 if err != nil { 1437 l.Error("failed to resolve did") 1438 return err 1439 } 1440 1441 // go through and fetch all blobs in parallel 1442 readers := make([]*io.ReadCloser, len(record.Rounds)) 1443 var mu sync.Mutex 1444 1445 g, gctx := errgroup.WithContext(ctx) 1446 1447 for idx, b := range record.Rounds { 1448 g.Go(func() error { 1449 // for some reason, a blob is empty 1450 if b.PatchBlob == nil { 1451 return fmt.Errorf("missing patchBlob in round %d", idx) 1452 } 1453 1454 ownerPds := ownerId.PDSEndpoint() 1455 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1456 q := url.Query() 1457 q.Set("cid", b.PatchBlob.Ref.String()) 1458 q.Set("did", did) 1459 url.RawQuery = q.Encode() 1460 1461 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1462 if err != nil { 1463 l.Error("failed to create request") 1464 return err 1465 } 1466 req.Header.Set("Content-Type", "application/json") 1467 1468 resp, err := http.DefaultClient.Do(req) 1469 if err != nil { 1470 l.Error("failed to make request") 1471 return err 1472 } 1473 1474 mu.Lock() 1475 readers[idx] = &resp.Body 1476 mu.Unlock() 1477 1478 return nil 1479 }) 1480 } 1481 1482 if err := g.Wait(); err != nil { 1483 for _, r := range readers { 1484 if r != nil && *r != nil { 1485 (*r).Close() 1486 } 1487 } 1488 return err 1489 } 1490 1491 defer func() { 1492 for _, r := range readers { 1493 if r != nil && *r != nil { 1494 (*r).Close() 1495 } 1496 } 1497 }() 1498 1499 pull, err := models.PullFromRecord(did, rkey, record, readers) 1500 if err != nil { 1501 return fmt.Errorf("failed to parse pull from record: %w", err) 1502 } 1503 if err := i.Validator.ValidatePull(pull); err != nil { 1504 return fmt.Errorf("failed to validate pull: %w", err) 1505 } 1506 1507 tx, err := i.Db.BeginTx(ctx, nil) 1508 if err != nil { 1509 l.Error("failed to begin transaction", "err", err) 1510 return err 1511 } 1512 defer tx.Rollback() 1513 1514 err = db.PutPull(tx, pull) 1515 if err != nil { 1516 l.Error("failed to create pull", "err", err) 1517 return err 1518 } 1519 1520 err = tx.Commit() 1521 if err != nil { 1522 l.Error("failed to commit txn", "err", err) 1523 return err 1524 } 1525 1526 return nil 1527 1528 case jmodels.CommitOperationDelete: 1529 tx, err := i.Db.BeginTx(ctx, nil) 1530 if err != nil { 1531 l.Error("failed to begin transaction", "err", err) 1532 return err 1533 } 1534 defer tx.Rollback() 1535 1536 if err := db.AbandonPulls( 1537 tx, 1538 orm.FilterEq("owner_did", did), 1539 orm.FilterEq("rkey", rkey), 1540 ); err != nil { 1541 l.Error("failed to abandon", "err", err) 1542 return fmt.Errorf("failed to abandon pull record: %w", err) 1543 } 1544 if err := tx.Commit(); err != nil { 1545 l.Error("failed to commit txn", "err", err) 1546 return err 1547 } 1548 1549 return nil 1550 } 1551 1552 return nil 1553} 1554 1555// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1556func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1557 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1558 l.Info("ingesting record") 1559 1560 switch e.Commit.Operation { 1561 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1562 // no-op. sh.tangled.repo.issue.comment is deprecated 1563 1564 case jmodels.CommitOperationDelete: 1565 if err := db.PurgeComments( 1566 i.Db, 1567 orm.FilterEq("did", e.Did), 1568 orm.FilterEq("collection", e.Commit.Collection), 1569 orm.FilterEq("rkey", e.Commit.RKey), 1570 ); err != nil { 1571 return fmt.Errorf("failed to delete comment record: %w", err) 1572 } 1573 } 1574 1575 return nil 1576} 1577 1578// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1579func (i *Ingester) ingestPullComment(e *jmodels.Event) error { 1580 l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1581 l.Info("ingesting record") 1582 1583 switch e.Commit.Operation { 1584 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1585 // no-op. sh.tangled.repo.pull.comment is deprecated 1586 1587 case jmodels.CommitOperationDelete: 1588 if err := db.PurgeComments( 1589 i.Db, 1590 orm.FilterEq("did", e.Did), 1591 orm.FilterEq("collection", e.Commit.Collection), 1592 orm.FilterEq("rkey", e.Commit.RKey), 1593 ); err != nil { 1594 return fmt.Errorf("failed to delete comment record: %w", err) 1595 } 1596 } 1597 1598 return nil 1599} 1600 1601func (i *Ingester) ingestComment(e *jmodels.Event) error { 1602 did := e.Did 1603 rkey := e.Commit.RKey 1604 cid := e.Commit.CID 1605 1606 var err error 1607 1608 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1609 l.Info("ingesting record") 1610 1611 ctx := context.Background() 1612 1613 switch e.Commit.Operation { 1614 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1615 raw := json.RawMessage(e.Commit.Record) 1616 record := tangled.FeedComment{} 1617 err = json.Unmarshal(raw, &record) 1618 if err != nil { 1619 return fmt.Errorf("invalid record: %w", err) 1620 } 1621 1622 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1623 if err != nil { 1624 return fmt.Errorf("failed to parse comment from record: %w", err) 1625 } 1626 1627 if err := comment.Validate(); err != nil { 1628 return fmt.Errorf("failed to validate comment: %w", err) 1629 } 1630 1631 var references []syntax.ATURI 1632 if comment.Body.Original != nil { 1633 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1634 } 1635 1636 tx, err := i.Db.Begin() 1637 if err != nil { 1638 return fmt.Errorf("failed to start transaction: %w", err) 1639 } 1640 defer tx.Rollback() 1641 1642 _, err = db.PutComment(tx, comment, references) 1643 if err != nil { 1644 return fmt.Errorf("failed to create comment: %w", err) 1645 } 1646 1647 if err := tx.Commit(); err != nil { 1648 return err 1649 } 1650 1651 case jmodels.CommitOperationDelete: 1652 if err := db.DeleteComments( 1653 i.Db, 1654 orm.FilterEq("did", did), 1655 orm.FilterEq("collection", e.Commit.Collection), 1656 orm.FilterEq("rkey", rkey), 1657 ); err != nil { 1658 return fmt.Errorf("failed to delete comment record: %w", err) 1659 } 1660 1661 return nil 1662 } 1663 1664 return nil 1665} 1666 1667func (i *Ingester) ingestReaction(e *jmodels.Event) error { 1668 did := e.Did 1669 rkey := e.Commit.RKey 1670 1671 l := i.Logger.With("handler", "ingestReaction", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1672 l.Info("ingesting record") 1673 1674 switch e.Commit.Operation { 1675 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1676 raw := json.RawMessage(e.Commit.Record) 1677 record := tangled.FeedReaction{} 1678 if err := json.Unmarshal(raw, &record); err != nil { 1679 return fmt.Errorf("invalid record: %w", err) 1680 } 1681 1682 subjectUri, err := syntax.ParseATURI(record.Subject) 1683 if err != nil { 1684 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err) 1685 } 1686 subjectUri = models.NormalizeReactionSubject(subjectUri) 1687 1688 kind, ok := models.ParseReactionKind(record.Reaction) 1689 if !ok { 1690 return fmt.Errorf("invalid reaction kind: %q", record.Reaction) 1691 } 1692 1693 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 1694 if parseErr != nil { 1695 created = time.Now() 1696 } 1697 1698 tx, err := i.Db.Begin() 1699 if err != nil { 1700 return fmt.Errorf("failed to start transaction: %w", err) 1701 } 1702 defer tx.Rollback() 1703 1704 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil { 1705 return fmt.Errorf("failed to clear existing reaction: %w", err) 1706 } 1707 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil { 1708 return fmt.Errorf("failed to add reaction: %w", err) 1709 } 1710 1711 return tx.Commit() 1712 1713 case jmodels.CommitOperationDelete: 1714 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil { 1715 return fmt.Errorf("failed to delete reaction record: %w", err) 1716 } 1717 } 1718 1719 return nil 1720} 1721 1722func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1723 did := e.Did 1724 rkey := e.Commit.RKey 1725 1726 var err error 1727 1728 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1729 l.Info("ingesting record") 1730 1731 switch e.Commit.Operation { 1732 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1733 raw := json.RawMessage(e.Commit.Record) 1734 record := tangled.LabelDefinition{} 1735 err = json.Unmarshal(raw, &record) 1736 if err != nil { 1737 return fmt.Errorf("invalid record: %w", err) 1738 } 1739 1740 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1741 if err != nil { 1742 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1743 } 1744 1745 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1746 return fmt.Errorf("failed to validate labeldef: %w", err) 1747 } 1748 1749 _, err = db.AddLabelDefinition(i.Db, def) 1750 if err != nil { 1751 return fmt.Errorf("failed to create labeldef: %w", err) 1752 } 1753 1754 return nil 1755 1756 case jmodels.CommitOperationDelete: 1757 if err := db.DeleteLabelDefinition( 1758 i.Db, 1759 orm.FilterEq("did", did), 1760 orm.FilterEq("rkey", rkey), 1761 ); err != nil { 1762 return fmt.Errorf("failed to delete labeldef record: %w", err) 1763 } 1764 1765 return nil 1766 } 1767 1768 return nil 1769} 1770 1771func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1772 did := e.Did 1773 rkey := e.Commit.RKey 1774 1775 var err error 1776 1777 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1778 l.Info("ingesting record") 1779 1780 switch e.Commit.Operation { 1781 case jmodels.CommitOperationCreate: 1782 raw := json.RawMessage(e.Commit.Record) 1783 record := tangled.LabelOp{} 1784 err = json.Unmarshal(raw, &record) 1785 if err != nil { 1786 return fmt.Errorf("invalid record: %w", err) 1787 } 1788 1789 subject := syntax.ATURI(record.Subject) 1790 collection := subject.Collection() 1791 1792 var repo *models.Repo 1793 switch collection { 1794 case tangled.RepoIssueNSID: 1795 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1796 if err != nil || len(i) != 1 { 1797 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1798 } 1799 repo = i[0].Repo 1800 case tangled.RepoPullNSID: 1801 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject)) 1802 if err != nil || len(p) != 1 { 1803 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p)) 1804 } 1805 repo = p[0].Repo 1806 default: 1807 return fmt.Errorf("unsupported label subject: %s", collection) 1808 } 1809 1810 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1811 if err != nil { 1812 return fmt.Errorf("failed to build label application ctx: %w", err) 1813 } 1814 1815 ops := models.LabelOpsFromRecord(did, rkey, record) 1816 1817 for _, o := range ops { 1818 def, ok := actx.Defs[o.OperandKey] 1819 if !ok { 1820 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1821 } 1822 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1823 return fmt.Errorf("failed to validate labelop: %w", err) 1824 } 1825 } 1826 1827 tx, err := i.Db.Begin() 1828 if err != nil { 1829 return err 1830 } 1831 defer tx.Rollback() 1832 1833 for _, o := range ops { 1834 _, err = db.AddLabelOp(tx, &o) 1835 if err != nil { 1836 return fmt.Errorf("failed to add labelop: %w", err) 1837 } 1838 } 1839 1840 if err = tx.Commit(); err != nil { 1841 return err 1842 } 1843 } 1844 1845 return nil 1846}