Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/knotacl" 31 "tangled.org/core/appview/mentions" 32 "tangled.org/core/appview/models" 33 "tangled.org/core/appview/notify" 34 "tangled.org/core/appview/repoverify" 35 "tangled.org/core/appview/serververify" 36 "tangled.org/core/appview/validator" 37 "tangled.org/core/blobstore" 38 "tangled.org/core/idresolver" 39 "tangled.org/core/orm" 40 "tangled.org/core/rbac" 41) 42 43type Ingester struct { 44 Ctx context.Context 45 Db *db.DB 46 Enforcer *rbac.Enforcer 47 Acl *knotacl.Service 48 IdResolver *idresolver.Resolver 49 BlobStore blobstore.BlobStore 50 Cache *cache.Cache 51 Config *config.Config 52 Logger *slog.Logger 53 Validator *validator.Validator 54 MentionsResolver *mentions.Resolver 55 Notifier notify.Notifier 56 Verifier repoverify.Verifier 57} 58 59type processFunc func(ctx context.Context, e *jmodels.Event) error 60 61func (i *Ingester) Ingest() processFunc { 62 return func(ctx context.Context, e *jmodels.Event) error { 63 var err error 64 65 l := i.Logger.With("kind", e.Kind) 66 switch e.Kind { 67 case jmodels.EventKindAccount: 68 // TODO: sync account state to db 69 if e.Account.Active { 70 break 71 } 72 // TODO: revoke sessions by DID 73 if *e.Account.Status == "deactivated" { 74 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 75 } 76 case jmodels.EventKindIdentity: 77 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 78 case jmodels.EventKindCommit: 79 l = l.With( 80 "nsid", e.Commit.Collection, 81 "did", e.Did, 82 "rkey", e.Commit.RKey, 83 "cid", e.Commit.CID, 84 "op", e.Commit.Operation, 85 ) 86 switch e.Commit.Collection { 87 case tangled.GraphFollowNSID: 88 err = i.ingestFollow(e, l) 89 case tangled.GraphVouchNSID: 90 err = i.ingestVouch(ctx, e, l) 91 case tangled.FeedStarNSID: 92 err = i.ingestStar(ctx, e, l) 93 case tangled.FeedReactionNSID: 94 err = i.ingestReaction(e, l) 95 case tangled.PublicKeyNSID: 96 err = i.ingestPublicKey(e, l) 97 case tangled.RepoArtifactNSID: 98 err = i.ingestArtifact(ctx, e, l) 99 case tangled.ActorProfileNSID: 100 err = i.ingestProfile(ctx, e, l) 101 case tangled.SpindleMemberNSID: 102 err = i.ingestSpindleMember(ctx, e, l) 103 case tangled.SpindleNSID: 104 err = i.ingestSpindle(ctx, e, l) 105 case tangled.KnotMemberNSID: 106 err = i.ingestKnotMember(ctx, e, l) 107 case tangled.KnotNSID: 108 err = i.ingestKnot(ctx, e, l) 109 case tangled.StringNSID: 110 err = i.ingestString(ctx, e, l) 111 case tangled.RepoIssueNSID: 112 err = i.ingestIssue(ctx, e, l) 113 case tangled.RepoPullNSID: 114 err = i.ingestPull(ctx, e, l) 115 case tangled.FeedCommentNSID: 116 err = i.ingestComment(e, l) 117 case tangled.RepoIssueCommentNSID: 118 err = i.ingestIssueComment(e, l) 119 case tangled.RepoPullCommentNSID: 120 err = i.ingestPullComment(e, l) 121 case tangled.LabelDefinitionNSID: 122 err = i.ingestLabelDefinition(e, l) 123 case tangled.LabelOpNSID: 124 err = i.ingestLabelOp(ctx, e, l) 125 case tangled.RepoNSID: 126 err = i.ingestRepo(ctx, e, l) 127 } 128 } 129 130 if err != nil { 131 l.Warn("failed to ingest record, skipping", "err", err) 132 } 133 134 lastTimeUs := e.TimeUS + 1 135 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 136 l.Error("failed to save cursor", "err", saveErr) 137 } 138 139 return nil 140 } 141} 142 143func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 144 if strings.HasPrefix(ref, "did:") { 145 return db.GetRepoByDid(i.Db, ref) 146 } 147 return db.GetRepoByAtUri(i.Db, ref) 148} 149 150func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 151 var legacy struct { 152 Subject *string `json:"subject"` 153 SubjectDid *string `json:"subjectDid"` 154 } 155 if err := json.Unmarshal(raw, &legacy); err != nil { 156 return false, err 157 } 158 159 switch { 160 case legacy.SubjectDid != nil: 161 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 162 if err != nil { 163 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 164 return false, nil 165 } 166 star.SubjectType = models.StarSubjectRepo 167 star.Subject = repo.RepoDid 168 return true, nil 169 170 case legacy.Subject != nil: 171 uri, err := syntax.ParseATURI(*legacy.Subject) 172 if err != nil { 173 return false, fmt.Errorf("invalid old-format star subject: %w", err) 174 } 175 switch uri.Collection().String() { 176 case tangled.RepoNSID: 177 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 178 if err != nil { 179 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 180 return false, nil 181 } 182 star.SubjectType = models.StarSubjectRepo 183 star.Subject = repo.RepoDid 184 return true, nil 185 default: 186 star.SubjectType = models.StarSubjectString 187 star.Subject = *legacy.Subject 188 return true, nil 189 } 190 191 default: 192 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 193 } 194} 195 196func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 197 var err error 198 did := e.Did 199 200 l = l.With("handler", "ingestStar") 201 202 switch e.Commit.Operation { 203 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 204 raw := json.RawMessage(e.Commit.Record) 205 record := tangled.FeedStar{} 206 unmarshalErr := json.Unmarshal(raw, &record) 207 208 star := &models.Star{ 209 Did: did, 210 Rkey: e.Commit.RKey, 211 } 212 213 switch { 214 case unmarshalErr != nil: 215 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 216 if resolveErr != nil { 217 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 218 return unmarshalErr 219 } 220 if !resolved { 221 return nil 222 } 223 224 case record.Subject == nil: 225 return fmt.Errorf("star record has nil subject") 226 227 case record.Subject.FeedStar_Repo != nil: 228 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 229 if repoErr != nil { 230 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 231 return nil 232 } 233 star.SubjectType = models.StarSubjectRepo 234 star.Subject = repo.RepoDid 235 236 case record.Subject.FeedStar_String != nil: 237 star.SubjectType = models.StarSubjectString 238 star.Subject = record.Subject.FeedStar_String.Uri 239 240 default: 241 return fmt.Errorf("star record has empty subject union") 242 } 243 244 err = db.AddStar(i.Db, star) 245 case jmodels.CommitOperationDelete: 246 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 247 } 248 249 if err != nil { 250 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 251 } 252 253 l.Info("ingested record") 254 return nil 255} 256 257func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error { 258 var err error 259 did := e.Did 260 261 l = l.With("handler", "ingestFollow") 262 263 switch e.Commit.Operation { 264 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 265 raw := json.RawMessage(e.Commit.Record) 266 record := tangled.GraphFollow{} 267 err = json.Unmarshal(raw, &record) 268 if err != nil { 269 l.Error("invalid record", "err", err) 270 return err 271 } 272 273 err = db.AddFollow(i.Db, &models.Follow{ 274 UserDid: did, 275 SubjectDid: record.Subject, 276 Rkey: e.Commit.RKey, 277 }) 278 case jmodels.CommitOperationDelete: 279 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 280 } 281 282 if err != nil { 283 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 284 } 285 286 l.Info("ingested record") 287 return nil 288} 289 290func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 291 var err error 292 did := e.Did 293 294 l = l.With("handler", "ingestVouch") 295 296 switch e.Commit.Operation { 297 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 298 raw := json.RawMessage(e.Commit.Record) 299 record := tangled.GraphVouch{} 300 err = json.Unmarshal(raw, &record) 301 if err != nil { 302 l.Error("invalid record", "err", err) 303 return err 304 } 305 306 // rkey is the subject_did being vouched for/denounced 307 subjectDID := e.Commit.RKey 308 309 _, err = syntax.ParseDID(subjectDID) 310 if err != nil { 311 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 312 return fmt.Errorf("invalid subject_did: %w", err) 313 } 314 315 if did == subjectDID { 316 l.Warn("attempted self-vouch", "did", did) 317 return fmt.Errorf("cannot vouch for self") 318 } 319 320 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 321 if err != nil { 322 return err 323 } 324 325 if subjectId.Handle.IsInvalidHandle() { 326 return err 327 } 328 329 kind, err := models.ParseVouchKind(record.Kind) 330 if err != nil { 331 l.Error("invalid kind", "kind", kind) 332 return fmt.Errorf("invalid kind: %s", kind) 333 } 334 335 recordCid, err := cid.Parse(e.Commit.CID) 336 if err != nil { 337 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 338 return fmt.Errorf("invalid cid: %w", err) 339 } 340 341 var evidences []syntax.ATURI 342 for _, raw := range record.Evidences { 343 uri, parseErr := syntax.ParseATURI(raw) 344 if parseErr != nil { 345 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 346 continue 347 } 348 evidences = append(evidences, uri) 349 } 350 351 tx, txErr := i.Db.Begin() 352 if txErr != nil { 353 return fmt.Errorf("failed to start transaction: %w", txErr) 354 } 355 356 addErr := db.AddVouch(tx, &models.Vouch{ 357 Did: syntax.DID(did), 358 SubjectDid: subjectId.DID, 359 Cid: recordCid, 360 Kind: kind, 361 Reason: record.Reason, 362 Evidences: evidences, 363 }) 364 if addErr != nil { 365 tx.Rollback() 366 err = addErr 367 } else { 368 err = tx.Commit() 369 } 370 371 case jmodels.CommitOperationDelete: 372 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 373 } 374 375 if err != nil { 376 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 377 } 378 379 l.Info("ingested record") 380 return nil 381} 382 383func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error { 384 did := e.Did 385 var err error 386 387 l = l.With("handler", "ingestPublicKey") 388 389 switch e.Commit.Operation { 390 case jmodels.CommitOperationCreate: 391 l.Debug("processing add of pubkey") 392 raw := json.RawMessage(e.Commit.Record) 393 record := tangled.PublicKey{} 394 err = json.Unmarshal(raw, &record) 395 if err != nil { 396 l.Error("invalid record", "err", err) 397 return err 398 } 399 400 name := record.Name 401 key := record.Key 402 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 403 case jmodels.CommitOperationUpdate: 404 l.Debug("processing update of pubkey") 405 raw := json.RawMessage(e.Commit.Record) 406 record := tangled.PublicKey{} 407 err = json.Unmarshal(raw, &record) 408 if err != nil { 409 l.Error("invalid record", "err", err) 410 return err 411 } 412 413 name := record.Name 414 key := record.Key 415 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 416 case jmodels.CommitOperationDelete: 417 l.Debug("processing delete of pubkey") 418 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 419 } 420 421 if err != nil { 422 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 423 } 424 425 l.Info("ingested record") 426 return nil 427} 428 429func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 430 did := e.Did 431 var err error 432 433 l = l.With("handler", "ingestArtifact") 434 435 switch e.Commit.Operation { 436 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 437 raw := json.RawMessage(e.Commit.Record) 438 record := tangled.RepoArtifact{} 439 err = json.Unmarshal(raw, &record) 440 if err != nil { 441 l.Error("invalid record", "err", err) 442 return err 443 } 444 445 var repo *models.Repo 446 if record.RepoDid != nil && *record.RepoDid != "" { 447 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 448 if err != nil && !errors.Is(err, sql.ErrNoRows) { 449 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 450 } 451 } 452 if repo == nil && record.Repo != nil { 453 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 454 if parseErr != nil { 455 return parseErr 456 } 457 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 458 if err != nil { 459 return err 460 } 461 } 462 if repo == nil { 463 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 464 } 465 466 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push") 467 if permErr != nil { 468 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr) 469 } else if !allowed { 470 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier()) 471 return nil 472 } 473 474 repoDid := repo.RepoDid 475 if repoDid == "" && record.RepoDid != nil { 476 repoDid = *record.RepoDid 477 } 478 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 479 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 480 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 481 } 482 } 483 484 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 485 if parseErr != nil { 486 createdAt = time.Now() 487 } 488 489 artifact := models.Artifact{ 490 Did: did, 491 Rkey: e.Commit.RKey, 492 RepoDid: syntax.DID(repo.RepoDid), 493 Tag: plumbing.Hash(record.Tag), 494 CreatedAt: createdAt, 495 BlobCid: cid.Cid(record.Artifact.Ref), 496 Name: record.Name, 497 Size: uint64(record.Artifact.Size), 498 MimeType: record.Artifact.MimeType, 499 } 500 501 err = db.AddArtifact(i.Db, artifact) 502 case jmodels.CommitOperationDelete: 503 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 504 } 505 506 if err != nil { 507 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 508 } 509 510 l.Info("ingested record") 511 return nil 512} 513 514func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 515 did := e.Did 516 var err error 517 518 l = l.With("handler", "ingestProfile") 519 520 if e.Commit.RKey != "self" { 521 return fmt.Errorf("ingestProfile only ingests `self` record") 522 } 523 524 switch e.Commit.Operation { 525 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 526 raw := json.RawMessage(e.Commit.Record) 527 record := tangled.ActorProfile{} 528 err = json.Unmarshal(raw, &record) 529 if err != nil { 530 l.Error("invalid record", "err", err) 531 return err 532 } 533 534 avatar := "" 535 if record.Avatar != nil { 536 avatar = record.Avatar.Ref.String() 537 } 538 539 description := "" 540 if record.Description != nil { 541 description = *record.Description 542 } 543 544 includeBluesky := record.Bluesky 545 546 pronouns := "" 547 if record.Pronouns != nil { 548 pronouns = *record.Pronouns 549 } 550 551 location := "" 552 if record.Location != nil { 553 location = *record.Location 554 } 555 556 var links [5]string 557 for i, l := range record.Links { 558 if i < 5 { 559 links[i] = l 560 } 561 } 562 563 var stats [2]models.VanityStat 564 for i, s := range record.Stats { 565 if i < 2 { 566 stats[i].Kind = models.ParseVanityStatKind(s) 567 } 568 } 569 570 var pinned [6]string 571 for i, r := range record.PinnedRepositories { 572 if i < 6 { 573 pinned[i] = r 574 } 575 } 576 577 var preferredHandle syntax.Handle 578 if record.PreferredHandle != nil { 579 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 580 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 581 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 582 preferredHandle = h 583 } 584 } 585 } 586 587 profile := models.Profile{ 588 Did: did, 589 Avatar: avatar, 590 Description: description, 591 IncludeBluesky: includeBluesky, 592 Location: location, 593 Links: links, 594 Stats: stats, 595 PinnedRepos: pinned, 596 Pronouns: pronouns, 597 PreferredHandle: preferredHandle, 598 } 599 600 tx, err := i.Db.Begin() 601 if err != nil { 602 return fmt.Errorf("failed to start transaction: %w", err) 603 } 604 605 err = db.ValidateProfile(tx, &profile) 606 if err != nil { 607 return fmt.Errorf("invalid profile record") 608 } 609 610 err = db.UpsertProfile(tx, &profile) 611 if err == nil && i.Cache != nil { 612 pipe := i.Cache.Pipeline() 613 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 614 if preferredHandle != "" { 615 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 616 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 617 } else { 618 pipe.Del(ctx, didKey) 619 } 620 if _, execErr := pipe.Exec(ctx); execErr != nil { 621 l.Warn("failed to update preferred handle cache", "err", execErr) 622 } 623 } 624 case jmodels.CommitOperationDelete: 625 tx, beginErr := i.Db.Begin() 626 if beginErr != nil { 627 return fmt.Errorf("failed to start transaction: %w", beginErr) 628 } 629 630 priorHandle, phErr := db.GetPreferredHandle(tx, did) 631 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 632 l.Warn("failed to read prior preferred handle", "err", phErr) 633 } 634 635 err = db.DeleteProfile(tx, did) 636 if err == nil && i.Cache != nil { 637 pipe := i.Cache.Pipeline() 638 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 639 if priorHandle != "" { 640 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 641 } 642 if _, execErr := pipe.Exec(ctx); execErr != nil { 643 l.Warn("failed to evict preferred handle cache", "err", execErr) 644 } 645 } 646 } 647 648 if err != nil { 649 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 650 } 651 652 l.Info("ingested record") 653 return nil 654} 655 656func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 657 did := e.Did 658 var err error 659 660 l = l.With("handler", "ingestSpindleMember") 661 662 switch e.Commit.Operation { 663 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 664 raw := json.RawMessage(e.Commit.Record) 665 record := tangled.SpindleMember{} 666 err = json.Unmarshal(raw, &record) 667 if err != nil { 668 l.Error("invalid record", "err", err) 669 return err 670 } 671 672 // only spindle owner can invite to spindles 673 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 674 if err != nil { 675 return fmt.Errorf("failed to check invite permission: %w", err) 676 } 677 if !ok { 678 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 679 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 680 } 681 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 682 if err != nil { 683 return fmt.Errorf("failed to re-check invite permission: %w", err) 684 } 685 if !ok { 686 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 687 } 688 } 689 690 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 691 if err != nil { 692 return err 693 } 694 695 if memberId.Handle.IsInvalidHandle() { 696 return fmt.Errorf("invalid handle for member %s", record.Subject) 697 } 698 699 existing, err := db.GetSpindleMembers(i.Db, 700 orm.FilterEq("did", did), 701 orm.FilterEq("rkey", e.Commit.RKey), 702 ) 703 if err != nil { 704 return fmt.Errorf("failed to look up existing member: %w", err) 705 } 706 if len(existing) > 1 { 707 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 708 } 709 710 tx, err := i.Db.Begin() 711 if err != nil { 712 return fmt.Errorf("failed to start txn: %w", err) 713 } 714 committed := false 715 defer func() { 716 if committed { 717 return 718 } 719 tx.Rollback() 720 i.Enforcer.E.LoadPolicy() 721 }() 722 723 if len(existing) == 1 { 724 prev := existing[0] 725 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 726 if err = db.RemoveSpindleMember(tx, 727 orm.FilterEq("did", did), 728 orm.FilterEq("rkey", e.Commit.RKey), 729 ); err != nil { 730 return fmt.Errorf("failed to remove stale row: %w", err) 731 } 732 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 733 return fmt.Errorf("failed to remove stale ACL: %w", err) 734 } 735 } 736 } 737 738 if err = db.AddSpindleMember(tx, models.SpindleMember{ 739 Did: syntax.DID(did), 740 Rkey: e.Commit.RKey, 741 Instance: record.Instance, 742 Subject: memberId.DID, 743 }); err != nil { 744 return fmt.Errorf("failed to add to db: %w", err) 745 } 746 747 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 748 return fmt.Errorf("failed to update ACLs: %w", err) 749 } 750 751 if err = tx.Commit(); err != nil { 752 return fmt.Errorf("failed to commit txn: %w", err) 753 } 754 755 if err = i.Enforcer.E.SavePolicy(); err != nil { 756 return fmt.Errorf("failed to save ACLs: %w", err) 757 } 758 committed = true 759 760 l.Info("upserted spindle member") 761 case jmodels.CommitOperationDelete: 762 rkey := e.Commit.RKey 763 764 // get record from db first 765 members, err := db.GetSpindleMembers( 766 i.Db, 767 orm.FilterEq("did", did), 768 orm.FilterEq("rkey", rkey), 769 ) 770 if err != nil || len(members) != 1 { 771 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 772 } 773 member := members[0] 774 775 tx, err := i.Db.Begin() 776 if err != nil { 777 return fmt.Errorf("failed to start txn: %w", err) 778 } 779 committed := false 780 defer func() { 781 if committed { 782 return 783 } 784 tx.Rollback() 785 i.Enforcer.E.LoadPolicy() 786 }() 787 788 // remove record by rkey && update enforcer 789 if err = db.RemoveSpindleMember( 790 tx, 791 orm.FilterEq("did", did), 792 orm.FilterEq("rkey", rkey), 793 ); err != nil { 794 return fmt.Errorf("failed to remove from db: %w", err) 795 } 796 797 // update enforcer 798 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 799 if err != nil { 800 return fmt.Errorf("failed to update ACLs: %w", err) 801 } 802 803 if err = tx.Commit(); err != nil { 804 return fmt.Errorf("failed to commit txn: %w", err) 805 } 806 807 if err = i.Enforcer.E.SavePolicy(); err != nil { 808 return fmt.Errorf("failed to save ACLs: %w", err) 809 } 810 committed = true 811 812 l.Info("removed spindle member") 813 } 814 815 return nil 816} 817 818func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 819 did := e.Did 820 var err error 821 822 l = l.With("handler", "ingestSpindle") 823 824 switch e.Commit.Operation { 825 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 826 raw := json.RawMessage(e.Commit.Record) 827 record := tangled.Spindle{} 828 err = json.Unmarshal(raw, &record) 829 if err != nil { 830 l.Error("invalid record", "err", err) 831 return err 832 } 833 834 instance := e.Commit.RKey 835 836 err := db.AddSpindle(i.Db, models.Spindle{ 837 Owner: syntax.DID(did), 838 Instance: instance, 839 }) 840 if err != nil { 841 l.Error("failed to add spindle to db", "err", err, "instance", instance) 842 return err 843 } 844 845 if err := i.verifySpindle(ctx, instance, did); err != nil { 846 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 847 } 848 849 l.Info("ingested record", "instance", instance) 850 return nil 851 852 case jmodels.CommitOperationDelete: 853 instance := e.Commit.RKey 854 855 // get record from db first 856 spindles, err := db.GetSpindles( 857 ctx, 858 i.Db, 859 orm.FilterEq("owner", did), 860 orm.FilterEq("instance", instance), 861 ) 862 if err != nil || len(spindles) != 1 { 863 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 864 } 865 spindle := spindles[0] 866 867 tx, err := i.Db.Begin() 868 if err != nil { 869 return fmt.Errorf("failed to start txn: %w", err) 870 } 871 defer func() { 872 tx.Rollback() 873 i.Enforcer.E.LoadPolicy() 874 }() 875 876 // remove spindle members first 877 err = db.RemoveSpindleMember( 878 tx, 879 orm.FilterEq("owner", did), 880 orm.FilterEq("instance", instance), 881 ) 882 if err != nil { 883 return fmt.Errorf("failed to remove spindle members: %w", err) 884 } 885 886 err = db.DeleteSpindle( 887 tx, 888 orm.FilterEq("owner", did), 889 orm.FilterEq("instance", instance), 890 ) 891 if err != nil { 892 return fmt.Errorf("failed to delete spindle: %w", err) 893 } 894 895 if spindle.Verified != nil { 896 err = i.Enforcer.RemoveSpindle(instance) 897 if err != nil { 898 return fmt.Errorf("failed to remove spindle from enforcer: %w", err) 899 } 900 } 901 902 err = tx.Commit() 903 if err != nil { 904 return fmt.Errorf("failed to commit txn: %w", err) 905 } 906 907 err = i.Enforcer.E.SavePolicy() 908 if err != nil { 909 return fmt.Errorf("failed to save ACLs: %w", err) 910 } 911 912 l.Info("ingested record", "instance", instance) 913 } 914 915 return nil 916} 917 918func (i *Ingester) ingestString(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 919 did := e.Did 920 rkey := e.Commit.RKey 921 922 var err error 923 924 l = l.With("handler", "ingestString") 925 926 switch e.Commit.Operation { 927 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 928 raw := json.RawMessage(e.Commit.Record) 929 record := tangled.String{} 930 err = json.Unmarshal(raw, &record) 931 if err != nil { 932 l.Error("invalid record", "err", err) 933 return err 934 } 935 936 str, err := models.StringFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(e.Commit.CID), record) 937 if err != nil { 938 return fmt.Errorf("failed to parse string record: %w", err) 939 } 940 if err = str.Validate(); err != nil { 941 l.Error("invalid record", "err", err) 942 return err 943 } 944 945 if err = db.AddString(i.Db, str); err != nil { 946 l.Error("failed to add string", "err", err) 947 return err 948 } 949 950 l.Info("ingested record") 951 return nil 952 953 case jmodels.CommitOperationDelete: 954 if err := db.DeleteString( 955 i.Db, 956 orm.FilterEq("did", did), 957 orm.FilterEq("rkey", rkey), 958 ); err != nil { 959 l.Error("failed to delete", "err", err) 960 return fmt.Errorf("failed to delete string record: %w", err) 961 } 962 963 l.Info("ingested record") 964 return nil 965 } 966 967 return nil 968} 969 970func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 971 did := e.Did 972 var err error 973 974 l = l.With("handler", "ingestKnotMember") 975 976 switch e.Commit.Operation { 977 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 978 raw := json.RawMessage(e.Commit.Record) 979 record := tangled.KnotMember{} 980 err = json.Unmarshal(raw, &record) 981 if err != nil { 982 l.Error("invalid record", "err", err) 983 return err 984 } 985 986 // only knot owner can invite to knots 987 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 988 if err != nil { 989 return fmt.Errorf("failed to check invite permission: %w", err) 990 } 991 if !ok { 992 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 993 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 994 } 995 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 996 if err != nil { 997 return fmt.Errorf("failed to re-check invite permission: %w", err) 998 } 999 if !ok { 1000 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 1001 } 1002 } 1003 1004 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 1005 if err != nil { 1006 return err 1007 } 1008 1009 if memberId.Handle.IsInvalidHandle() { 1010 return fmt.Errorf("invalid handle for member %s", record.Subject) 1011 } 1012 1013 existing, err := db.GetKnotMembers(i.Db, 1014 orm.FilterEq("did", did), 1015 orm.FilterEq("rkey", e.Commit.RKey), 1016 ) 1017 if err != nil { 1018 return fmt.Errorf("failed to look up existing member: %w", err) 1019 } 1020 if len(existing) > 1 { 1021 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1022 } 1023 1024 tx, err := i.Db.Begin() 1025 if err != nil { 1026 return fmt.Errorf("failed to start txn: %w", err) 1027 } 1028 committed := false 1029 defer func() { 1030 if committed { 1031 return 1032 } 1033 tx.Rollback() 1034 i.Enforcer.E.LoadPolicy() 1035 }() 1036 1037 if len(existing) == 1 { 1038 prev := existing[0] 1039 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1040 if err = db.RemoveKnotMember(tx, 1041 orm.FilterEq("did", did), 1042 orm.FilterEq("rkey", e.Commit.RKey), 1043 ); err != nil { 1044 return fmt.Errorf("failed to remove stale row: %w", err) 1045 } 1046 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1047 return fmt.Errorf("failed to remove stale ACL: %w", err) 1048 } 1049 } 1050 } 1051 1052 if err = db.AddKnotMember(tx, models.KnotMember{ 1053 Did: syntax.DID(did), 1054 Rkey: e.Commit.RKey, 1055 Domain: record.Domain, 1056 Subject: memberId.DID, 1057 }); err != nil { 1058 return fmt.Errorf("failed to add to db: %w", err) 1059 } 1060 1061 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1062 return fmt.Errorf("failed to update ACLs: %w", err) 1063 } 1064 1065 if err = tx.Commit(); err != nil { 1066 return fmt.Errorf("failed to commit txn: %w", err) 1067 } 1068 1069 if err = i.Enforcer.E.SavePolicy(); err != nil { 1070 return fmt.Errorf("failed to save ACLs: %w", err) 1071 } 1072 committed = true 1073 1074 l.Info("upserted knot member") 1075 case jmodels.CommitOperationDelete: 1076 rkey := e.Commit.RKey 1077 1078 members, err := db.GetKnotMembers( 1079 i.Db, 1080 orm.FilterEq("did", did), 1081 orm.FilterEq("rkey", rkey), 1082 ) 1083 if err != nil { 1084 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1085 } 1086 if len(members) == 0 { 1087 l.Info("knot member already removed", "rkey", rkey) 1088 return nil 1089 } 1090 if len(members) > 1 { 1091 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1092 } 1093 member := members[0] 1094 1095 tx, err := i.Db.Begin() 1096 if err != nil { 1097 return fmt.Errorf("failed to start txn: %w", err) 1098 } 1099 committed := false 1100 defer func() { 1101 if committed { 1102 return 1103 } 1104 tx.Rollback() 1105 i.Enforcer.E.LoadPolicy() 1106 }() 1107 1108 if err = db.RemoveKnotMember( 1109 tx, 1110 orm.FilterEq("did", did), 1111 orm.FilterEq("rkey", rkey), 1112 ); err != nil { 1113 return fmt.Errorf("failed to remove from db: %w", err) 1114 } 1115 1116 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1117 return fmt.Errorf("failed to update ACLs: %w", err) 1118 } 1119 1120 if err = tx.Commit(); err != nil { 1121 return fmt.Errorf("failed to commit txn: %w", err) 1122 } 1123 1124 if err = i.Enforcer.E.SavePolicy(); err != nil { 1125 return fmt.Errorf("failed to save ACLs: %w", err) 1126 } 1127 committed = true 1128 1129 l.Info("removed knot member") 1130 } 1131 1132 return nil 1133} 1134 1135func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1136 did := e.Did 1137 var err error 1138 1139 l = l.With("handler", "ingestKnot") 1140 1141 switch e.Commit.Operation { 1142 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1143 raw := json.RawMessage(e.Commit.Record) 1144 record := tangled.Knot{} 1145 err = json.Unmarshal(raw, &record) 1146 if err != nil { 1147 l.Error("invalid record", "err", err) 1148 return err 1149 } 1150 1151 domain := e.Commit.RKey 1152 1153 err := db.AddKnot(i.Db, domain, did) 1154 if err != nil { 1155 l.Error("failed to add knot to db", "err", err, "domain", domain) 1156 return err 1157 } 1158 1159 if err := i.verifyKnot(ctx, domain, did); err != nil { 1160 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 1161 } 1162 1163 l.Info("ingested record", "domain", domain) 1164 return nil 1165 1166 case jmodels.CommitOperationDelete: 1167 domain := e.Commit.RKey 1168 1169 // get record from db first 1170 registrations, err := db.GetRegistrations( 1171 i.Db, 1172 orm.FilterEq("domain", domain), 1173 orm.FilterEq("did", did), 1174 ) 1175 if err != nil { 1176 return fmt.Errorf("failed to get registration: %w", err) 1177 } 1178 if len(registrations) != 1 { 1179 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 1180 } 1181 registration := registrations[0] 1182 1183 tx, err := i.Db.Begin() 1184 if err != nil { 1185 return fmt.Errorf("failed to start txn: %w", err) 1186 } 1187 defer func() { 1188 tx.Rollback() 1189 i.Enforcer.E.LoadPolicy() 1190 }() 1191 1192 err = db.RemoveKnotMember( 1193 tx, 1194 orm.FilterEq("did", did), 1195 orm.FilterEq("domain", domain), 1196 ) 1197 if err != nil { 1198 return fmt.Errorf("failed to remove knot members: %w", err) 1199 } 1200 1201 err = db.DeleteKnot( 1202 tx, 1203 orm.FilterEq("did", did), 1204 orm.FilterEq("domain", domain), 1205 ) 1206 if err != nil { 1207 return fmt.Errorf("failed to delete knot: %w", err) 1208 } 1209 1210 err = db.RemoveReposByKnot(tx, domain) 1211 if err != nil { 1212 return fmt.Errorf("failed to remove repos by knot: %w", err) 1213 } 1214 1215 if registration.Registered != nil { 1216 err = i.Enforcer.RemoveKnot(domain) 1217 if err != nil { 1218 return fmt.Errorf("failed to remove knot from enforcer: %w", err) 1219 } 1220 } 1221 1222 err = tx.Commit() 1223 if err != nil { 1224 return fmt.Errorf("failed to commit txn: %w", err) 1225 } 1226 1227 err = i.Enforcer.E.SavePolicy() 1228 if err != nil { 1229 return fmt.Errorf("failed to save ACLs: %w", err) 1230 } 1231 1232 l.Info("ingested record", "domain", domain) 1233 } 1234 1235 return nil 1236} 1237 1238const ( 1239 verifyAttempts = 4 1240 verifyMinDelay = 1 * time.Second 1241 verifyMaxDelay = 5 * time.Second 1242) 1243 1244func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1245 regs, err := db.GetRegistrations(i.Db, 1246 orm.FilterEq("domain", domain), 1247 orm.FilterEq("did", did), 1248 ) 1249 if err != nil { 1250 return fmt.Errorf("look up registration: %w", err) 1251 } 1252 if len(regs) != 1 { 1253 return fmt.Errorf("no registration for %s by %s", domain, did) 1254 } 1255 if regs[0].Registered != nil { 1256 return nil 1257 } 1258 1259 err = retry.Do( 1260 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1261 retry.Context(ctx), 1262 retry.Attempts(verifyAttempts), 1263 retry.Delay(verifyMinDelay), 1264 retry.MaxDelay(verifyMaxDelay), 1265 retry.DelayType(retry.BackOffDelay), 1266 retry.LastErrorOnly(true), 1267 ) 1268 if err != nil { 1269 return fmt.Errorf("verify: %w", err) 1270 } 1271 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1272} 1273 1274func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1275 spindles, err := db.GetSpindles(ctx, i.Db, 1276 orm.FilterEq("instance", instance), 1277 orm.FilterEq("owner", did), 1278 ) 1279 if err != nil { 1280 return fmt.Errorf("look up spindle: %w", err) 1281 } 1282 if len(spindles) != 1 { 1283 return fmt.Errorf("no spindle for %s by %s", instance, did) 1284 } 1285 if spindles[0].Verified != nil { 1286 return nil 1287 } 1288 1289 err = retry.Do( 1290 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1291 retry.Context(ctx), 1292 retry.Attempts(verifyAttempts), 1293 retry.Delay(verifyMinDelay), 1294 retry.MaxDelay(verifyMaxDelay), 1295 retry.DelayType(retry.BackOffDelay), 1296 retry.LastErrorOnly(true), 1297 ) 1298 if err != nil { 1299 return fmt.Errorf("verify: %w", err) 1300 } 1301 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1302 return err 1303} 1304 1305const sweepConcurrency = 4 1306 1307func (i *Ingester) SweepPendingVerifications() { 1308 l := i.Logger.With("handler", "SweepPendingVerifications") 1309 1310 var g errgroup.Group 1311 g.SetLimit(sweepConcurrency) 1312 1313 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1314 if err != nil { 1315 l.Error("failed to list unverified knots", "err", err) 1316 } else { 1317 for _, reg := range regs { 1318 g.Go(func() error { 1319 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1320 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1321 } 1322 return nil 1323 }) 1324 } 1325 } 1326 1327 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1328 if err != nil { 1329 l.Error("failed to list unverified spindles", "err", err) 1330 g.Wait() 1331 return 1332 } 1333 for _, s := range spindles { 1334 g.Go(func() error { 1335 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1336 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1337 } 1338 return nil 1339 }) 1340 } 1341 g.Wait() 1342} 1343 1344func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1345 did := e.Did 1346 rkey := e.Commit.RKey 1347 1348 var err error 1349 1350 l = l.With("handler", "ingestIssue") 1351 1352 switch e.Commit.Operation { 1353 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1354 raw := json.RawMessage(e.Commit.Record) 1355 record := tangled.RepoIssue{} 1356 err = json.Unmarshal(raw, &record) 1357 if err != nil { 1358 l.Error("invalid record", "err", err) 1359 return err 1360 } 1361 1362 issue := models.IssueFromRecord(did, rkey, record) 1363 1364 if issue.RepoDid == "" { 1365 return fmt.Errorf("issue record has no repo field") 1366 } 1367 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1368 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1369 } 1370 1371 if err := i.Validator.ValidateIssue(&issue); err != nil { 1372 return fmt.Errorf("failed to validate issue: %w", err) 1373 } 1374 1375 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1376 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1377 if repoErr == nil && repo.RepoDid != "" { 1378 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1379 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1380 } 1381 } 1382 } 1383 1384 tx, err := i.Db.BeginTx(ctx, nil) 1385 if err != nil { 1386 l.Error("failed to begin transaction", "err", err) 1387 return err 1388 } 1389 defer tx.Rollback() 1390 1391 err = db.PutIssue(tx, &issue) 1392 if err != nil { 1393 l.Error("failed to create issue", "err", err) 1394 return err 1395 } 1396 1397 err = tx.Commit() 1398 if err != nil { 1399 l.Error("failed to commit txn", "err", err) 1400 return err 1401 } 1402 1403 l.Info("ingested record") 1404 return nil 1405 1406 case jmodels.CommitOperationDelete: 1407 tx, err := i.Db.BeginTx(ctx, nil) 1408 if err != nil { 1409 l.Error("failed to begin transaction", "err", err) 1410 return err 1411 } 1412 defer tx.Rollback() 1413 1414 if err := db.DeleteIssues( 1415 tx, 1416 did, 1417 rkey, 1418 ); err != nil { 1419 l.Error("failed to delete", "err", err) 1420 return fmt.Errorf("failed to delete issue record: %w", err) 1421 } 1422 if err := tx.Commit(); err != nil { 1423 l.Error("failed to commit txn", "err", err) 1424 return err 1425 } 1426 1427 l.Info("ingested record") 1428 return nil 1429 } 1430 1431 return nil 1432} 1433 1434func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1435 did := e.Did 1436 rkey := e.Commit.RKey 1437 1438 var err error 1439 1440 l = l.With("handler", "ingestPull") 1441 1442 switch e.Commit.Operation { 1443 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1444 raw := json.RawMessage(e.Commit.Record) 1445 record := tangled.RepoPull{} 1446 err = json.Unmarshal(raw, &record) 1447 if err != nil { 1448 l.Error("invalid record", "err", err) 1449 return err 1450 } 1451 1452 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1453 if err != nil { 1454 l.Error("failed to resolve did", "err", err) 1455 return err 1456 } 1457 1458 // go through and fetch all blobs in parallel 1459 readers := make([]*io.ReadCloser, len(record.Rounds)) 1460 var mu sync.Mutex 1461 1462 g, gctx := errgroup.WithContext(ctx) 1463 1464 for idx, b := range record.Rounds { 1465 g.Go(func() error { 1466 // for some reason, a blob is empty 1467 if b.PatchBlob == nil { 1468 return fmt.Errorf("missing patchBlob in round %d", idx) 1469 } 1470 1471 ownerPds := ownerId.PDSEndpoint() 1472 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1473 q := url.Query() 1474 q.Set("cid", b.PatchBlob.Ref.String()) 1475 q.Set("did", did) 1476 url.RawQuery = q.Encode() 1477 1478 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1479 if err != nil { 1480 l.Error("failed to create request") 1481 return err 1482 } 1483 req.Header.Set("Content-Type", "application/json") 1484 1485 resp, err := http.DefaultClient.Do(req) 1486 if err != nil { 1487 l.Error("failed to make request") 1488 return err 1489 } 1490 1491 mu.Lock() 1492 readers[idx] = &resp.Body 1493 mu.Unlock() 1494 1495 return nil 1496 }) 1497 } 1498 1499 if err := g.Wait(); err != nil { 1500 for _, r := range readers { 1501 if r != nil && *r != nil { 1502 (*r).Close() 1503 } 1504 } 1505 return err 1506 } 1507 1508 defer func() { 1509 for _, r := range readers { 1510 if r != nil && *r != nil { 1511 (*r).Close() 1512 } 1513 } 1514 }() 1515 1516 pull, err := models.PullFromRecord(did, rkey, record, readers) 1517 if err != nil { 1518 return fmt.Errorf("failed to parse pull from record: %w", err) 1519 } 1520 if err := i.Validator.ValidatePull(pull); err != nil { 1521 return fmt.Errorf("failed to validate pull: %w", err) 1522 } 1523 1524 tx, err := i.Db.BeginTx(ctx, nil) 1525 if err != nil { 1526 l.Error("failed to begin transaction", "err", err) 1527 return err 1528 } 1529 defer tx.Rollback() 1530 1531 err = db.PutPull(tx, pull) 1532 if err != nil { 1533 l.Error("failed to create pull", "err", err) 1534 return err 1535 } 1536 1537 err = tx.Commit() 1538 if err != nil { 1539 l.Error("failed to commit txn", "err", err) 1540 return err 1541 } 1542 1543 l.Info("ingested record") 1544 return nil 1545 1546 case jmodels.CommitOperationDelete: 1547 tx, err := i.Db.BeginTx(ctx, nil) 1548 if err != nil { 1549 l.Error("failed to begin transaction", "err", err) 1550 return err 1551 } 1552 defer tx.Rollback() 1553 1554 if err := db.AbandonPulls( 1555 tx, 1556 orm.FilterEq("owner_did", did), 1557 orm.FilterEq("rkey", rkey), 1558 ); err != nil { 1559 l.Error("failed to abandon", "err", err) 1560 return fmt.Errorf("failed to abandon pull record: %w", err) 1561 } 1562 if err := tx.Commit(); err != nil { 1563 l.Error("failed to commit txn", "err", err) 1564 return err 1565 } 1566 1567 l.Info("ingested record") 1568 return nil 1569 } 1570 1571 return nil 1572} 1573 1574// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1575func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error { 1576 l = l.With("handler", "ingestIssueComment") 1577 1578 switch e.Commit.Operation { 1579 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1580 // no-op. sh.tangled.repo.issue.comment is deprecated 1581 1582 case jmodels.CommitOperationDelete: 1583 if err := db.PurgeComments( 1584 i.Db, 1585 orm.FilterEq("did", e.Did), 1586 orm.FilterEq("collection", e.Commit.Collection), 1587 orm.FilterEq("rkey", e.Commit.RKey), 1588 ); err != nil { 1589 return fmt.Errorf("failed to delete comment record: %w", err) 1590 } 1591 } 1592 1593 l.Info("ingested record") 1594 return nil 1595} 1596 1597// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1598func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error { 1599 l = l.With("handler", "ingestPullComment") 1600 1601 switch e.Commit.Operation { 1602 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1603 // no-op. sh.tangled.repo.pull.comment is deprecated 1604 1605 case jmodels.CommitOperationDelete: 1606 if err := db.PurgeComments( 1607 i.Db, 1608 orm.FilterEq("did", e.Did), 1609 orm.FilterEq("collection", e.Commit.Collection), 1610 orm.FilterEq("rkey", e.Commit.RKey), 1611 ); err != nil { 1612 return fmt.Errorf("failed to delete comment record: %w", err) 1613 } 1614 } 1615 1616 l.Info("ingested record") 1617 return nil 1618} 1619 1620func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error { 1621 did := e.Did 1622 rkey := e.Commit.RKey 1623 cid := e.Commit.CID 1624 1625 var err error 1626 1627 l = l.With("handler", "ingestComment") 1628 1629 ctx := context.Background() 1630 1631 switch e.Commit.Operation { 1632 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1633 raw := json.RawMessage(e.Commit.Record) 1634 record := tangled.FeedComment{} 1635 err = json.Unmarshal(raw, &record) 1636 if err != nil { 1637 return fmt.Errorf("invalid record: %w", err) 1638 } 1639 1640 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1641 if err != nil { 1642 return fmt.Errorf("failed to parse comment from record: %w", err) 1643 } 1644 1645 if err := comment.Validate(); err != nil { 1646 return fmt.Errorf("failed to validate comment: %w", err) 1647 } 1648 1649 var references []syntax.ATURI 1650 if comment.Body.Original != nil { 1651 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1652 } 1653 1654 tx, err := i.Db.Begin() 1655 if err != nil { 1656 return fmt.Errorf("failed to start transaction: %w", err) 1657 } 1658 defer tx.Rollback() 1659 1660 _, err = db.PutComment(tx, comment, references) 1661 if err != nil { 1662 return fmt.Errorf("failed to create comment: %w", err) 1663 } 1664 1665 if err := tx.Commit(); err != nil { 1666 return err 1667 } 1668 1669 case jmodels.CommitOperationDelete: 1670 if err := db.DeleteComments( 1671 i.Db, 1672 orm.FilterEq("did", did), 1673 orm.FilterEq("collection", e.Commit.Collection), 1674 orm.FilterEq("rkey", rkey), 1675 ); err != nil { 1676 return fmt.Errorf("failed to delete comment record: %w", err) 1677 } 1678 } 1679 1680 l.Info("ingested record") 1681 return nil 1682} 1683 1684func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error { 1685 did := e.Did 1686 rkey := e.Commit.RKey 1687 1688 l = l.With("handler", "ingestReaction") 1689 1690 switch e.Commit.Operation { 1691 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1692 raw := json.RawMessage(e.Commit.Record) 1693 record := tangled.FeedReaction{} 1694 if err := json.Unmarshal(raw, &record); err != nil { 1695 return fmt.Errorf("invalid record: %w", err) 1696 } 1697 1698 subjectUri, err := syntax.ParseATURI(record.Subject) 1699 if err != nil { 1700 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err) 1701 } 1702 subjectUri = models.NormalizeReactionSubject(subjectUri) 1703 1704 kind, ok := models.ParseReactionKind(record.Reaction) 1705 if !ok { 1706 return fmt.Errorf("invalid reaction kind: %q", record.Reaction) 1707 } 1708 1709 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 1710 if parseErr != nil { 1711 created = time.Now() 1712 } 1713 1714 tx, err := i.Db.Begin() 1715 if err != nil { 1716 return fmt.Errorf("failed to start transaction: %w", err) 1717 } 1718 defer tx.Rollback() 1719 1720 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil { 1721 return fmt.Errorf("failed to clear existing reaction: %w", err) 1722 } 1723 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil { 1724 return fmt.Errorf("failed to add reaction: %w", err) 1725 } 1726 1727 if err := tx.Commit(); err != nil { 1728 return err 1729 } 1730 1731 case jmodels.CommitOperationDelete: 1732 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil { 1733 return fmt.Errorf("failed to delete reaction record: %w", err) 1734 } 1735 } 1736 1737 l.Info("ingested record") 1738 return nil 1739} 1740 1741func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error { 1742 did := e.Did 1743 rkey := e.Commit.RKey 1744 1745 var err error 1746 1747 l = l.With("handler", "ingestLabelDefinition") 1748 1749 switch e.Commit.Operation { 1750 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1751 raw := json.RawMessage(e.Commit.Record) 1752 record := tangled.LabelDefinition{} 1753 err = json.Unmarshal(raw, &record) 1754 if err != nil { 1755 return fmt.Errorf("invalid record: %w", err) 1756 } 1757 1758 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1759 if err != nil { 1760 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1761 } 1762 1763 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1764 return fmt.Errorf("failed to validate labeldef: %w", err) 1765 } 1766 1767 _, err = db.AddLabelDefinition(i.Db, def) 1768 if err != nil { 1769 return fmt.Errorf("failed to create labeldef: %w", err) 1770 } 1771 1772 l.Info("ingested record") 1773 return nil 1774 1775 case jmodels.CommitOperationDelete: 1776 if err := db.DeleteLabelDefinition( 1777 i.Db, 1778 orm.FilterEq("did", did), 1779 orm.FilterEq("rkey", rkey), 1780 ); err != nil { 1781 return fmt.Errorf("failed to delete labeldef record: %w", err) 1782 } 1783 1784 l.Info("ingested record") 1785 return nil 1786 } 1787 1788 return nil 1789} 1790 1791func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1792 did := e.Did 1793 rkey := e.Commit.RKey 1794 1795 var err error 1796 1797 l = l.With("handler", "ingestLabelOp") 1798 1799 switch e.Commit.Operation { 1800 case jmodels.CommitOperationCreate: 1801 raw := json.RawMessage(e.Commit.Record) 1802 record := tangled.LabelOp{} 1803 err = json.Unmarshal(raw, &record) 1804 if err != nil { 1805 return fmt.Errorf("invalid record: %w", err) 1806 } 1807 1808 subject := syntax.ATURI(record.Subject) 1809 collection := subject.Collection() 1810 1811 var repo *models.Repo 1812 switch collection { 1813 case tangled.RepoIssueNSID: 1814 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1815 if err != nil || len(i) != 1 { 1816 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1817 } 1818 repo = i[0].Repo 1819 case tangled.RepoPullNSID: 1820 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject)) 1821 if err != nil || len(p) != 1 { 1822 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p)) 1823 } 1824 repo = p[0].Repo 1825 default: 1826 return fmt.Errorf("unsupported label subject: %s", collection) 1827 } 1828 1829 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1830 if err != nil { 1831 return fmt.Errorf("failed to build label application ctx: %w", err) 1832 } 1833 1834 ops := models.LabelOpsFromRecord(did, rkey, record) 1835 1836 for _, o := range ops { 1837 def, ok := actx.Defs[o.OperandKey] 1838 if !ok { 1839 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1840 } 1841 if err := i.Validator.ValidateLabelOp(ctx, def, repo, &o); err != nil { 1842 if !errors.Is(err, knotacl.ErrKnotUnreachable) { 1843 return fmt.Errorf("failed to validate labelop: %w", err) 1844 } 1845 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", err) 1846 } 1847 } 1848 1849 tx, err := i.Db.Begin() 1850 if err != nil { 1851 return err 1852 } 1853 defer tx.Rollback() 1854 1855 for _, o := range ops { 1856 _, err = db.AddLabelOp(tx, &o) 1857 if err != nil { 1858 return fmt.Errorf("failed to add labelop: %w", err) 1859 } 1860 } 1861 1862 if err = tx.Commit(); err != nil { 1863 return err 1864 } 1865 1866 l.Info("ingested record") 1867 } 1868 1869 return nil 1870}