Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/mentions" 31 "tangled.org/core/appview/models" 32 "tangled.org/core/appview/notify" 33 "tangled.org/core/appview/repoverify" 34 "tangled.org/core/appview/serververify" 35 "tangled.org/core/appview/validator" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/orm" 38 "tangled.org/core/rbac" 39) 40 41type Ingester struct { 42 Ctx context.Context 43 Db *db.DB 44 Enforcer *rbac.Enforcer 45 IdResolver *idresolver.Resolver 46 Cache *cache.Cache 47 Config *config.Config 48 Logger *slog.Logger 49 Validator *validator.Validator 50 MentionsResolver *mentions.Resolver 51 Notifier notify.Notifier 52 Verifier repoverify.Verifier 53} 54 55type processFunc func(ctx context.Context, e *jmodels.Event) error 56 57func (i *Ingester) Ingest() processFunc { 58 return func(ctx context.Context, e *jmodels.Event) error { 59 var err error 60 61 l := i.Logger.With("kind", e.Kind) 62 switch e.Kind { 63 case jmodels.EventKindAccount: 64 // TODO: sync account state to db 65 if e.Account.Active { 66 break 67 } 68 // TODO: revoke sessions by DID 69 if *e.Account.Status == "deactivated" { 70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 71 } 72 case jmodels.EventKindIdentity: 73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 74 case jmodels.EventKindCommit: 75 switch e.Commit.Collection { 76 case tangled.GraphFollowNSID: 77 err = i.ingestFollow(e) 78 case tangled.GraphVouchNSID: 79 err = i.ingestVouch(ctx, e) 80 case tangled.FeedStarNSID: 81 err = i.ingestStar(ctx, e) 82 case tangled.PublicKeyNSID: 83 err = i.ingestPublicKey(e) 84 case tangled.RepoArtifactNSID: 85 err = i.ingestArtifact(ctx, e) 86 case tangled.ActorProfileNSID: 87 err = i.ingestProfile(ctx, e) 88 case tangled.SpindleMemberNSID: 89 err = i.ingestSpindleMember(ctx, e) 90 case tangled.SpindleNSID: 91 err = i.ingestSpindle(ctx, e) 92 case tangled.KnotMemberNSID: 93 err = i.ingestKnotMember(ctx, e) 94 case tangled.KnotNSID: 95 err = i.ingestKnot(ctx, e) 96 case tangled.StringNSID: 97 err = i.ingestString(e) 98 case tangled.RepoIssueNSID: 99 err = i.ingestIssue(ctx, e) 100 case tangled.RepoPullNSID: 101 err = i.ingestPull(ctx, e) 102 case tangled.FeedCommentNSID: 103 err = i.ingestComment(e) 104 case tangled.RepoIssueCommentNSID: 105 err = i.ingestIssueComment(e) 106 case tangled.RepoPullCommentNSID: 107 err = i.ingestPullComment(e) 108 case tangled.LabelDefinitionNSID: 109 err = i.ingestLabelDefinition(e) 110 case tangled.LabelOpNSID: 111 err = i.ingestLabelOp(e) 112 case tangled.RepoNSID: 113 err = i.ingestRepo(ctx, e) 114 } 115 l = i.Logger.With("nsid", e.Commit.Collection) 116 } 117 118 if err != nil { 119 l.Warn("failed to ingest record, skipping", "err", err) 120 } 121 122 lastTimeUs := e.TimeUS + 1 123 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 124 l.Error("failed to save cursor", "err", saveErr) 125 } 126 127 return nil 128 } 129} 130 131func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 132 if strings.HasPrefix(ref, "did:") { 133 return db.GetRepoByDid(i.Db, ref) 134 } 135 return db.GetRepoByAtUri(i.Db, ref) 136} 137 138func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 139 var legacy struct { 140 Subject *string `json:"subject"` 141 SubjectDid *string `json:"subjectDid"` 142 } 143 if err := json.Unmarshal(raw, &legacy); err != nil { 144 return false, err 145 } 146 147 switch { 148 case legacy.SubjectDid != nil: 149 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 150 if err != nil { 151 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 152 return false, nil 153 } 154 star.SubjectType = models.StarSubjectRepo 155 star.Subject = repo.RepoDid 156 return true, nil 157 158 case legacy.Subject != nil: 159 uri, err := syntax.ParseATURI(*legacy.Subject) 160 if err != nil { 161 return false, fmt.Errorf("invalid old-format star subject: %w", err) 162 } 163 switch uri.Collection().String() { 164 case tangled.RepoNSID: 165 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 166 if err != nil { 167 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 168 return false, nil 169 } 170 star.SubjectType = models.StarSubjectRepo 171 star.Subject = repo.RepoDid 172 return true, nil 173 default: 174 star.SubjectType = models.StarSubjectString 175 star.Subject = *legacy.Subject 176 return true, nil 177 } 178 179 default: 180 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 181 } 182} 183 184func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 185 var err error 186 did := e.Did 187 188 l := i.Logger.With("handler", "ingestStar") 189 l = l.With("nsid", e.Commit.Collection) 190 191 switch e.Commit.Operation { 192 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 193 raw := json.RawMessage(e.Commit.Record) 194 record := tangled.FeedStar{} 195 unmarshalErr := json.Unmarshal(raw, &record) 196 197 star := &models.Star{ 198 Did: did, 199 Rkey: e.Commit.RKey, 200 } 201 202 switch { 203 case unmarshalErr != nil: 204 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 205 if resolveErr != nil { 206 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 207 return unmarshalErr 208 } 209 if !resolved { 210 return nil 211 } 212 213 case record.Subject == nil: 214 return fmt.Errorf("star record has nil subject") 215 216 case record.Subject.FeedStar_Repo != nil: 217 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 218 if repoErr != nil { 219 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 220 return nil 221 } 222 star.SubjectType = models.StarSubjectRepo 223 star.Subject = repo.RepoDid 224 225 case record.Subject.FeedStar_String != nil: 226 star.SubjectType = models.StarSubjectString 227 star.Subject = record.Subject.FeedStar_String.Uri 228 229 default: 230 return fmt.Errorf("star record has empty subject union") 231 } 232 233 err = db.AddStar(i.Db, star) 234 case jmodels.CommitOperationDelete: 235 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 236 } 237 238 if err != nil { 239 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 240 } 241 242 return nil 243} 244 245func (i *Ingester) ingestFollow(e *jmodels.Event) error { 246 var err error 247 did := e.Did 248 249 l := i.Logger.With("handler", "ingestFollow") 250 l = l.With("nsid", e.Commit.Collection) 251 252 switch e.Commit.Operation { 253 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 254 raw := json.RawMessage(e.Commit.Record) 255 record := tangled.GraphFollow{} 256 err = json.Unmarshal(raw, &record) 257 if err != nil { 258 l.Error("invalid record", "err", err) 259 return err 260 } 261 262 err = db.AddFollow(i.Db, &models.Follow{ 263 UserDid: did, 264 SubjectDid: record.Subject, 265 Rkey: e.Commit.RKey, 266 }) 267 case jmodels.CommitOperationDelete: 268 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 269 } 270 271 if err != nil { 272 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 273 } 274 275 return nil 276} 277 278func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 279 var err error 280 did := e.Did 281 282 l := i.Logger.With("handler", "ingestVouch") 283 l = l.With("nsid", e.Commit.Collection) 284 l.Info("ingesting vouch") 285 286 switch e.Commit.Operation { 287 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 288 raw := json.RawMessage(e.Commit.Record) 289 record := tangled.GraphVouch{} 290 err = json.Unmarshal(raw, &record) 291 if err != nil { 292 l.Error("invalid record", "err", err) 293 return err 294 } 295 296 // rkey is the subject_did being vouched for/denounced 297 subjectDID := e.Commit.RKey 298 299 _, err = syntax.ParseDID(subjectDID) 300 if err != nil { 301 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 302 return fmt.Errorf("invalid subject_did: %w", err) 303 } 304 305 if did == subjectDID { 306 l.Warn("attempted self-vouch", "did", did) 307 return fmt.Errorf("cannot vouch for self") 308 } 309 310 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 311 if err != nil { 312 return err 313 } 314 315 if subjectId.Handle.IsInvalidHandle() { 316 return err 317 } 318 319 kind, err := models.ParseVouchKind(record.Kind) 320 if err != nil { 321 l.Error("invalid kind", "kind", kind) 322 return fmt.Errorf("invalid kind: %s", kind) 323 } 324 325 recordCid, err := cid.Parse(e.Commit.CID) 326 if err != nil { 327 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 328 return fmt.Errorf("invalid cid: %w", err) 329 } 330 331 var evidences []syntax.ATURI 332 for _, raw := range record.Evidences { 333 uri, parseErr := syntax.ParseATURI(raw) 334 if parseErr != nil { 335 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 336 continue 337 } 338 evidences = append(evidences, uri) 339 } 340 341 tx, txErr := i.Db.Begin() 342 if txErr != nil { 343 return fmt.Errorf("failed to start transaction: %w", txErr) 344 } 345 346 addErr := db.AddVouch(tx, &models.Vouch{ 347 Did: syntax.DID(did), 348 SubjectDid: subjectId.DID, 349 Cid: recordCid, 350 Kind: kind, 351 Reason: record.Reason, 352 Evidences: evidences, 353 }) 354 if addErr != nil { 355 tx.Rollback() 356 err = addErr 357 } else { 358 err = tx.Commit() 359 } 360 361 case jmodels.CommitOperationDelete: 362 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 363 } 364 365 if err != nil { 366 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 367 } 368 369 return nil 370} 371 372func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 373 did := e.Did 374 var err error 375 376 l := i.Logger.With("handler", "ingestPublicKey") 377 l = l.With("nsid", e.Commit.Collection) 378 379 switch e.Commit.Operation { 380 case jmodels.CommitOperationCreate: 381 l.Debug("processing add of pubkey") 382 raw := json.RawMessage(e.Commit.Record) 383 record := tangled.PublicKey{} 384 err = json.Unmarshal(raw, &record) 385 if err != nil { 386 l.Error("invalid record", "err", err) 387 return err 388 } 389 390 name := record.Name 391 key := record.Key 392 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 393 case jmodels.CommitOperationUpdate: 394 l.Debug("processing update of pubkey") 395 raw := json.RawMessage(e.Commit.Record) 396 record := tangled.PublicKey{} 397 err = json.Unmarshal(raw, &record) 398 if err != nil { 399 l.Error("invalid record", "err", err) 400 return err 401 } 402 403 name := record.Name 404 key := record.Key 405 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 406 case jmodels.CommitOperationDelete: 407 l.Debug("processing delete of pubkey") 408 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 409 } 410 411 if err != nil { 412 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 413 } 414 415 return nil 416} 417 418func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 419 did := e.Did 420 var err error 421 422 l := i.Logger.With("handler", "ingestArtifact") 423 l = l.With("nsid", e.Commit.Collection) 424 425 switch e.Commit.Operation { 426 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 427 raw := json.RawMessage(e.Commit.Record) 428 record := tangled.RepoArtifact{} 429 err = json.Unmarshal(raw, &record) 430 if err != nil { 431 l.Error("invalid record", "err", err) 432 return err 433 } 434 435 var repo *models.Repo 436 if record.RepoDid != nil && *record.RepoDid != "" { 437 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 438 if err != nil && !errors.Is(err, sql.ErrNoRows) { 439 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 440 } 441 } 442 if repo == nil && record.Repo != nil { 443 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 444 if parseErr != nil { 445 return parseErr 446 } 447 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 448 if err != nil { 449 return err 450 } 451 } 452 if repo == nil { 453 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 454 } 455 456 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 457 if err != nil || !ok { 458 return err 459 } 460 461 repoDid := repo.RepoDid 462 if repoDid == "" && record.RepoDid != nil { 463 repoDid = *record.RepoDid 464 } 465 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 466 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 467 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 468 } 469 } 470 471 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 472 if err != nil { 473 createdAt = time.Now() 474 } 475 476 artifact := models.Artifact{ 477 Did: did, 478 Rkey: e.Commit.RKey, 479 RepoDid: syntax.DID(repo.RepoDid), 480 Tag: plumbing.Hash(record.Tag), 481 CreatedAt: createdAt, 482 BlobCid: cid.Cid(record.Artifact.Ref), 483 Name: record.Name, 484 Size: uint64(record.Artifact.Size), 485 MimeType: record.Artifact.MimeType, 486 } 487 488 err = db.AddArtifact(i.Db, artifact) 489 case jmodels.CommitOperationDelete: 490 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 491 } 492 493 if err != nil { 494 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 495 } 496 497 return nil 498} 499 500func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 501 did := e.Did 502 var err error 503 504 l := i.Logger.With("handler", "ingestProfile") 505 l = l.With("nsid", e.Commit.Collection) 506 507 if e.Commit.RKey != "self" { 508 return fmt.Errorf("ingestProfile only ingests `self` record") 509 } 510 511 switch e.Commit.Operation { 512 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 513 raw := json.RawMessage(e.Commit.Record) 514 record := tangled.ActorProfile{} 515 err = json.Unmarshal(raw, &record) 516 if err != nil { 517 l.Error("invalid record", "err", err) 518 return err 519 } 520 521 avatar := "" 522 if record.Avatar != nil { 523 avatar = record.Avatar.Ref.String() 524 } 525 526 description := "" 527 if record.Description != nil { 528 description = *record.Description 529 } 530 531 includeBluesky := record.Bluesky 532 533 pronouns := "" 534 if record.Pronouns != nil { 535 pronouns = *record.Pronouns 536 } 537 538 location := "" 539 if record.Location != nil { 540 location = *record.Location 541 } 542 543 var links [5]string 544 for i, l := range record.Links { 545 if i < 5 { 546 links[i] = l 547 } 548 } 549 550 var stats [2]models.VanityStat 551 for i, s := range record.Stats { 552 if i < 2 { 553 stats[i].Kind = models.ParseVanityStatKind(s) 554 } 555 } 556 557 var pinned [6]string 558 for i, r := range record.PinnedRepositories { 559 if i < 6 { 560 pinned[i] = r 561 } 562 } 563 564 var preferredHandle syntax.Handle 565 if record.PreferredHandle != nil { 566 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 567 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 568 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 569 preferredHandle = h 570 } 571 } 572 } 573 574 profile := models.Profile{ 575 Did: did, 576 Avatar: avatar, 577 Description: description, 578 IncludeBluesky: includeBluesky, 579 Location: location, 580 Links: links, 581 Stats: stats, 582 PinnedRepos: pinned, 583 Pronouns: pronouns, 584 PreferredHandle: preferredHandle, 585 } 586 587 tx, err := i.Db.Begin() 588 if err != nil { 589 return fmt.Errorf("failed to start transaction: %w", err) 590 } 591 592 err = db.ValidateProfile(tx, &profile) 593 if err != nil { 594 return fmt.Errorf("invalid profile record") 595 } 596 597 err = db.UpsertProfile(tx, &profile) 598 if err == nil && i.Cache != nil { 599 pipe := i.Cache.Pipeline() 600 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 601 if preferredHandle != "" { 602 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 603 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 604 } else { 605 pipe.Del(ctx, didKey) 606 } 607 if _, execErr := pipe.Exec(ctx); execErr != nil { 608 l.Warn("failed to update preferred handle cache", "err", execErr) 609 } 610 } 611 case jmodels.CommitOperationDelete: 612 tx, beginErr := i.Db.Begin() 613 if beginErr != nil { 614 return fmt.Errorf("failed to start transaction: %w", beginErr) 615 } 616 617 priorHandle, phErr := db.GetPreferredHandle(tx, did) 618 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 619 l.Warn("failed to read prior preferred handle", "err", phErr) 620 } 621 622 err = db.DeleteProfile(tx, did) 623 if err == nil && i.Cache != nil { 624 pipe := i.Cache.Pipeline() 625 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 626 if priorHandle != "" { 627 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 628 } 629 if _, execErr := pipe.Exec(ctx); execErr != nil { 630 l.Warn("failed to evict preferred handle cache", "err", execErr) 631 } 632 } 633 } 634 635 if err != nil { 636 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 637 } 638 639 return nil 640} 641 642func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 643 did := e.Did 644 var err error 645 646 l := i.Logger.With("handler", "ingestSpindleMember") 647 l = l.With("nsid", e.Commit.Collection) 648 649 switch e.Commit.Operation { 650 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 651 raw := json.RawMessage(e.Commit.Record) 652 record := tangled.SpindleMember{} 653 err = json.Unmarshal(raw, &record) 654 if err != nil { 655 l.Error("invalid record", "err", err) 656 return err 657 } 658 659 // only spindle owner can invite to spindles 660 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 661 if err != nil { 662 return fmt.Errorf("failed to check invite permission: %w", err) 663 } 664 if !ok { 665 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 666 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 667 } 668 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 669 if err != nil { 670 return fmt.Errorf("failed to re-check invite permission: %w", err) 671 } 672 if !ok { 673 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 674 } 675 } 676 677 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 678 if err != nil { 679 return err 680 } 681 682 if memberId.Handle.IsInvalidHandle() { 683 return fmt.Errorf("invalid handle for member %s", record.Subject) 684 } 685 686 existing, err := db.GetSpindleMembers(i.Db, 687 orm.FilterEq("did", did), 688 orm.FilterEq("rkey", e.Commit.RKey), 689 ) 690 if err != nil { 691 return fmt.Errorf("failed to look up existing member: %w", err) 692 } 693 if len(existing) > 1 { 694 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 695 } 696 697 tx, err := i.Db.Begin() 698 if err != nil { 699 return fmt.Errorf("failed to start txn: %w", err) 700 } 701 committed := false 702 defer func() { 703 if committed { 704 return 705 } 706 tx.Rollback() 707 i.Enforcer.E.LoadPolicy() 708 }() 709 710 if len(existing) == 1 { 711 prev := existing[0] 712 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 713 if err = db.RemoveSpindleMember(tx, 714 orm.FilterEq("did", did), 715 orm.FilterEq("rkey", e.Commit.RKey), 716 ); err != nil { 717 return fmt.Errorf("failed to remove stale row: %w", err) 718 } 719 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 720 return fmt.Errorf("failed to remove stale ACL: %w", err) 721 } 722 } 723 } 724 725 if err = db.AddSpindleMember(tx, models.SpindleMember{ 726 Did: syntax.DID(did), 727 Rkey: e.Commit.RKey, 728 Instance: record.Instance, 729 Subject: memberId.DID, 730 }); err != nil { 731 return fmt.Errorf("failed to add to db: %w", err) 732 } 733 734 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 735 return fmt.Errorf("failed to update ACLs: %w", err) 736 } 737 738 if err = tx.Commit(); err != nil { 739 return fmt.Errorf("failed to commit txn: %w", err) 740 } 741 742 if err = i.Enforcer.E.SavePolicy(); err != nil { 743 return fmt.Errorf("failed to save ACLs: %w", err) 744 } 745 committed = true 746 747 l.Info("upserted spindle member") 748 case jmodels.CommitOperationDelete: 749 rkey := e.Commit.RKey 750 751 // get record from db first 752 members, err := db.GetSpindleMembers( 753 i.Db, 754 orm.FilterEq("did", did), 755 orm.FilterEq("rkey", rkey), 756 ) 757 if err != nil || len(members) != 1 { 758 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 759 } 760 member := members[0] 761 762 tx, err := i.Db.Begin() 763 if err != nil { 764 return fmt.Errorf("failed to start txn: %w", err) 765 } 766 committed := false 767 defer func() { 768 if committed { 769 return 770 } 771 tx.Rollback() 772 i.Enforcer.E.LoadPolicy() 773 }() 774 775 // remove record by rkey && update enforcer 776 if err = db.RemoveSpindleMember( 777 tx, 778 orm.FilterEq("did", did), 779 orm.FilterEq("rkey", rkey), 780 ); err != nil { 781 return fmt.Errorf("failed to remove from db: %w", err) 782 } 783 784 // update enforcer 785 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 786 if err != nil { 787 return fmt.Errorf("failed to update ACLs: %w", err) 788 } 789 790 if err = tx.Commit(); err != nil { 791 return fmt.Errorf("failed to commit txn: %w", err) 792 } 793 794 if err = i.Enforcer.E.SavePolicy(); err != nil { 795 return fmt.Errorf("failed to save ACLs: %w", err) 796 } 797 committed = true 798 799 l.Info("removed spindle member") 800 } 801 802 return nil 803} 804 805func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 806 did := e.Did 807 var err error 808 809 l := i.Logger.With("handler", "ingestSpindle") 810 l = l.With("nsid", e.Commit.Collection) 811 812 switch e.Commit.Operation { 813 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 814 raw := json.RawMessage(e.Commit.Record) 815 record := tangled.Spindle{} 816 err = json.Unmarshal(raw, &record) 817 if err != nil { 818 l.Error("invalid record", "err", err) 819 return err 820 } 821 822 instance := e.Commit.RKey 823 824 err := db.AddSpindle(i.Db, models.Spindle{ 825 Owner: syntax.DID(did), 826 Instance: instance, 827 }) 828 if err != nil { 829 l.Error("failed to add spindle to db", "err", err, "instance", instance) 830 return err 831 } 832 833 if err := i.verifySpindle(ctx, instance, did); err != nil { 834 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 835 } 836 837 return nil 838 839 case jmodels.CommitOperationDelete: 840 instance := e.Commit.RKey 841 842 // get record from db first 843 spindles, err := db.GetSpindles( 844 ctx, 845 i.Db, 846 orm.FilterEq("owner", did), 847 orm.FilterEq("instance", instance), 848 ) 849 if err != nil || len(spindles) != 1 { 850 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 851 } 852 spindle := spindles[0] 853 854 tx, err := i.Db.Begin() 855 if err != nil { 856 return err 857 } 858 defer func() { 859 tx.Rollback() 860 i.Enforcer.E.LoadPolicy() 861 }() 862 863 // remove spindle members first 864 err = db.RemoveSpindleMember( 865 tx, 866 orm.FilterEq("owner", did), 867 orm.FilterEq("instance", instance), 868 ) 869 if err != nil { 870 return err 871 } 872 873 err = db.DeleteSpindle( 874 tx, 875 orm.FilterEq("owner", did), 876 orm.FilterEq("instance", instance), 877 ) 878 if err != nil { 879 return err 880 } 881 882 if spindle.Verified != nil { 883 err = i.Enforcer.RemoveSpindle(instance) 884 if err != nil { 885 return err 886 } 887 } 888 889 err = tx.Commit() 890 if err != nil { 891 return err 892 } 893 894 err = i.Enforcer.E.SavePolicy() 895 if err != nil { 896 return err 897 } 898 } 899 900 return nil 901} 902 903func (i *Ingester) ingestString(e *jmodels.Event) error { 904 did := e.Did 905 rkey := e.Commit.RKey 906 907 var err error 908 909 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 910 l.Info("ingesting record") 911 912 switch e.Commit.Operation { 913 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 914 raw := json.RawMessage(e.Commit.Record) 915 record := tangled.String{} 916 err = json.Unmarshal(raw, &record) 917 if err != nil { 918 l.Error("invalid record", "err", err) 919 return err 920 } 921 922 string := models.StringFromRecord(did, rkey, record) 923 924 if err = i.Validator.ValidateString(&string); err != nil { 925 l.Error("invalid record", "err", err) 926 return err 927 } 928 929 if err = db.AddString(i.Db, string); err != nil { 930 l.Error("failed to add string", "err", err) 931 return err 932 } 933 934 return nil 935 936 case jmodels.CommitOperationDelete: 937 if err := db.DeleteString( 938 i.Db, 939 orm.FilterEq("did", did), 940 orm.FilterEq("rkey", rkey), 941 ); err != nil { 942 l.Error("failed to delete", "err", err) 943 return fmt.Errorf("failed to delete string record: %w", err) 944 } 945 946 return nil 947 } 948 949 return nil 950} 951 952func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error { 953 did := e.Did 954 var err error 955 956 l := i.Logger.With("handler", "ingestKnotMember") 957 l = l.With("nsid", e.Commit.Collection) 958 959 switch e.Commit.Operation { 960 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 961 raw := json.RawMessage(e.Commit.Record) 962 record := tangled.KnotMember{} 963 err = json.Unmarshal(raw, &record) 964 if err != nil { 965 l.Error("invalid record", "err", err) 966 return err 967 } 968 969 // only knot owner can invite to knots 970 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 971 if err != nil { 972 return fmt.Errorf("failed to check invite permission: %w", err) 973 } 974 if !ok { 975 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 976 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 977 } 978 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 979 if err != nil { 980 return fmt.Errorf("failed to re-check invite permission: %w", err) 981 } 982 if !ok { 983 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 984 } 985 } 986 987 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 988 if err != nil { 989 return err 990 } 991 992 if memberId.Handle.IsInvalidHandle() { 993 return fmt.Errorf("invalid handle for member %s", record.Subject) 994 } 995 996 existing, err := db.GetKnotMembers(i.Db, 997 orm.FilterEq("did", did), 998 orm.FilterEq("rkey", e.Commit.RKey), 999 ) 1000 if err != nil { 1001 return fmt.Errorf("failed to look up existing member: %w", err) 1002 } 1003 if len(existing) > 1 { 1004 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1005 } 1006 1007 tx, err := i.Db.Begin() 1008 if err != nil { 1009 return fmt.Errorf("failed to start txn: %w", err) 1010 } 1011 committed := false 1012 defer func() { 1013 if committed { 1014 return 1015 } 1016 tx.Rollback() 1017 i.Enforcer.E.LoadPolicy() 1018 }() 1019 1020 if len(existing) == 1 { 1021 prev := existing[0] 1022 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1023 if err = db.RemoveKnotMember(tx, 1024 orm.FilterEq("did", did), 1025 orm.FilterEq("rkey", e.Commit.RKey), 1026 ); err != nil { 1027 return fmt.Errorf("failed to remove stale row: %w", err) 1028 } 1029 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1030 return fmt.Errorf("failed to remove stale ACL: %w", err) 1031 } 1032 } 1033 } 1034 1035 if err = db.AddKnotMember(tx, models.KnotMember{ 1036 Did: syntax.DID(did), 1037 Rkey: e.Commit.RKey, 1038 Domain: record.Domain, 1039 Subject: memberId.DID, 1040 }); err != nil { 1041 return fmt.Errorf("failed to add to db: %w", err) 1042 } 1043 1044 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1045 return fmt.Errorf("failed to update ACLs: %w", err) 1046 } 1047 1048 if err = tx.Commit(); err != nil { 1049 return fmt.Errorf("failed to commit txn: %w", err) 1050 } 1051 1052 if err = i.Enforcer.E.SavePolicy(); err != nil { 1053 return fmt.Errorf("failed to save ACLs: %w", err) 1054 } 1055 committed = true 1056 1057 l.Info("upserted knot member") 1058 case jmodels.CommitOperationDelete: 1059 rkey := e.Commit.RKey 1060 1061 members, err := db.GetKnotMembers( 1062 i.Db, 1063 orm.FilterEq("did", did), 1064 orm.FilterEq("rkey", rkey), 1065 ) 1066 if err != nil { 1067 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1068 } 1069 if len(members) == 0 { 1070 l.Info("knot member already removed", "rkey", rkey) 1071 return nil 1072 } 1073 if len(members) > 1 { 1074 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1075 } 1076 member := members[0] 1077 1078 tx, err := i.Db.Begin() 1079 if err != nil { 1080 return fmt.Errorf("failed to start txn: %w", err) 1081 } 1082 committed := false 1083 defer func() { 1084 if committed { 1085 return 1086 } 1087 tx.Rollback() 1088 i.Enforcer.E.LoadPolicy() 1089 }() 1090 1091 if err = db.RemoveKnotMember( 1092 tx, 1093 orm.FilterEq("did", did), 1094 orm.FilterEq("rkey", rkey), 1095 ); err != nil { 1096 return fmt.Errorf("failed to remove from db: %w", err) 1097 } 1098 1099 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1100 return fmt.Errorf("failed to update ACLs: %w", err) 1101 } 1102 1103 if err = tx.Commit(); err != nil { 1104 return fmt.Errorf("failed to commit txn: %w", err) 1105 } 1106 1107 if err = i.Enforcer.E.SavePolicy(); err != nil { 1108 return fmt.Errorf("failed to save ACLs: %w", err) 1109 } 1110 committed = true 1111 1112 l.Info("removed knot member") 1113 } 1114 1115 return nil 1116} 1117 1118func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error { 1119 did := e.Did 1120 var err error 1121 1122 l := i.Logger.With("handler", "ingestKnot") 1123 l = l.With("nsid", e.Commit.Collection) 1124 1125 switch e.Commit.Operation { 1126 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1127 raw := json.RawMessage(e.Commit.Record) 1128 record := tangled.Knot{} 1129 err = json.Unmarshal(raw, &record) 1130 if err != nil { 1131 l.Error("invalid record", "err", err) 1132 return err 1133 } 1134 1135 domain := e.Commit.RKey 1136 1137 err := db.AddKnot(i.Db, domain, did) 1138 if err != nil { 1139 l.Error("failed to add knot to db", "err", err, "domain", domain) 1140 return err 1141 } 1142 1143 if err := i.verifyKnot(ctx, domain, did); err != nil { 1144 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 1145 } 1146 1147 return nil 1148 1149 case jmodels.CommitOperationDelete: 1150 domain := e.Commit.RKey 1151 1152 // get record from db first 1153 registrations, err := db.GetRegistrations( 1154 i.Db, 1155 orm.FilterEq("domain", domain), 1156 orm.FilterEq("did", did), 1157 ) 1158 if err != nil { 1159 return fmt.Errorf("failed to get registration: %w", err) 1160 } 1161 if len(registrations) != 1 { 1162 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 1163 } 1164 registration := registrations[0] 1165 1166 tx, err := i.Db.Begin() 1167 if err != nil { 1168 return err 1169 } 1170 defer func() { 1171 tx.Rollback() 1172 i.Enforcer.E.LoadPolicy() 1173 }() 1174 1175 err = db.RemoveKnotMember( 1176 tx, 1177 orm.FilterEq("did", did), 1178 orm.FilterEq("domain", domain), 1179 ) 1180 if err != nil { 1181 return err 1182 } 1183 1184 err = db.DeleteKnot( 1185 tx, 1186 orm.FilterEq("did", did), 1187 orm.FilterEq("domain", domain), 1188 ) 1189 if err != nil { 1190 return err 1191 } 1192 1193 if registration.Registered != nil { 1194 err = i.Enforcer.RemoveKnot(domain) 1195 if err != nil { 1196 return err 1197 } 1198 } 1199 1200 err = tx.Commit() 1201 if err != nil { 1202 return err 1203 } 1204 1205 err = i.Enforcer.E.SavePolicy() 1206 if err != nil { 1207 return err 1208 } 1209 } 1210 1211 return nil 1212} 1213 1214const ( 1215 verifyAttempts = 4 1216 verifyMinDelay = 1 * time.Second 1217 verifyMaxDelay = 5 * time.Second 1218) 1219 1220func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1221 regs, err := db.GetRegistrations(i.Db, 1222 orm.FilterEq("domain", domain), 1223 orm.FilterEq("did", did), 1224 ) 1225 if err != nil { 1226 return fmt.Errorf("look up registration: %w", err) 1227 } 1228 if len(regs) != 1 { 1229 return fmt.Errorf("no registration for %s by %s", domain, did) 1230 } 1231 if regs[0].Registered != nil { 1232 return nil 1233 } 1234 1235 err = retry.Do( 1236 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1237 retry.Context(ctx), 1238 retry.Attempts(verifyAttempts), 1239 retry.Delay(verifyMinDelay), 1240 retry.MaxDelay(verifyMaxDelay), 1241 retry.DelayType(retry.BackOffDelay), 1242 retry.LastErrorOnly(true), 1243 ) 1244 if err != nil { 1245 return fmt.Errorf("verify: %w", err) 1246 } 1247 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1248} 1249 1250func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1251 spindles, err := db.GetSpindles(ctx, i.Db, 1252 orm.FilterEq("instance", instance), 1253 orm.FilterEq("owner", did), 1254 ) 1255 if err != nil { 1256 return fmt.Errorf("look up spindle: %w", err) 1257 } 1258 if len(spindles) != 1 { 1259 return fmt.Errorf("no spindle for %s by %s", instance, did) 1260 } 1261 if spindles[0].Verified != nil { 1262 return nil 1263 } 1264 1265 err = retry.Do( 1266 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1267 retry.Context(ctx), 1268 retry.Attempts(verifyAttempts), 1269 retry.Delay(verifyMinDelay), 1270 retry.MaxDelay(verifyMaxDelay), 1271 retry.DelayType(retry.BackOffDelay), 1272 retry.LastErrorOnly(true), 1273 ) 1274 if err != nil { 1275 return fmt.Errorf("verify: %w", err) 1276 } 1277 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1278 return err 1279} 1280 1281const sweepConcurrency = 4 1282 1283func (i *Ingester) SweepPendingVerifications() { 1284 l := i.Logger.With("handler", "SweepPendingVerifications") 1285 1286 var g errgroup.Group 1287 g.SetLimit(sweepConcurrency) 1288 1289 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1290 if err != nil { 1291 l.Error("failed to list unverified knots", "err", err) 1292 } else { 1293 for _, reg := range regs { 1294 g.Go(func() error { 1295 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1296 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1297 } 1298 return nil 1299 }) 1300 } 1301 } 1302 1303 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1304 if err != nil { 1305 l.Error("failed to list unverified spindles", "err", err) 1306 g.Wait() 1307 return 1308 } 1309 for _, s := range spindles { 1310 g.Go(func() error { 1311 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1312 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1313 } 1314 return nil 1315 }) 1316 } 1317 g.Wait() 1318} 1319 1320func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 1321 did := e.Did 1322 rkey := e.Commit.RKey 1323 1324 var err error 1325 1326 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1327 l.Info("ingesting record") 1328 1329 switch e.Commit.Operation { 1330 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1331 raw := json.RawMessage(e.Commit.Record) 1332 record := tangled.RepoIssue{} 1333 err = json.Unmarshal(raw, &record) 1334 if err != nil { 1335 l.Error("invalid record", "err", err) 1336 return err 1337 } 1338 1339 issue := models.IssueFromRecord(did, rkey, record) 1340 1341 if issue.RepoDid == "" { 1342 return fmt.Errorf("issue record has no repo field") 1343 } 1344 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1345 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1346 } 1347 1348 if err := i.Validator.ValidateIssue(&issue); err != nil { 1349 return fmt.Errorf("failed to validate issue: %w", err) 1350 } 1351 1352 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1353 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1354 if repoErr == nil && repo.RepoDid != "" { 1355 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1356 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1357 } 1358 } 1359 } 1360 1361 tx, err := i.Db.BeginTx(ctx, nil) 1362 if err != nil { 1363 l.Error("failed to begin transaction", "err", err) 1364 return err 1365 } 1366 defer tx.Rollback() 1367 1368 err = db.PutIssue(tx, &issue) 1369 if err != nil { 1370 l.Error("failed to create issue", "err", err) 1371 return err 1372 } 1373 1374 err = tx.Commit() 1375 if err != nil { 1376 l.Error("failed to commit txn", "err", err) 1377 return err 1378 } 1379 1380 return nil 1381 1382 case jmodels.CommitOperationDelete: 1383 tx, err := i.Db.BeginTx(ctx, nil) 1384 if err != nil { 1385 l.Error("failed to begin transaction", "err", err) 1386 return err 1387 } 1388 defer tx.Rollback() 1389 1390 if err := db.DeleteIssues( 1391 tx, 1392 did, 1393 rkey, 1394 ); err != nil { 1395 l.Error("failed to delete", "err", err) 1396 return fmt.Errorf("failed to delete issue record: %w", err) 1397 } 1398 if err := tx.Commit(); err != nil { 1399 l.Error("failed to commit txn", "err", err) 1400 return err 1401 } 1402 1403 return nil 1404 } 1405 1406 return nil 1407} 1408 1409func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 1410 did := e.Did 1411 rkey := e.Commit.RKey 1412 1413 var err error 1414 1415 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1416 l.Info("ingesting record") 1417 1418 switch e.Commit.Operation { 1419 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1420 raw := json.RawMessage(e.Commit.Record) 1421 record := tangled.RepoPull{} 1422 err = json.Unmarshal(raw, &record) 1423 if err != nil { 1424 l.Error("invalid record", "err", err) 1425 return err 1426 } 1427 1428 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1429 if err != nil { 1430 l.Error("failed to resolve did") 1431 return err 1432 } 1433 1434 // go through and fetch all blobs in parallel 1435 readers := make([]*io.ReadCloser, len(record.Rounds)) 1436 var mu sync.Mutex 1437 1438 g, gctx := errgroup.WithContext(ctx) 1439 1440 for idx, b := range record.Rounds { 1441 g.Go(func() error { 1442 // for some reason, a blob is empty 1443 if b.PatchBlob == nil { 1444 return fmt.Errorf("missing patchBlob in round %d", idx) 1445 } 1446 1447 ownerPds := ownerId.PDSEndpoint() 1448 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1449 q := url.Query() 1450 q.Set("cid", b.PatchBlob.Ref.String()) 1451 q.Set("did", did) 1452 url.RawQuery = q.Encode() 1453 1454 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1455 if err != nil { 1456 l.Error("failed to create request") 1457 return err 1458 } 1459 req.Header.Set("Content-Type", "application/json") 1460 1461 resp, err := http.DefaultClient.Do(req) 1462 if err != nil { 1463 l.Error("failed to make request") 1464 return err 1465 } 1466 1467 mu.Lock() 1468 readers[idx] = &resp.Body 1469 mu.Unlock() 1470 1471 return nil 1472 }) 1473 } 1474 1475 if err := g.Wait(); err != nil { 1476 for _, r := range readers { 1477 if r != nil && *r != nil { 1478 (*r).Close() 1479 } 1480 } 1481 return err 1482 } 1483 1484 defer func() { 1485 for _, r := range readers { 1486 if r != nil && *r != nil { 1487 (*r).Close() 1488 } 1489 } 1490 }() 1491 1492 pull, err := models.PullFromRecord(did, rkey, record, readers) 1493 if err != nil { 1494 return fmt.Errorf("failed to parse pull from record: %w", err) 1495 } 1496 if err := i.Validator.ValidatePull(pull); err != nil { 1497 return fmt.Errorf("failed to validate pull: %w", err) 1498 } 1499 1500 tx, err := i.Db.BeginTx(ctx, nil) 1501 if err != nil { 1502 l.Error("failed to begin transaction", "err", err) 1503 return err 1504 } 1505 defer tx.Rollback() 1506 1507 err = db.PutPull(tx, pull) 1508 if err != nil { 1509 l.Error("failed to create pull", "err", err) 1510 return err 1511 } 1512 1513 err = tx.Commit() 1514 if err != nil { 1515 l.Error("failed to commit txn", "err", err) 1516 return err 1517 } 1518 1519 return nil 1520 1521 case jmodels.CommitOperationDelete: 1522 tx, err := i.Db.BeginTx(ctx, nil) 1523 if err != nil { 1524 l.Error("failed to begin transaction", "err", err) 1525 return err 1526 } 1527 defer tx.Rollback() 1528 1529 if err := db.AbandonPulls( 1530 tx, 1531 orm.FilterEq("owner_did", did), 1532 orm.FilterEq("rkey", rkey), 1533 ); err != nil { 1534 l.Error("failed to abandon", "err", err) 1535 return fmt.Errorf("failed to abandon pull record: %w", err) 1536 } 1537 if err := tx.Commit(); err != nil { 1538 l.Error("failed to commit txn", "err", err) 1539 return err 1540 } 1541 1542 return nil 1543 } 1544 1545 return nil 1546} 1547 1548// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1549func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1550 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1551 l.Info("ingesting record") 1552 1553 switch e.Commit.Operation { 1554 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1555 // no-op. sh.tangled.repo.issue.comment is deprecated 1556 1557 case jmodels.CommitOperationDelete: 1558 if err := db.PurgeComments( 1559 i.Db, 1560 orm.FilterEq("did", e.Did), 1561 orm.FilterEq("collection", e.Commit.Collection), 1562 orm.FilterEq("rkey", e.Commit.RKey), 1563 ); err != nil { 1564 return fmt.Errorf("failed to delete comment record: %w", err) 1565 } 1566 } 1567 1568 return nil 1569} 1570 1571// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1572func (i *Ingester) ingestPullComment(e *jmodels.Event) error { 1573 l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1574 l.Info("ingesting record") 1575 1576 switch e.Commit.Operation { 1577 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1578 // no-op. sh.tangled.repo.pull.comment is deprecated 1579 1580 case jmodels.CommitOperationDelete: 1581 if err := db.PurgeComments( 1582 i.Db, 1583 orm.FilterEq("did", e.Did), 1584 orm.FilterEq("collection", e.Commit.Collection), 1585 orm.FilterEq("rkey", e.Commit.RKey), 1586 ); err != nil { 1587 return fmt.Errorf("failed to delete comment record: %w", err) 1588 } 1589 } 1590 1591 return nil 1592} 1593 1594func (i *Ingester) ingestComment(e *jmodels.Event) error { 1595 did := e.Did 1596 rkey := e.Commit.RKey 1597 cid := e.Commit.CID 1598 1599 var err error 1600 1601 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1602 l.Info("ingesting record") 1603 1604 ctx := context.Background() 1605 1606 switch e.Commit.Operation { 1607 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1608 raw := json.RawMessage(e.Commit.Record) 1609 record := tangled.FeedComment{} 1610 err = json.Unmarshal(raw, &record) 1611 if err != nil { 1612 return fmt.Errorf("invalid record: %w", err) 1613 } 1614 1615 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1616 if err != nil { 1617 return fmt.Errorf("failed to parse comment from record: %w", err) 1618 } 1619 1620 if err := comment.Validate(); err != nil { 1621 return fmt.Errorf("failed to validate comment: %w", err) 1622 } 1623 1624 var references []syntax.ATURI 1625 if comment.Body.Original != nil { 1626 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1627 } 1628 1629 tx, err := i.Db.Begin() 1630 if err != nil { 1631 return fmt.Errorf("failed to start transaction: %w", err) 1632 } 1633 defer tx.Rollback() 1634 1635 _, err = db.PutComment(tx, comment, references) 1636 if err != nil { 1637 return fmt.Errorf("failed to create comment: %w", err) 1638 } 1639 1640 if err := tx.Commit(); err != nil { 1641 return err 1642 } 1643 1644 case jmodels.CommitOperationDelete: 1645 if err := db.DeleteComments( 1646 i.Db, 1647 orm.FilterEq("did", did), 1648 orm.FilterEq("collection", e.Commit.Collection), 1649 orm.FilterEq("rkey", rkey), 1650 ); err != nil { 1651 return fmt.Errorf("failed to delete comment record: %w", err) 1652 } 1653 1654 return nil 1655 } 1656 1657 return nil 1658} 1659 1660func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1661 did := e.Did 1662 rkey := e.Commit.RKey 1663 1664 var err error 1665 1666 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1667 l.Info("ingesting record") 1668 1669 switch e.Commit.Operation { 1670 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1671 raw := json.RawMessage(e.Commit.Record) 1672 record := tangled.LabelDefinition{} 1673 err = json.Unmarshal(raw, &record) 1674 if err != nil { 1675 return fmt.Errorf("invalid record: %w", err) 1676 } 1677 1678 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1679 if err != nil { 1680 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1681 } 1682 1683 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1684 return fmt.Errorf("failed to validate labeldef: %w", err) 1685 } 1686 1687 _, err = db.AddLabelDefinition(i.Db, def) 1688 if err != nil { 1689 return fmt.Errorf("failed to create labeldef: %w", err) 1690 } 1691 1692 return nil 1693 1694 case jmodels.CommitOperationDelete: 1695 if err := db.DeleteLabelDefinition( 1696 i.Db, 1697 orm.FilterEq("did", did), 1698 orm.FilterEq("rkey", rkey), 1699 ); err != nil { 1700 return fmt.Errorf("failed to delete labeldef record: %w", err) 1701 } 1702 1703 return nil 1704 } 1705 1706 return nil 1707} 1708 1709func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1710 did := e.Did 1711 rkey := e.Commit.RKey 1712 1713 var err error 1714 1715 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1716 l.Info("ingesting record") 1717 1718 switch e.Commit.Operation { 1719 case jmodels.CommitOperationCreate: 1720 raw := json.RawMessage(e.Commit.Record) 1721 record := tangled.LabelOp{} 1722 err = json.Unmarshal(raw, &record) 1723 if err != nil { 1724 return fmt.Errorf("invalid record: %w", err) 1725 } 1726 1727 subject := syntax.ATURI(record.Subject) 1728 collection := subject.Collection() 1729 1730 var repo *models.Repo 1731 switch collection { 1732 case tangled.RepoIssueNSID: 1733 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1734 if err != nil || len(i) != 1 { 1735 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1736 } 1737 repo = i[0].Repo 1738 default: 1739 return fmt.Errorf("unsupported label subject: %s", collection) 1740 } 1741 1742 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1743 if err != nil { 1744 return fmt.Errorf("failed to build label application ctx: %w", err) 1745 } 1746 1747 ops := models.LabelOpsFromRecord(did, rkey, record) 1748 1749 for _, o := range ops { 1750 def, ok := actx.Defs[o.OperandKey] 1751 if !ok { 1752 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1753 } 1754 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1755 return fmt.Errorf("failed to validate labelop: %w", err) 1756 } 1757 } 1758 1759 tx, err := i.Db.Begin() 1760 if err != nil { 1761 return err 1762 } 1763 defer tx.Rollback() 1764 1765 for _, o := range ops { 1766 _, err = db.AddLabelOp(tx, &o) 1767 if err != nil { 1768 return fmt.Errorf("failed to add labelop: %w", err) 1769 } 1770 } 1771 1772 if err = tx.Commit(); err != nil { 1773 return err 1774 } 1775 } 1776 1777 return nil 1778}