Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/mentions" 31 "tangled.org/core/appview/models" 32 "tangled.org/core/appview/notify" 33 "tangled.org/core/appview/repoverify" 34 "tangled.org/core/appview/serververify" 35 "tangled.org/core/appview/validator" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/orm" 38 "tangled.org/core/rbac" 39) 40 41type Ingester struct { 42 Ctx context.Context 43 Db *db.DB 44 Enforcer *rbac.Enforcer 45 IdResolver *idresolver.Resolver 46 Cache *cache.Cache 47 Config *config.Config 48 Logger *slog.Logger 49 Validator *validator.Validator 50 MentionsResolver *mentions.Resolver 51 Notifier notify.Notifier 52 Verifier repoverify.Verifier 53} 54 55type processFunc func(ctx context.Context, e *jmodels.Event) error 56 57func (i *Ingester) Ingest() processFunc { 58 return func(ctx context.Context, e *jmodels.Event) error { 59 var err error 60 61 l := i.Logger.With("kind", e.Kind) 62 switch e.Kind { 63 case jmodels.EventKindAccount: 64 // TODO: sync account state to db 65 if e.Account.Active { 66 break 67 } 68 // TODO: revoke sessions by DID 69 if *e.Account.Status == "deactivated" { 70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 71 } 72 case jmodels.EventKindIdentity: 73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 74 case jmodels.EventKindCommit: 75 switch e.Commit.Collection { 76 case tangled.GraphFollowNSID: 77 err = i.ingestFollow(e) 78 case tangled.GraphVouchNSID: 79 err = i.ingestVouch(ctx, e) 80 case tangled.FeedStarNSID: 81 err = i.ingestStar(ctx, e) 82 case tangled.PublicKeyNSID: 83 err = i.ingestPublicKey(e) 84 case tangled.RepoArtifactNSID: 85 err = i.ingestArtifact(ctx, e) 86 case tangled.ActorProfileNSID: 87 err = i.ingestProfile(ctx, e) 88 case tangled.SpindleMemberNSID: 89 err = i.ingestSpindleMember(ctx, e) 90 case tangled.SpindleNSID: 91 err = i.ingestSpindle(ctx, e) 92 case tangled.KnotMemberNSID: 93 err = i.ingestKnotMember(ctx, e) 94 case tangled.KnotNSID: 95 err = i.ingestKnot(ctx, e) 96 case tangled.StringNSID: 97 err = i.ingestString(e) 98 case tangled.RepoIssueNSID: 99 err = i.ingestIssue(ctx, e) 100 case tangled.RepoPullNSID: 101 err = i.ingestPull(ctx, e) 102 case tangled.FeedCommentNSID: 103 err = i.ingestComment(e) 104 case tangled.RepoIssueCommentNSID: 105 err = i.ingestIssueComment(e) 106 case tangled.RepoPullCommentNSID: 107 err = i.ingestPullComment(e) 108 case tangled.LabelDefinitionNSID: 109 err = i.ingestLabelDefinition(e) 110 case tangled.LabelOpNSID: 111 err = i.ingestLabelOp(e) 112 case tangled.RepoNSID: 113 err = i.ingestRepo(ctx, e) 114 } 115 l = i.Logger.With("nsid", e.Commit.Collection) 116 } 117 118 if err != nil { 119 l.Warn("failed to ingest record, skipping", "err", err) 120 } 121 122 lastTimeUs := e.TimeUS + 1 123 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 124 l.Error("failed to save cursor", "err", saveErr) 125 } 126 127 return nil 128 } 129} 130 131func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 132 if strings.HasPrefix(ref, "did:") { 133 return db.GetRepoByDid(i.Db, ref) 134 } 135 return db.GetRepoByAtUri(i.Db, ref) 136} 137 138func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 139 var legacy struct { 140 Subject *string `json:"subject"` 141 SubjectDid *string `json:"subjectDid"` 142 } 143 if err := json.Unmarshal(raw, &legacy); err != nil { 144 return false, err 145 } 146 147 switch { 148 case legacy.SubjectDid != nil: 149 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 150 if err != nil { 151 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 152 return false, nil 153 } 154 star.SubjectType = models.StarSubjectRepo 155 star.Subject = repo.RepoDid 156 return true, nil 157 158 case legacy.Subject != nil: 159 uri, err := syntax.ParseATURI(*legacy.Subject) 160 if err != nil { 161 return false, fmt.Errorf("invalid old-format star subject: %w", err) 162 } 163 switch uri.Collection().String() { 164 case tangled.RepoNSID: 165 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 166 if err != nil { 167 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 168 return false, nil 169 } 170 star.SubjectType = models.StarSubjectRepo 171 star.Subject = repo.RepoDid 172 return true, nil 173 default: 174 star.SubjectType = models.StarSubjectString 175 star.Subject = *legacy.Subject 176 return true, nil 177 } 178 179 default: 180 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 181 } 182} 183 184func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 185 var err error 186 did := e.Did 187 188 l := i.Logger.With("handler", "ingestStar") 189 l = l.With("nsid", e.Commit.Collection) 190 191 switch e.Commit.Operation { 192 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 193 raw := json.RawMessage(e.Commit.Record) 194 record := tangled.FeedStar{} 195 unmarshalErr := json.Unmarshal(raw, &record) 196 197 star := &models.Star{ 198 Did: did, 199 Rkey: e.Commit.RKey, 200 } 201 202 switch { 203 case unmarshalErr != nil: 204 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 205 if resolveErr != nil { 206 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 207 return unmarshalErr 208 } 209 if !resolved { 210 return nil 211 } 212 213 case record.Subject == nil: 214 return fmt.Errorf("star record has nil subject") 215 216 case record.Subject.FeedStar_Repo != nil: 217 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 218 if repoErr != nil { 219 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 220 return nil 221 } 222 star.SubjectType = models.StarSubjectRepo 223 star.Subject = repo.RepoDid 224 225 case record.Subject.FeedStar_String != nil: 226 star.SubjectType = models.StarSubjectString 227 star.Subject = record.Subject.FeedStar_String.Uri 228 229 default: 230 return fmt.Errorf("star record has empty subject union") 231 } 232 233 err = db.AddStar(i.Db, star) 234 case jmodels.CommitOperationDelete: 235 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 236 } 237 238 if err != nil { 239 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 240 } 241 242 return nil 243} 244 245func (i *Ingester) ingestFollow(e *jmodels.Event) error { 246 var err error 247 did := e.Did 248 249 l := i.Logger.With("handler", "ingestFollow") 250 l = l.With("nsid", e.Commit.Collection) 251 252 switch e.Commit.Operation { 253 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 254 raw := json.RawMessage(e.Commit.Record) 255 record := tangled.GraphFollow{} 256 err = json.Unmarshal(raw, &record) 257 if err != nil { 258 l.Error("invalid record", "err", err) 259 return err 260 } 261 262 err = db.AddFollow(i.Db, &models.Follow{ 263 UserDid: did, 264 SubjectDid: record.Subject, 265 Rkey: e.Commit.RKey, 266 }) 267 case jmodels.CommitOperationDelete: 268 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 269 } 270 271 if err != nil { 272 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 273 } 274 275 return nil 276} 277 278func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 279 var err error 280 did := e.Did 281 282 l := i.Logger.With("handler", "ingestVouch") 283 l = l.With("nsid", e.Commit.Collection) 284 l.Info("ingesting vouch") 285 286 switch e.Commit.Operation { 287 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 288 raw := json.RawMessage(e.Commit.Record) 289 record := tangled.GraphVouch{} 290 err = json.Unmarshal(raw, &record) 291 if err != nil { 292 l.Error("invalid record", "err", err) 293 return err 294 } 295 296 // rkey is the subject_did being vouched for/denounced 297 subjectDID := e.Commit.RKey 298 299 _, err = syntax.ParseDID(subjectDID) 300 if err != nil { 301 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 302 return fmt.Errorf("invalid subject_did: %w", err) 303 } 304 305 if did == subjectDID { 306 l.Warn("attempted self-vouch", "did", did) 307 return fmt.Errorf("cannot vouch for self") 308 } 309 310 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 311 if err != nil { 312 return err 313 } 314 315 if subjectId.Handle.IsInvalidHandle() { 316 return err 317 } 318 319 kind, err := models.ParseVouchKind(record.Kind) 320 if err != nil { 321 l.Error("invalid kind", "kind", kind) 322 return fmt.Errorf("invalid kind: %s", kind) 323 } 324 325 recordCid, err := cid.Parse(e.Commit.CID) 326 if err != nil { 327 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 328 return fmt.Errorf("invalid cid: %w", err) 329 } 330 331 var evidences []syntax.ATURI 332 for _, raw := range record.Evidences { 333 uri, parseErr := syntax.ParseATURI(raw) 334 if parseErr != nil { 335 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 336 continue 337 } 338 evidences = append(evidences, uri) 339 } 340 341 tx, txErr := i.Db.Begin() 342 if txErr != nil { 343 return fmt.Errorf("failed to start transaction: %w", txErr) 344 } 345 346 addErr := db.AddVouch(tx, &models.Vouch{ 347 Did: syntax.DID(did), 348 SubjectDid: subjectId.DID, 349 Cid: recordCid, 350 Kind: kind, 351 Reason: record.Reason, 352 Evidences: evidences, 353 }) 354 if addErr != nil { 355 tx.Rollback() 356 err = addErr 357 } else { 358 err = tx.Commit() 359 } 360 361 case jmodels.CommitOperationDelete: 362 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 363 } 364 365 if err != nil { 366 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 367 } 368 369 return nil 370} 371 372func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 373 did := e.Did 374 var err error 375 376 l := i.Logger.With("handler", "ingestPublicKey") 377 l = l.With("nsid", e.Commit.Collection) 378 379 switch e.Commit.Operation { 380 case jmodels.CommitOperationCreate: 381 l.Debug("processing add of pubkey") 382 raw := json.RawMessage(e.Commit.Record) 383 record := tangled.PublicKey{} 384 err = json.Unmarshal(raw, &record) 385 if err != nil { 386 l.Error("invalid record", "err", err) 387 return err 388 } 389 390 name := record.Name 391 key := record.Key 392 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 393 case jmodels.CommitOperationUpdate: 394 l.Debug("processing update of pubkey") 395 raw := json.RawMessage(e.Commit.Record) 396 record := tangled.PublicKey{} 397 err = json.Unmarshal(raw, &record) 398 if err != nil { 399 l.Error("invalid record", "err", err) 400 return err 401 } 402 403 name := record.Name 404 key := record.Key 405 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 406 case jmodels.CommitOperationDelete: 407 l.Debug("processing delete of pubkey") 408 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 409 } 410 411 if err != nil { 412 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 413 } 414 415 return nil 416} 417 418func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 419 did := e.Did 420 var err error 421 422 l := i.Logger.With("handler", "ingestArtifact") 423 l = l.With("nsid", e.Commit.Collection) 424 425 switch e.Commit.Operation { 426 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 427 raw := json.RawMessage(e.Commit.Record) 428 record := tangled.RepoArtifact{} 429 err = json.Unmarshal(raw, &record) 430 if err != nil { 431 l.Error("invalid record", "err", err) 432 return err 433 } 434 435 var repo *models.Repo 436 if record.RepoDid != nil && *record.RepoDid != "" { 437 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 438 if err != nil && !errors.Is(err, sql.ErrNoRows) { 439 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 440 } 441 } 442 if repo == nil && record.Repo != nil { 443 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 444 if parseErr != nil { 445 return parseErr 446 } 447 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 448 if err != nil { 449 return err 450 } 451 } 452 if repo == nil { 453 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 454 } 455 456 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 457 if err != nil || !ok { 458 return err 459 } 460 461 repoDid := repo.RepoDid 462 if repoDid == "" && record.RepoDid != nil { 463 repoDid = *record.RepoDid 464 } 465 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 466 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 467 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 468 } 469 } 470 471 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 472 if err != nil { 473 createdAt = time.Now() 474 } 475 476 artifact := models.Artifact{ 477 Did: did, 478 Rkey: e.Commit.RKey, 479 RepoDid: syntax.DID(repo.RepoDid), 480 Tag: plumbing.Hash(record.Tag), 481 CreatedAt: createdAt, 482 BlobCid: cid.Cid(record.Artifact.Ref), 483 Name: record.Name, 484 Size: uint64(record.Artifact.Size), 485 MimeType: record.Artifact.MimeType, 486 } 487 488 err = db.AddArtifact(i.Db, artifact) 489 case jmodels.CommitOperationDelete: 490 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 491 } 492 493 if err != nil { 494 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 495 } 496 497 return nil 498} 499 500func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 501 did := e.Did 502 var err error 503 504 l := i.Logger.With("handler", "ingestProfile") 505 l = l.With("nsid", e.Commit.Collection) 506 507 if e.Commit.RKey != "self" { 508 return fmt.Errorf("ingestProfile only ingests `self` record") 509 } 510 511 switch e.Commit.Operation { 512 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 513 raw := json.RawMessage(e.Commit.Record) 514 record := tangled.ActorProfile{} 515 err = json.Unmarshal(raw, &record) 516 if err != nil { 517 l.Error("invalid record", "err", err) 518 return err 519 } 520 521 avatar := "" 522 if record.Avatar != nil { 523 avatar = record.Avatar.Ref.String() 524 } 525 526 description := "" 527 if record.Description != nil { 528 description = *record.Description 529 } 530 531 includeBluesky := record.Bluesky 532 533 pronouns := "" 534 if record.Pronouns != nil { 535 pronouns = *record.Pronouns 536 } 537 538 location := "" 539 if record.Location != nil { 540 location = *record.Location 541 } 542 543 var links [5]string 544 for i, l := range record.Links { 545 if i < 5 { 546 links[i] = l 547 } 548 } 549 550 var stats [2]models.VanityStat 551 for i, s := range record.Stats { 552 if i < 2 { 553 stats[i].Kind = models.ParseVanityStatKind(s) 554 } 555 } 556 557 var pinned [6]string 558 for i, r := range record.PinnedRepositories { 559 if i < 6 { 560 pinned[i] = r 561 } 562 } 563 564 var preferredHandle syntax.Handle 565 if record.PreferredHandle != nil { 566 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 567 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 568 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 569 preferredHandle = h 570 } 571 } 572 } 573 574 profile := models.Profile{ 575 Did: did, 576 Avatar: avatar, 577 Description: description, 578 IncludeBluesky: includeBluesky, 579 Location: location, 580 Links: links, 581 Stats: stats, 582 PinnedRepos: pinned, 583 Pronouns: pronouns, 584 PreferredHandle: preferredHandle, 585 } 586 587 tx, err := i.Db.Begin() 588 if err != nil { 589 return fmt.Errorf("failed to start transaction: %w", err) 590 } 591 592 err = db.ValidateProfile(tx, &profile) 593 if err != nil { 594 return fmt.Errorf("invalid profile record") 595 } 596 597 err = db.UpsertProfile(tx, &profile) 598 if err == nil && i.Cache != nil { 599 pipe := i.Cache.Pipeline() 600 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 601 if preferredHandle != "" { 602 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 603 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 604 } else { 605 pipe.Del(ctx, didKey) 606 } 607 if _, execErr := pipe.Exec(ctx); execErr != nil { 608 l.Warn("failed to update preferred handle cache", "err", execErr) 609 } 610 } 611 case jmodels.CommitOperationDelete: 612 tx, beginErr := i.Db.Begin() 613 if beginErr != nil { 614 return fmt.Errorf("failed to start transaction: %w", beginErr) 615 } 616 617 priorHandle, phErr := db.GetPreferredHandle(tx, did) 618 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 619 l.Warn("failed to read prior preferred handle", "err", phErr) 620 } 621 622 err = db.DeleteProfile(tx, did) 623 if err == nil && i.Cache != nil { 624 pipe := i.Cache.Pipeline() 625 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 626 if priorHandle != "" { 627 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 628 } 629 if _, execErr := pipe.Exec(ctx); execErr != nil { 630 l.Warn("failed to evict preferred handle cache", "err", execErr) 631 } 632 } 633 } 634 635 if err != nil { 636 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 637 } 638 639 return nil 640} 641 642func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 643 did := e.Did 644 var err error 645 646 l := i.Logger.With("handler", "ingestSpindleMember") 647 l = l.With("nsid", e.Commit.Collection) 648 649 switch e.Commit.Operation { 650 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 651 raw := json.RawMessage(e.Commit.Record) 652 record := tangled.SpindleMember{} 653 err = json.Unmarshal(raw, &record) 654 if err != nil { 655 l.Error("invalid record", "err", err) 656 return err 657 } 658 659 // only spindle owner can invite to spindles 660 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 661 if err != nil { 662 return fmt.Errorf("failed to check invite permission: %w", err) 663 } 664 if !ok { 665 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 666 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 667 } 668 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 669 if err != nil { 670 return fmt.Errorf("failed to re-check invite permission: %w", err) 671 } 672 if !ok { 673 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 674 } 675 } 676 677 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 678 if err != nil { 679 return err 680 } 681 682 if memberId.Handle.IsInvalidHandle() { 683 return fmt.Errorf("invalid handle for member %s", record.Subject) 684 } 685 686 existing, err := db.GetSpindleMembers(i.Db, 687 orm.FilterEq("did", did), 688 orm.FilterEq("rkey", e.Commit.RKey), 689 ) 690 if err != nil { 691 return fmt.Errorf("failed to look up existing member: %w", err) 692 } 693 if len(existing) > 1 { 694 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 695 } 696 697 tx, err := i.Db.Begin() 698 if err != nil { 699 return fmt.Errorf("failed to start txn: %w", err) 700 } 701 committed := false 702 defer func() { 703 if committed { 704 return 705 } 706 tx.Rollback() 707 i.Enforcer.E.LoadPolicy() 708 }() 709 710 if len(existing) == 1 { 711 prev := existing[0] 712 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 713 if err = db.RemoveSpindleMember(tx, 714 orm.FilterEq("did", did), 715 orm.FilterEq("rkey", e.Commit.RKey), 716 ); err != nil { 717 return fmt.Errorf("failed to remove stale row: %w", err) 718 } 719 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 720 return fmt.Errorf("failed to remove stale ACL: %w", err) 721 } 722 } 723 } 724 725 if err = db.AddSpindleMember(tx, models.SpindleMember{ 726 Did: syntax.DID(did), 727 Rkey: e.Commit.RKey, 728 Instance: record.Instance, 729 Subject: memberId.DID, 730 }); err != nil { 731 return fmt.Errorf("failed to add to db: %w", err) 732 } 733 734 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 735 return fmt.Errorf("failed to update ACLs: %w", err) 736 } 737 738 if err = tx.Commit(); err != nil { 739 return fmt.Errorf("failed to commit txn: %w", err) 740 } 741 742 if err = i.Enforcer.E.SavePolicy(); err != nil { 743 return fmt.Errorf("failed to save ACLs: %w", err) 744 } 745 committed = true 746 747 l.Info("upserted spindle member") 748 case jmodels.CommitOperationDelete: 749 rkey := e.Commit.RKey 750 751 // get record from db first 752 members, err := db.GetSpindleMembers( 753 i.Db, 754 orm.FilterEq("did", did), 755 orm.FilterEq("rkey", rkey), 756 ) 757 if err != nil || len(members) != 1 { 758 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 759 } 760 member := members[0] 761 762 tx, err := i.Db.Begin() 763 if err != nil { 764 return fmt.Errorf("failed to start txn: %w", err) 765 } 766 committed := false 767 defer func() { 768 if committed { 769 return 770 } 771 tx.Rollback() 772 i.Enforcer.E.LoadPolicy() 773 }() 774 775 // remove record by rkey && update enforcer 776 if err = db.RemoveSpindleMember( 777 tx, 778 orm.FilterEq("did", did), 779 orm.FilterEq("rkey", rkey), 780 ); err != nil { 781 return fmt.Errorf("failed to remove from db: %w", err) 782 } 783 784 // update enforcer 785 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 786 if err != nil { 787 return fmt.Errorf("failed to update ACLs: %w", err) 788 } 789 790 if err = tx.Commit(); err != nil { 791 return fmt.Errorf("failed to commit txn: %w", err) 792 } 793 794 if err = i.Enforcer.E.SavePolicy(); err != nil { 795 return fmt.Errorf("failed to save ACLs: %w", err) 796 } 797 committed = true 798 799 l.Info("removed spindle member") 800 } 801 802 return nil 803} 804 805func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 806 did := e.Did 807 var err error 808 809 l := i.Logger.With("handler", "ingestSpindle") 810 l = l.With("nsid", e.Commit.Collection) 811 812 switch e.Commit.Operation { 813 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 814 raw := json.RawMessage(e.Commit.Record) 815 record := tangled.Spindle{} 816 err = json.Unmarshal(raw, &record) 817 if err != nil { 818 l.Error("invalid record", "err", err) 819 return err 820 } 821 822 instance := e.Commit.RKey 823 824 err := db.AddSpindle(i.Db, models.Spindle{ 825 Owner: syntax.DID(did), 826 Instance: instance, 827 }) 828 if err != nil { 829 l.Error("failed to add spindle to db", "err", err, "instance", instance) 830 return err 831 } 832 833 if err := i.verifySpindle(ctx, instance, did); err != nil { 834 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 835 } 836 837 return nil 838 839 case jmodels.CommitOperationDelete: 840 instance := e.Commit.RKey 841 842 // get record from db first 843 spindles, err := db.GetSpindles( 844 ctx, 845 i.Db, 846 orm.FilterEq("owner", did), 847 orm.FilterEq("instance", instance), 848 ) 849 if err != nil || len(spindles) != 1 { 850 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 851 } 852 spindle := spindles[0] 853 854 tx, err := i.Db.Begin() 855 if err != nil { 856 return err 857 } 858 defer func() { 859 tx.Rollback() 860 i.Enforcer.E.LoadPolicy() 861 }() 862 863 // remove spindle members first 864 err = db.RemoveSpindleMember( 865 tx, 866 orm.FilterEq("owner", did), 867 orm.FilterEq("instance", instance), 868 ) 869 if err != nil { 870 return err 871 } 872 873 err = db.DeleteSpindle( 874 tx, 875 orm.FilterEq("owner", did), 876 orm.FilterEq("instance", instance), 877 ) 878 if err != nil { 879 return err 880 } 881 882 if spindle.Verified != nil { 883 err = i.Enforcer.RemoveSpindle(instance) 884 if err != nil { 885 return err 886 } 887 } 888 889 err = tx.Commit() 890 if err != nil { 891 return err 892 } 893 894 err = i.Enforcer.E.SavePolicy() 895 if err != nil { 896 return err 897 } 898 } 899 900 return nil 901} 902 903func (i *Ingester) ingestString(e *jmodels.Event) error { 904 did := e.Did 905 rkey := e.Commit.RKey 906 907 var err error 908 909 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 910 l.Info("ingesting record") 911 912 switch e.Commit.Operation { 913 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 914 raw := json.RawMessage(e.Commit.Record) 915 record := tangled.String{} 916 err = json.Unmarshal(raw, &record) 917 if err != nil { 918 l.Error("invalid record", "err", err) 919 return err 920 } 921 922 string := models.StringFromRecord(did, rkey, record) 923 924 if err = i.Validator.ValidateString(&string); err != nil { 925 l.Error("invalid record", "err", err) 926 return err 927 } 928 929 if err = db.AddString(i.Db, string); err != nil { 930 l.Error("failed to add string", "err", err) 931 return err 932 } 933 934 return nil 935 936 case jmodels.CommitOperationDelete: 937 if err := db.DeleteString( 938 i.Db, 939 orm.FilterEq("did", did), 940 orm.FilterEq("rkey", rkey), 941 ); err != nil { 942 l.Error("failed to delete", "err", err) 943 return fmt.Errorf("failed to delete string record: %w", err) 944 } 945 946 return nil 947 } 948 949 return nil 950} 951 952func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error { 953 did := e.Did 954 var err error 955 956 l := i.Logger.With("handler", "ingestKnotMember") 957 l = l.With("nsid", e.Commit.Collection) 958 959 switch e.Commit.Operation { 960 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 961 raw := json.RawMessage(e.Commit.Record) 962 record := tangled.KnotMember{} 963 err = json.Unmarshal(raw, &record) 964 if err != nil { 965 l.Error("invalid record", "err", err) 966 return err 967 } 968 969 // only knot owner can invite to knots 970 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 971 if err != nil { 972 return fmt.Errorf("failed to check invite permission: %w", err) 973 } 974 if !ok { 975 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 976 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 977 } 978 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 979 if err != nil { 980 return fmt.Errorf("failed to re-check invite permission: %w", err) 981 } 982 if !ok { 983 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 984 } 985 } 986 987 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 988 if err != nil { 989 return err 990 } 991 992 if memberId.Handle.IsInvalidHandle() { 993 return fmt.Errorf("invalid handle for member %s", record.Subject) 994 } 995 996 existing, err := db.GetKnotMembers(i.Db, 997 orm.FilterEq("did", did), 998 orm.FilterEq("rkey", e.Commit.RKey), 999 ) 1000 if err != nil { 1001 return fmt.Errorf("failed to look up existing member: %w", err) 1002 } 1003 if len(existing) > 1 { 1004 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1005 } 1006 1007 tx, err := i.Db.Begin() 1008 if err != nil { 1009 return fmt.Errorf("failed to start txn: %w", err) 1010 } 1011 committed := false 1012 defer func() { 1013 if committed { 1014 return 1015 } 1016 tx.Rollback() 1017 i.Enforcer.E.LoadPolicy() 1018 }() 1019 1020 if len(existing) == 1 { 1021 prev := existing[0] 1022 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1023 if err = db.RemoveKnotMember(tx, 1024 orm.FilterEq("did", did), 1025 orm.FilterEq("rkey", e.Commit.RKey), 1026 ); err != nil { 1027 return fmt.Errorf("failed to remove stale row: %w", err) 1028 } 1029 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1030 return fmt.Errorf("failed to remove stale ACL: %w", err) 1031 } 1032 } 1033 } 1034 1035 if err = db.AddKnotMember(tx, models.KnotMember{ 1036 Did: syntax.DID(did), 1037 Rkey: e.Commit.RKey, 1038 Domain: record.Domain, 1039 Subject: memberId.DID, 1040 }); err != nil { 1041 return fmt.Errorf("failed to add to db: %w", err) 1042 } 1043 1044 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1045 return fmt.Errorf("failed to update ACLs: %w", err) 1046 } 1047 1048 if err = tx.Commit(); err != nil { 1049 return fmt.Errorf("failed to commit txn: %w", err) 1050 } 1051 1052 if err = i.Enforcer.E.SavePolicy(); err != nil { 1053 return fmt.Errorf("failed to save ACLs: %w", err) 1054 } 1055 committed = true 1056 1057 l.Info("upserted knot member") 1058 case jmodels.CommitOperationDelete: 1059 rkey := e.Commit.RKey 1060 1061 members, err := db.GetKnotMembers( 1062 i.Db, 1063 orm.FilterEq("did", did), 1064 orm.FilterEq("rkey", rkey), 1065 ) 1066 if err != nil { 1067 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1068 } 1069 if len(members) == 0 { 1070 l.Info("knot member already removed", "rkey", rkey) 1071 return nil 1072 } 1073 if len(members) > 1 { 1074 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1075 } 1076 member := members[0] 1077 1078 tx, err := i.Db.Begin() 1079 if err != nil { 1080 return fmt.Errorf("failed to start txn: %w", err) 1081 } 1082 committed := false 1083 defer func() { 1084 if committed { 1085 return 1086 } 1087 tx.Rollback() 1088 i.Enforcer.E.LoadPolicy() 1089 }() 1090 1091 if err = db.RemoveKnotMember( 1092 tx, 1093 orm.FilterEq("did", did), 1094 orm.FilterEq("rkey", rkey), 1095 ); err != nil { 1096 return fmt.Errorf("failed to remove from db: %w", err) 1097 } 1098 1099 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1100 return fmt.Errorf("failed to update ACLs: %w", err) 1101 } 1102 1103 if err = tx.Commit(); err != nil { 1104 return fmt.Errorf("failed to commit txn: %w", err) 1105 } 1106 1107 if err = i.Enforcer.E.SavePolicy(); err != nil { 1108 return fmt.Errorf("failed to save ACLs: %w", err) 1109 } 1110 committed = true 1111 1112 l.Info("removed knot member") 1113 } 1114 1115 return nil 1116} 1117 1118func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error { 1119 did := e.Did 1120 var err error 1121 1122 l := i.Logger.With("handler", "ingestKnot") 1123 l = l.With("nsid", e.Commit.Collection) 1124 1125 switch e.Commit.Operation { 1126 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1127 raw := json.RawMessage(e.Commit.Record) 1128 record := tangled.Knot{} 1129 err = json.Unmarshal(raw, &record) 1130 if err != nil { 1131 l.Error("invalid record", "err", err) 1132 return err 1133 } 1134 1135 domain := e.Commit.RKey 1136 1137 err := db.AddKnot(i.Db, domain, did) 1138 if err != nil { 1139 l.Error("failed to add knot to db", "err", err, "domain", domain) 1140 return err 1141 } 1142 1143 if err := i.verifyKnot(ctx, domain, did); err != nil { 1144 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 1145 } 1146 1147 return nil 1148 1149 case jmodels.CommitOperationDelete: 1150 domain := e.Commit.RKey 1151 1152 // get record from db first 1153 registrations, err := db.GetRegistrations( 1154 i.Db, 1155 orm.FilterEq("domain", domain), 1156 orm.FilterEq("did", did), 1157 ) 1158 if err != nil { 1159 return fmt.Errorf("failed to get registration: %w", err) 1160 } 1161 if len(registrations) != 1 { 1162 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 1163 } 1164 registration := registrations[0] 1165 1166 tx, err := i.Db.Begin() 1167 if err != nil { 1168 return err 1169 } 1170 defer func() { 1171 tx.Rollback() 1172 i.Enforcer.E.LoadPolicy() 1173 }() 1174 1175 err = db.RemoveKnotMember( 1176 tx, 1177 orm.FilterEq("did", did), 1178 orm.FilterEq("domain", domain), 1179 ) 1180 if err != nil { 1181 return err 1182 } 1183 1184 err = db.DeleteKnot( 1185 tx, 1186 orm.FilterEq("did", did), 1187 orm.FilterEq("domain", domain), 1188 ) 1189 if err != nil { 1190 return err 1191 } 1192 1193 err = db.RemoveReposByKnot(tx, domain) 1194 if err != nil { 1195 return err 1196 } 1197 1198 if registration.Registered != nil { 1199 err = i.Enforcer.RemoveKnot(domain) 1200 if err != nil { 1201 return err 1202 } 1203 } 1204 1205 err = tx.Commit() 1206 if err != nil { 1207 return err 1208 } 1209 1210 err = i.Enforcer.E.SavePolicy() 1211 if err != nil { 1212 return err 1213 } 1214 } 1215 1216 return nil 1217} 1218 1219const ( 1220 verifyAttempts = 4 1221 verifyMinDelay = 1 * time.Second 1222 verifyMaxDelay = 5 * time.Second 1223) 1224 1225func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1226 regs, err := db.GetRegistrations(i.Db, 1227 orm.FilterEq("domain", domain), 1228 orm.FilterEq("did", did), 1229 ) 1230 if err != nil { 1231 return fmt.Errorf("look up registration: %w", err) 1232 } 1233 if len(regs) != 1 { 1234 return fmt.Errorf("no registration for %s by %s", domain, did) 1235 } 1236 if regs[0].Registered != nil { 1237 return nil 1238 } 1239 1240 err = retry.Do( 1241 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1242 retry.Context(ctx), 1243 retry.Attempts(verifyAttempts), 1244 retry.Delay(verifyMinDelay), 1245 retry.MaxDelay(verifyMaxDelay), 1246 retry.DelayType(retry.BackOffDelay), 1247 retry.LastErrorOnly(true), 1248 ) 1249 if err != nil { 1250 return fmt.Errorf("verify: %w", err) 1251 } 1252 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1253} 1254 1255func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1256 spindles, err := db.GetSpindles(ctx, i.Db, 1257 orm.FilterEq("instance", instance), 1258 orm.FilterEq("owner", did), 1259 ) 1260 if err != nil { 1261 return fmt.Errorf("look up spindle: %w", err) 1262 } 1263 if len(spindles) != 1 { 1264 return fmt.Errorf("no spindle for %s by %s", instance, did) 1265 } 1266 if spindles[0].Verified != nil { 1267 return nil 1268 } 1269 1270 err = retry.Do( 1271 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1272 retry.Context(ctx), 1273 retry.Attempts(verifyAttempts), 1274 retry.Delay(verifyMinDelay), 1275 retry.MaxDelay(verifyMaxDelay), 1276 retry.DelayType(retry.BackOffDelay), 1277 retry.LastErrorOnly(true), 1278 ) 1279 if err != nil { 1280 return fmt.Errorf("verify: %w", err) 1281 } 1282 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1283 return err 1284} 1285 1286const sweepConcurrency = 4 1287 1288func (i *Ingester) SweepPendingVerifications() { 1289 l := i.Logger.With("handler", "SweepPendingVerifications") 1290 1291 var g errgroup.Group 1292 g.SetLimit(sweepConcurrency) 1293 1294 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1295 if err != nil { 1296 l.Error("failed to list unverified knots", "err", err) 1297 } else { 1298 for _, reg := range regs { 1299 g.Go(func() error { 1300 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1301 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1302 } 1303 return nil 1304 }) 1305 } 1306 } 1307 1308 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1309 if err != nil { 1310 l.Error("failed to list unverified spindles", "err", err) 1311 g.Wait() 1312 return 1313 } 1314 for _, s := range spindles { 1315 g.Go(func() error { 1316 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1317 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1318 } 1319 return nil 1320 }) 1321 } 1322 g.Wait() 1323} 1324 1325func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 1326 did := e.Did 1327 rkey := e.Commit.RKey 1328 1329 var err error 1330 1331 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1332 l.Info("ingesting record") 1333 1334 switch e.Commit.Operation { 1335 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1336 raw := json.RawMessage(e.Commit.Record) 1337 record := tangled.RepoIssue{} 1338 err = json.Unmarshal(raw, &record) 1339 if err != nil { 1340 l.Error("invalid record", "err", err) 1341 return err 1342 } 1343 1344 issue := models.IssueFromRecord(did, rkey, record) 1345 1346 if issue.RepoDid == "" { 1347 return fmt.Errorf("issue record has no repo field") 1348 } 1349 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1350 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1351 } 1352 1353 if err := i.Validator.ValidateIssue(&issue); err != nil { 1354 return fmt.Errorf("failed to validate issue: %w", err) 1355 } 1356 1357 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1358 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1359 if repoErr == nil && repo.RepoDid != "" { 1360 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1361 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1362 } 1363 } 1364 } 1365 1366 tx, err := i.Db.BeginTx(ctx, nil) 1367 if err != nil { 1368 l.Error("failed to begin transaction", "err", err) 1369 return err 1370 } 1371 defer tx.Rollback() 1372 1373 err = db.PutIssue(tx, &issue) 1374 if err != nil { 1375 l.Error("failed to create issue", "err", err) 1376 return err 1377 } 1378 1379 err = tx.Commit() 1380 if err != nil { 1381 l.Error("failed to commit txn", "err", err) 1382 return err 1383 } 1384 1385 return nil 1386 1387 case jmodels.CommitOperationDelete: 1388 tx, err := i.Db.BeginTx(ctx, nil) 1389 if err != nil { 1390 l.Error("failed to begin transaction", "err", err) 1391 return err 1392 } 1393 defer tx.Rollback() 1394 1395 if err := db.DeleteIssues( 1396 tx, 1397 did, 1398 rkey, 1399 ); err != nil { 1400 l.Error("failed to delete", "err", err) 1401 return fmt.Errorf("failed to delete issue record: %w", err) 1402 } 1403 if err := tx.Commit(); err != nil { 1404 l.Error("failed to commit txn", "err", err) 1405 return err 1406 } 1407 1408 return nil 1409 } 1410 1411 return nil 1412} 1413 1414func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 1415 did := e.Did 1416 rkey := e.Commit.RKey 1417 1418 var err error 1419 1420 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1421 l.Info("ingesting record") 1422 1423 switch e.Commit.Operation { 1424 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1425 raw := json.RawMessage(e.Commit.Record) 1426 record := tangled.RepoPull{} 1427 err = json.Unmarshal(raw, &record) 1428 if err != nil { 1429 l.Error("invalid record", "err", err) 1430 return err 1431 } 1432 1433 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1434 if err != nil { 1435 l.Error("failed to resolve did") 1436 return err 1437 } 1438 1439 // go through and fetch all blobs in parallel 1440 readers := make([]*io.ReadCloser, len(record.Rounds)) 1441 var mu sync.Mutex 1442 1443 g, gctx := errgroup.WithContext(ctx) 1444 1445 for idx, b := range record.Rounds { 1446 g.Go(func() error { 1447 // for some reason, a blob is empty 1448 if b.PatchBlob == nil { 1449 return fmt.Errorf("missing patchBlob in round %d", idx) 1450 } 1451 1452 ownerPds := ownerId.PDSEndpoint() 1453 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1454 q := url.Query() 1455 q.Set("cid", b.PatchBlob.Ref.String()) 1456 q.Set("did", did) 1457 url.RawQuery = q.Encode() 1458 1459 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1460 if err != nil { 1461 l.Error("failed to create request") 1462 return err 1463 } 1464 req.Header.Set("Content-Type", "application/json") 1465 1466 resp, err := http.DefaultClient.Do(req) 1467 if err != nil { 1468 l.Error("failed to make request") 1469 return err 1470 } 1471 1472 mu.Lock() 1473 readers[idx] = &resp.Body 1474 mu.Unlock() 1475 1476 return nil 1477 }) 1478 } 1479 1480 if err := g.Wait(); err != nil { 1481 for _, r := range readers { 1482 if r != nil && *r != nil { 1483 (*r).Close() 1484 } 1485 } 1486 return err 1487 } 1488 1489 defer func() { 1490 for _, r := range readers { 1491 if r != nil && *r != nil { 1492 (*r).Close() 1493 } 1494 } 1495 }() 1496 1497 pull, err := models.PullFromRecord(did, rkey, record, readers) 1498 if err != nil { 1499 return fmt.Errorf("failed to parse pull from record: %w", err) 1500 } 1501 if err := i.Validator.ValidatePull(pull); err != nil { 1502 return fmt.Errorf("failed to validate pull: %w", err) 1503 } 1504 1505 tx, err := i.Db.BeginTx(ctx, nil) 1506 if err != nil { 1507 l.Error("failed to begin transaction", "err", err) 1508 return err 1509 } 1510 defer tx.Rollback() 1511 1512 err = db.PutPull(tx, pull) 1513 if err != nil { 1514 l.Error("failed to create pull", "err", err) 1515 return err 1516 } 1517 1518 err = tx.Commit() 1519 if err != nil { 1520 l.Error("failed to commit txn", "err", err) 1521 return err 1522 } 1523 1524 return nil 1525 1526 case jmodels.CommitOperationDelete: 1527 tx, err := i.Db.BeginTx(ctx, nil) 1528 if err != nil { 1529 l.Error("failed to begin transaction", "err", err) 1530 return err 1531 } 1532 defer tx.Rollback() 1533 1534 if err := db.AbandonPulls( 1535 tx, 1536 orm.FilterEq("owner_did", did), 1537 orm.FilterEq("rkey", rkey), 1538 ); err != nil { 1539 l.Error("failed to abandon", "err", err) 1540 return fmt.Errorf("failed to abandon pull record: %w", err) 1541 } 1542 if err := tx.Commit(); err != nil { 1543 l.Error("failed to commit txn", "err", err) 1544 return err 1545 } 1546 1547 return nil 1548 } 1549 1550 return nil 1551} 1552 1553// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1554func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1555 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1556 l.Info("ingesting record") 1557 1558 switch e.Commit.Operation { 1559 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1560 // no-op. sh.tangled.repo.issue.comment is deprecated 1561 1562 case jmodels.CommitOperationDelete: 1563 if err := db.PurgeComments( 1564 i.Db, 1565 orm.FilterEq("did", e.Did), 1566 orm.FilterEq("collection", e.Commit.Collection), 1567 orm.FilterEq("rkey", e.Commit.RKey), 1568 ); err != nil { 1569 return fmt.Errorf("failed to delete comment record: %w", err) 1570 } 1571 } 1572 1573 return nil 1574} 1575 1576// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1577func (i *Ingester) ingestPullComment(e *jmodels.Event) error { 1578 l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1579 l.Info("ingesting record") 1580 1581 switch e.Commit.Operation { 1582 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1583 // no-op. sh.tangled.repo.pull.comment is deprecated 1584 1585 case jmodels.CommitOperationDelete: 1586 if err := db.PurgeComments( 1587 i.Db, 1588 orm.FilterEq("did", e.Did), 1589 orm.FilterEq("collection", e.Commit.Collection), 1590 orm.FilterEq("rkey", e.Commit.RKey), 1591 ); err != nil { 1592 return fmt.Errorf("failed to delete comment record: %w", err) 1593 } 1594 } 1595 1596 return nil 1597} 1598 1599func (i *Ingester) ingestComment(e *jmodels.Event) error { 1600 did := e.Did 1601 rkey := e.Commit.RKey 1602 cid := e.Commit.CID 1603 1604 var err error 1605 1606 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1607 l.Info("ingesting record") 1608 1609 ctx := context.Background() 1610 1611 switch e.Commit.Operation { 1612 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1613 raw := json.RawMessage(e.Commit.Record) 1614 record := tangled.FeedComment{} 1615 err = json.Unmarshal(raw, &record) 1616 if err != nil { 1617 return fmt.Errorf("invalid record: %w", err) 1618 } 1619 1620 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1621 if err != nil { 1622 return fmt.Errorf("failed to parse comment from record: %w", err) 1623 } 1624 1625 if err := comment.Validate(); err != nil { 1626 return fmt.Errorf("failed to validate comment: %w", err) 1627 } 1628 1629 var references []syntax.ATURI 1630 if comment.Body.Original != nil { 1631 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1632 } 1633 1634 tx, err := i.Db.Begin() 1635 if err != nil { 1636 return fmt.Errorf("failed to start transaction: %w", err) 1637 } 1638 defer tx.Rollback() 1639 1640 _, err = db.PutComment(tx, comment, references) 1641 if err != nil { 1642 return fmt.Errorf("failed to create comment: %w", err) 1643 } 1644 1645 if err := tx.Commit(); err != nil { 1646 return err 1647 } 1648 1649 case jmodels.CommitOperationDelete: 1650 if err := db.DeleteComments( 1651 i.Db, 1652 orm.FilterEq("did", did), 1653 orm.FilterEq("collection", e.Commit.Collection), 1654 orm.FilterEq("rkey", rkey), 1655 ); err != nil { 1656 return fmt.Errorf("failed to delete comment record: %w", err) 1657 } 1658 1659 return nil 1660 } 1661 1662 return nil 1663} 1664 1665func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1666 did := e.Did 1667 rkey := e.Commit.RKey 1668 1669 var err error 1670 1671 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1672 l.Info("ingesting record") 1673 1674 switch e.Commit.Operation { 1675 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1676 raw := json.RawMessage(e.Commit.Record) 1677 record := tangled.LabelDefinition{} 1678 err = json.Unmarshal(raw, &record) 1679 if err != nil { 1680 return fmt.Errorf("invalid record: %w", err) 1681 } 1682 1683 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1684 if err != nil { 1685 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1686 } 1687 1688 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1689 return fmt.Errorf("failed to validate labeldef: %w", err) 1690 } 1691 1692 _, err = db.AddLabelDefinition(i.Db, def) 1693 if err != nil { 1694 return fmt.Errorf("failed to create labeldef: %w", err) 1695 } 1696 1697 return nil 1698 1699 case jmodels.CommitOperationDelete: 1700 if err := db.DeleteLabelDefinition( 1701 i.Db, 1702 orm.FilterEq("did", did), 1703 orm.FilterEq("rkey", rkey), 1704 ); err != nil { 1705 return fmt.Errorf("failed to delete labeldef record: %w", err) 1706 } 1707 1708 return nil 1709 } 1710 1711 return nil 1712} 1713 1714func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1715 did := e.Did 1716 rkey := e.Commit.RKey 1717 1718 var err error 1719 1720 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1721 l.Info("ingesting record") 1722 1723 switch e.Commit.Operation { 1724 case jmodels.CommitOperationCreate: 1725 raw := json.RawMessage(e.Commit.Record) 1726 record := tangled.LabelOp{} 1727 err = json.Unmarshal(raw, &record) 1728 if err != nil { 1729 return fmt.Errorf("invalid record: %w", err) 1730 } 1731 1732 subject := syntax.ATURI(record.Subject) 1733 collection := subject.Collection() 1734 1735 var repo *models.Repo 1736 switch collection { 1737 case tangled.RepoIssueNSID: 1738 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1739 if err != nil || len(i) != 1 { 1740 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1741 } 1742 repo = i[0].Repo 1743 case tangled.RepoPullNSID: 1744 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject)) 1745 if err != nil || len(p) != 1 { 1746 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p)) 1747 } 1748 repo = p[0].Repo 1749 default: 1750 return fmt.Errorf("unsupported label subject: %s", collection) 1751 } 1752 1753 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1754 if err != nil { 1755 return fmt.Errorf("failed to build label application ctx: %w", err) 1756 } 1757 1758 ops := models.LabelOpsFromRecord(did, rkey, record) 1759 1760 for _, o := range ops { 1761 def, ok := actx.Defs[o.OperandKey] 1762 if !ok { 1763 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1764 } 1765 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1766 return fmt.Errorf("failed to validate labelop: %w", err) 1767 } 1768 } 1769 1770 tx, err := i.Db.Begin() 1771 if err != nil { 1772 return err 1773 } 1774 defer tx.Rollback() 1775 1776 for _, o := range ops { 1777 _, err = db.AddLabelOp(tx, &o) 1778 if err != nil { 1779 return fmt.Errorf("failed to add labelop: %w", err) 1780 } 1781 } 1782 1783 if err = tx.Commit(); err != nil { 1784 return err 1785 } 1786 } 1787 1788 return nil 1789}