Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "compress/gzip" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io" 11 "log/slog" 12 "maps" 13 "net/http" 14 "net/url" 15 "slices" 16 "strings" 17 "sync" 18 19 "time" 20 21 "github.com/avast/retry-go/v4" 22 "github.com/bluesky-social/indigo/atproto/syntax" 23 jmodels "github.com/bluesky-social/jetstream/pkg/models" 24 "github.com/go-git/go-git/v5/plumbing" 25 "github.com/ipfs/go-cid" 26 "golang.org/x/sync/errgroup" 27 "tangled.org/core/api/tangled" 28 "tangled.org/core/appview/cache" 29 "tangled.org/core/appview/config" 30 "tangled.org/core/appview/db" 31 "tangled.org/core/appview/knotacl" 32 "tangled.org/core/appview/mentions" 33 "tangled.org/core/appview/models" 34 "tangled.org/core/appview/notify" 35 "tangled.org/core/appview/repoverify" 36 "tangled.org/core/appview/serververify" 37 "tangled.org/core/appview/validator" 38 "tangled.org/core/blobstore" 39 "tangled.org/core/idresolver" 40 "tangled.org/core/orm" 41 "tangled.org/core/rbac" 42) 43 44type Ingester struct { 45 Ctx context.Context 46 Db *db.DB 47 Enforcer *rbac.Enforcer 48 Acl *knotacl.Service 49 IdResolver *idresolver.Resolver 50 BlobStore blobstore.BlobStore 51 Cache *cache.Cache 52 Config *config.Config 53 Logger *slog.Logger 54 Validator *validator.Validator 55 MentionsResolver *mentions.Resolver 56 Notifier notify.Notifier 57 Verifier repoverify.Verifier 58} 59 60type processFunc func(ctx context.Context, e *jmodels.Event) error 61 62func (i *Ingester) Ingest() processFunc { 63 return func(ctx context.Context, e *jmodels.Event) error { 64 var err error 65 66 l := i.Logger.With("kind", e.Kind) 67 switch e.Kind { 68 case jmodels.EventKindAccount: 69 // TODO: sync account state to db 70 if e.Account.Active { 71 break 72 } 73 // TODO: revoke sessions by DID 74 if *e.Account.Status == "deactivated" { 75 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 76 } 77 case jmodels.EventKindIdentity: 78 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 79 case jmodels.EventKindCommit: 80 l = l.With( 81 "nsid", e.Commit.Collection, 82 "did", e.Did, 83 "rkey", e.Commit.RKey, 84 "cid", e.Commit.CID, 85 "op", e.Commit.Operation, 86 ) 87 switch e.Commit.Collection { 88 case tangled.GraphFollowNSID: 89 err = i.ingestFollow(e, l) 90 case tangled.GraphVouchNSID: 91 err = i.ingestVouch(ctx, e, l) 92 case tangled.FeedStarNSID: 93 err = i.ingestStar(ctx, e, l) 94 case tangled.FeedReactionNSID: 95 err = i.ingestReaction(e, l) 96 case tangled.PublicKeyNSID: 97 err = i.ingestPublicKey(e, l) 98 case tangled.RepoArtifactNSID: 99 err = i.ingestArtifact(ctx, e, l) 100 case tangled.ActorProfileNSID: 101 err = i.ingestProfile(ctx, e, l) 102 case tangled.SpindleMemberNSID: 103 err = i.ingestSpindleMember(ctx, e, l) 104 case tangled.SpindleNSID: 105 err = i.ingestSpindle(ctx, e, l) 106 case tangled.KnotMemberNSID: 107 err = i.ingestKnotMember(ctx, e, l) 108 case tangled.KnotNSID: 109 err = i.ingestKnot(ctx, e, l) 110 case tangled.StringNSID: 111 err = i.ingestString(ctx, e, l) 112 case tangled.RepoIssueNSID: 113 err = i.ingestIssue(ctx, e, l) 114 case tangled.RepoPullNSID: 115 err = i.ingestPull(ctx, e, l) 116 case tangled.FeedCommentNSID: 117 err = i.ingestComment(e, l) 118 case tangled.RepoIssueCommentNSID: 119 err = i.ingestIssueComment(e, l) 120 case tangled.RepoPullCommentNSID: 121 err = i.ingestPullComment(e, l) 122 case tangled.LabelDefinitionNSID: 123 err = i.ingestLabelDefinition(e, l) 124 case tangled.LabelOpNSID: 125 err = i.ingestLabelOp(ctx, e, l) 126 case tangled.RepoNSID: 127 err = i.ingestRepo(ctx, e, l) 128 } 129 } 130 131 if err != nil { 132 l.Warn("failed to ingest record, skipping", "err", err) 133 } 134 135 lastTimeUs := e.TimeUS + 1 136 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 137 l.Error("failed to save cursor", "err", saveErr) 138 } 139 140 return nil 141 } 142} 143 144func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 145 if strings.HasPrefix(ref, "did:") { 146 return db.GetRepoByDid(i.Db, ref) 147 } 148 return db.GetRepoByAtUri(i.Db, ref) 149} 150 151func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 152 var legacy struct { 153 Subject *string `json:"subject"` 154 SubjectDid *string `json:"subjectDid"` 155 } 156 if err := json.Unmarshal(raw, &legacy); err != nil { 157 return false, err 158 } 159 160 switch { 161 case legacy.SubjectDid != nil: 162 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 163 if err != nil { 164 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 165 return false, nil 166 } 167 star.SubjectType = models.StarSubjectRepo 168 star.Subject = repo.RepoDid 169 return true, nil 170 171 case legacy.Subject != nil: 172 uri, err := syntax.ParseATURI(*legacy.Subject) 173 if err != nil { 174 return false, fmt.Errorf("invalid old-format star subject: %w", err) 175 } 176 switch uri.Collection().String() { 177 case tangled.RepoNSID: 178 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 179 if err != nil { 180 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 181 return false, nil 182 } 183 star.SubjectType = models.StarSubjectRepo 184 star.Subject = repo.RepoDid 185 return true, nil 186 default: 187 star.SubjectType = models.StarSubjectString 188 star.Subject = *legacy.Subject 189 return true, nil 190 } 191 192 default: 193 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 194 } 195} 196 197func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 198 var err error 199 did := e.Did 200 201 l = l.With("handler", "ingestStar") 202 203 switch e.Commit.Operation { 204 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 205 raw := json.RawMessage(e.Commit.Record) 206 record := tangled.FeedStar{} 207 unmarshalErr := json.Unmarshal(raw, &record) 208 209 star := &models.Star{ 210 Did: did, 211 Rkey: e.Commit.RKey, 212 } 213 214 switch { 215 case unmarshalErr != nil: 216 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 217 if resolveErr != nil { 218 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 219 return unmarshalErr 220 } 221 if !resolved { 222 return nil 223 } 224 225 case record.Subject == nil: 226 return fmt.Errorf("star record has nil subject") 227 228 case record.Subject.FeedStar_Repo != nil: 229 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 230 if repoErr != nil { 231 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 232 return nil 233 } 234 star.SubjectType = models.StarSubjectRepo 235 star.Subject = repo.RepoDid 236 237 case record.Subject.FeedStar_String != nil: 238 star.SubjectType = models.StarSubjectString 239 star.Subject = record.Subject.FeedStar_String.Uri 240 241 default: 242 return fmt.Errorf("star record has empty subject union") 243 } 244 245 err = db.AddStar(i.Db, star) 246 case jmodels.CommitOperationDelete: 247 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 248 } 249 250 if err != nil { 251 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 252 } 253 254 l.Info("ingested record") 255 return nil 256} 257 258func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error { 259 var err error 260 did := e.Did 261 262 l = l.With("handler", "ingestFollow") 263 264 switch e.Commit.Operation { 265 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 266 raw := json.RawMessage(e.Commit.Record) 267 record := tangled.GraphFollow{} 268 err = json.Unmarshal(raw, &record) 269 if err != nil { 270 l.Error("invalid record", "err", err) 271 return err 272 } 273 274 err = db.AddFollow(i.Db, &models.Follow{ 275 UserDid: did, 276 SubjectDid: record.Subject, 277 Rkey: e.Commit.RKey, 278 }) 279 case jmodels.CommitOperationDelete: 280 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 281 } 282 283 if err != nil { 284 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 285 } 286 287 l.Info("ingested record") 288 return nil 289} 290 291func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 292 var err error 293 did := e.Did 294 295 l = l.With("handler", "ingestVouch") 296 297 switch e.Commit.Operation { 298 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 299 raw := json.RawMessage(e.Commit.Record) 300 record := tangled.GraphVouch{} 301 err = json.Unmarshal(raw, &record) 302 if err != nil { 303 l.Error("invalid record", "err", err) 304 return err 305 } 306 307 // rkey is the subject_did being vouched for/denounced 308 subjectDID := e.Commit.RKey 309 310 _, err = syntax.ParseDID(subjectDID) 311 if err != nil { 312 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 313 return fmt.Errorf("invalid subject_did: %w", err) 314 } 315 316 if did == subjectDID { 317 l.Warn("attempted self-vouch", "did", did) 318 return fmt.Errorf("cannot vouch for self") 319 } 320 321 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 322 if err != nil { 323 return err 324 } 325 326 if subjectId.Handle.IsInvalidHandle() { 327 return err 328 } 329 330 kind, err := models.ParseVouchKind(record.Kind) 331 if err != nil { 332 l.Error("invalid kind", "kind", kind) 333 return fmt.Errorf("invalid kind: %s", kind) 334 } 335 336 recordCid, err := cid.Parse(e.Commit.CID) 337 if err != nil { 338 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 339 return fmt.Errorf("invalid cid: %w", err) 340 } 341 342 var evidences []syntax.ATURI 343 for _, raw := range record.Evidences { 344 uri, parseErr := syntax.ParseATURI(raw) 345 if parseErr != nil { 346 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 347 continue 348 } 349 evidences = append(evidences, uri) 350 } 351 352 tx, txErr := i.Db.Begin() 353 if txErr != nil { 354 return fmt.Errorf("failed to start transaction: %w", txErr) 355 } 356 357 addErr := db.AddVouch(tx, &models.Vouch{ 358 Did: syntax.DID(did), 359 SubjectDid: subjectId.DID, 360 Cid: recordCid, 361 Kind: kind, 362 Reason: record.Reason, 363 Evidences: evidences, 364 }) 365 if addErr != nil { 366 tx.Rollback() 367 err = addErr 368 } else { 369 err = tx.Commit() 370 } 371 372 case jmodels.CommitOperationDelete: 373 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 374 } 375 376 if err != nil { 377 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 378 } 379 380 l.Info("ingested record") 381 return nil 382} 383 384func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error { 385 did := e.Did 386 var err error 387 388 l = l.With("handler", "ingestPublicKey") 389 390 switch e.Commit.Operation { 391 case jmodels.CommitOperationCreate: 392 l.Debug("processing add of pubkey") 393 raw := json.RawMessage(e.Commit.Record) 394 record := tangled.PublicKey{} 395 err = json.Unmarshal(raw, &record) 396 if err != nil { 397 l.Error("invalid record", "err", err) 398 return err 399 } 400 401 name := record.Name 402 key := record.Key 403 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 404 case jmodels.CommitOperationUpdate: 405 l.Debug("processing update of pubkey") 406 raw := json.RawMessage(e.Commit.Record) 407 record := tangled.PublicKey{} 408 err = json.Unmarshal(raw, &record) 409 if err != nil { 410 l.Error("invalid record", "err", err) 411 return err 412 } 413 414 name := record.Name 415 key := record.Key 416 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 417 case jmodels.CommitOperationDelete: 418 l.Debug("processing delete of pubkey") 419 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 420 } 421 422 if err != nil { 423 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 424 } 425 426 l.Info("ingested record") 427 return nil 428} 429 430func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 431 did := e.Did 432 var err error 433 434 l = l.With("handler", "ingestArtifact") 435 436 switch e.Commit.Operation { 437 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 438 raw := json.RawMessage(e.Commit.Record) 439 record := tangled.RepoArtifact{} 440 err = json.Unmarshal(raw, &record) 441 if err != nil { 442 l.Error("invalid record", "err", err) 443 return err 444 } 445 446 var repo *models.Repo 447 if record.RepoDid != nil && *record.RepoDid != "" { 448 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 449 if err != nil && !errors.Is(err, sql.ErrNoRows) { 450 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 451 } 452 } 453 if repo == nil && record.Repo != nil { 454 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 455 if parseErr != nil { 456 return parseErr 457 } 458 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 459 if err != nil { 460 return err 461 } 462 } 463 if repo == nil { 464 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 465 } 466 467 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push") 468 if permErr != nil { 469 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr) 470 } else if !allowed { 471 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier()) 472 return nil 473 } 474 475 repoDid := repo.RepoDid 476 if repoDid == "" && record.RepoDid != nil { 477 repoDid = *record.RepoDid 478 } 479 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 480 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 481 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 482 } 483 } 484 485 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 486 if parseErr != nil { 487 createdAt = time.Now() 488 } 489 490 artifact := models.Artifact{ 491 Did: did, 492 Rkey: e.Commit.RKey, 493 RepoDid: syntax.DID(repo.RepoDid), 494 Tag: plumbing.Hash(record.Tag), 495 CreatedAt: createdAt, 496 BlobCid: cid.Cid(record.Artifact.Ref), 497 Name: record.Name, 498 Size: uint64(record.Artifact.Size), 499 MimeType: record.Artifact.MimeType, 500 } 501 502 err = db.AddArtifact(i.Db, artifact) 503 case jmodels.CommitOperationDelete: 504 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 505 } 506 507 if err != nil { 508 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 509 } 510 511 l.Info("ingested record") 512 return nil 513} 514 515func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 516 did := e.Did 517 var err error 518 519 l = l.With("handler", "ingestProfile") 520 521 if e.Commit.RKey != "self" { 522 return fmt.Errorf("ingestProfile only ingests `self` record") 523 } 524 525 switch e.Commit.Operation { 526 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 527 raw := json.RawMessage(e.Commit.Record) 528 record := tangled.ActorProfile{} 529 err = json.Unmarshal(raw, &record) 530 if err != nil { 531 l.Error("invalid record", "err", err) 532 return err 533 } 534 535 avatar := "" 536 if record.Avatar != nil { 537 avatar = record.Avatar.Ref.String() 538 } 539 540 description := "" 541 if record.Description != nil { 542 description = *record.Description 543 } 544 545 includeBluesky := record.Bluesky 546 547 pronouns := "" 548 if record.Pronouns != nil { 549 pronouns = *record.Pronouns 550 } 551 552 location := "" 553 if record.Location != nil { 554 location = *record.Location 555 } 556 557 var links [5]string 558 for i, l := range record.Links { 559 if i < 5 { 560 links[i] = l 561 } 562 } 563 564 var stats [2]models.VanityStat 565 for i, s := range record.Stats { 566 if i < 2 { 567 stats[i].Kind = models.ParseVanityStatKind(s) 568 } 569 } 570 571 var pinned [6]string 572 for i, r := range record.PinnedRepositories { 573 if i < 6 { 574 pinned[i] = r 575 } 576 } 577 578 var preferredHandle syntax.Handle 579 if record.PreferredHandle != nil { 580 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 581 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 582 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 583 preferredHandle = h 584 } 585 } 586 } 587 588 profile := models.Profile{ 589 Did: did, 590 Avatar: avatar, 591 Description: description, 592 IncludeBluesky: includeBluesky, 593 Location: location, 594 Links: links, 595 Stats: stats, 596 PinnedRepos: pinned, 597 Pronouns: pronouns, 598 PreferredHandle: preferredHandle, 599 } 600 601 tx, err := i.Db.Begin() 602 if err != nil { 603 return fmt.Errorf("failed to start transaction: %w", err) 604 } 605 606 err = db.ValidateProfile(tx, &profile) 607 if err != nil { 608 return fmt.Errorf("invalid profile record") 609 } 610 611 err = db.UpsertProfile(tx, &profile) 612 if err == nil && i.Cache != nil { 613 pipe := i.Cache.Pipeline() 614 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 615 if preferredHandle != "" { 616 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 617 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 618 } else { 619 pipe.Del(ctx, didKey) 620 } 621 if _, execErr := pipe.Exec(ctx); execErr != nil { 622 l.Warn("failed to update preferred handle cache", "err", execErr) 623 } 624 } 625 case jmodels.CommitOperationDelete: 626 tx, beginErr := i.Db.Begin() 627 if beginErr != nil { 628 return fmt.Errorf("failed to start transaction: %w", beginErr) 629 } 630 631 priorHandle, phErr := db.GetPreferredHandle(tx, did) 632 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 633 l.Warn("failed to read prior preferred handle", "err", phErr) 634 } 635 636 err = db.DeleteProfile(tx, did) 637 if err == nil && i.Cache != nil { 638 pipe := i.Cache.Pipeline() 639 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 640 if priorHandle != "" { 641 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 642 } 643 if _, execErr := pipe.Exec(ctx); execErr != nil { 644 l.Warn("failed to evict preferred handle cache", "err", execErr) 645 } 646 } 647 } 648 649 if err != nil { 650 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 651 } 652 653 l.Info("ingested record") 654 return nil 655} 656 657func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 658 did := e.Did 659 var err error 660 661 l = l.With("handler", "ingestSpindleMember") 662 663 switch e.Commit.Operation { 664 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 665 raw := json.RawMessage(e.Commit.Record) 666 record := tangled.SpindleMember{} 667 err = json.Unmarshal(raw, &record) 668 if err != nil { 669 l.Error("invalid record", "err", err) 670 return err 671 } 672 673 // only spindle owner can invite to spindles 674 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 675 if err != nil { 676 return fmt.Errorf("failed to check invite permission: %w", err) 677 } 678 if !ok { 679 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 680 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 681 } 682 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 683 if err != nil { 684 return fmt.Errorf("failed to re-check invite permission: %w", err) 685 } 686 if !ok { 687 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 688 } 689 } 690 691 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 692 if err != nil { 693 return err 694 } 695 696 if memberId.Handle.IsInvalidHandle() { 697 return fmt.Errorf("invalid handle for member %s", record.Subject) 698 } 699 700 existing, err := db.GetSpindleMembers(i.Db, 701 orm.FilterEq("did", did), 702 orm.FilterEq("rkey", e.Commit.RKey), 703 ) 704 if err != nil { 705 return fmt.Errorf("failed to look up existing member: %w", err) 706 } 707 if len(existing) > 1 { 708 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 709 } 710 711 tx, err := i.Db.Begin() 712 if err != nil { 713 return fmt.Errorf("failed to start txn: %w", err) 714 } 715 committed := false 716 defer func() { 717 if committed { 718 return 719 } 720 tx.Rollback() 721 i.Enforcer.E.LoadPolicy() 722 }() 723 724 if len(existing) == 1 { 725 prev := existing[0] 726 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 727 if err = db.RemoveSpindleMember(tx, 728 orm.FilterEq("did", did), 729 orm.FilterEq("rkey", e.Commit.RKey), 730 ); err != nil { 731 return fmt.Errorf("failed to remove stale row: %w", err) 732 } 733 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 734 return fmt.Errorf("failed to remove stale ACL: %w", err) 735 } 736 } 737 } 738 739 if err = db.AddSpindleMember(tx, models.SpindleMember{ 740 Did: syntax.DID(did), 741 Rkey: e.Commit.RKey, 742 Instance: record.Instance, 743 Subject: memberId.DID, 744 }); err != nil { 745 return fmt.Errorf("failed to add to db: %w", err) 746 } 747 748 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 749 return fmt.Errorf("failed to update ACLs: %w", err) 750 } 751 752 if err = tx.Commit(); err != nil { 753 return fmt.Errorf("failed to commit txn: %w", err) 754 } 755 756 if err = i.Enforcer.E.SavePolicy(); err != nil { 757 return fmt.Errorf("failed to save ACLs: %w", err) 758 } 759 committed = true 760 761 l.Info("upserted spindle member") 762 case jmodels.CommitOperationDelete: 763 rkey := e.Commit.RKey 764 765 // get record from db first 766 members, err := db.GetSpindleMembers( 767 i.Db, 768 orm.FilterEq("did", did), 769 orm.FilterEq("rkey", rkey), 770 ) 771 if err != nil || len(members) != 1 { 772 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 773 } 774 member := members[0] 775 776 tx, err := i.Db.Begin() 777 if err != nil { 778 return fmt.Errorf("failed to start txn: %w", err) 779 } 780 committed := false 781 defer func() { 782 if committed { 783 return 784 } 785 tx.Rollback() 786 i.Enforcer.E.LoadPolicy() 787 }() 788 789 // remove record by rkey && update enforcer 790 if err = db.RemoveSpindleMember( 791 tx, 792 orm.FilterEq("did", did), 793 orm.FilterEq("rkey", rkey), 794 ); err != nil { 795 return fmt.Errorf("failed to remove from db: %w", err) 796 } 797 798 // update enforcer 799 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 800 if err != nil { 801 return fmt.Errorf("failed to update ACLs: %w", err) 802 } 803 804 if err = tx.Commit(); err != nil { 805 return fmt.Errorf("failed to commit txn: %w", err) 806 } 807 808 if err = i.Enforcer.E.SavePolicy(); err != nil { 809 return fmt.Errorf("failed to save ACLs: %w", err) 810 } 811 committed = true 812 813 l.Info("removed spindle member") 814 } 815 816 return nil 817} 818 819func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 820 did := e.Did 821 var err error 822 823 l = l.With("handler", "ingestSpindle") 824 825 switch e.Commit.Operation { 826 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 827 raw := json.RawMessage(e.Commit.Record) 828 record := tangled.Spindle{} 829 err = json.Unmarshal(raw, &record) 830 if err != nil { 831 l.Error("invalid record", "err", err) 832 return err 833 } 834 835 instance := e.Commit.RKey 836 837 err := db.AddSpindle(i.Db, models.Spindle{ 838 Owner: syntax.DID(did), 839 Instance: instance, 840 }) 841 if err != nil { 842 l.Error("failed to add spindle to db", "err", err, "instance", instance) 843 return err 844 } 845 846 if err := i.verifySpindle(ctx, instance, did); err != nil { 847 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 848 } 849 850 l.Info("ingested record", "instance", instance) 851 return nil 852 853 case jmodels.CommitOperationDelete: 854 instance := e.Commit.RKey 855 856 // get record from db first 857 spindles, err := db.GetSpindles( 858 ctx, 859 i.Db, 860 orm.FilterEq("owner", did), 861 orm.FilterEq("instance", instance), 862 ) 863 if err != nil || len(spindles) != 1 { 864 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 865 } 866 spindle := spindles[0] 867 868 tx, err := i.Db.Begin() 869 if err != nil { 870 return fmt.Errorf("failed to start txn: %w", err) 871 } 872 defer func() { 873 tx.Rollback() 874 i.Enforcer.E.LoadPolicy() 875 }() 876 877 // remove spindle members first 878 err = db.RemoveSpindleMember( 879 tx, 880 orm.FilterEq("owner", did), 881 orm.FilterEq("instance", instance), 882 ) 883 if err != nil { 884 return fmt.Errorf("failed to remove spindle members: %w", err) 885 } 886 887 err = db.DeleteSpindle( 888 tx, 889 orm.FilterEq("owner", did), 890 orm.FilterEq("instance", instance), 891 ) 892 if err != nil { 893 return fmt.Errorf("failed to delete spindle: %w", err) 894 } 895 896 if spindle.Verified != nil { 897 err = i.Enforcer.RemoveSpindle(instance) 898 if err != nil { 899 return fmt.Errorf("failed to remove spindle from enforcer: %w", err) 900 } 901 } 902 903 err = tx.Commit() 904 if err != nil { 905 return fmt.Errorf("failed to commit txn: %w", err) 906 } 907 908 err = i.Enforcer.E.SavePolicy() 909 if err != nil { 910 return fmt.Errorf("failed to save ACLs: %w", err) 911 } 912 913 l.Info("ingested record", "instance", instance) 914 } 915 916 return nil 917} 918 919func (i *Ingester) ingestString(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 920 did := e.Did 921 rkey := e.Commit.RKey 922 923 var err error 924 925 l = l.With("handler", "ingestString") 926 927 switch e.Commit.Operation { 928 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 929 raw := json.RawMessage(e.Commit.Record) 930 record := tangled.String{} 931 err = json.Unmarshal(raw, &record) 932 if err != nil { 933 l.Error("invalid record", "err", err) 934 return err 935 } 936 937 str, err := models.StringFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(e.Commit.CID), record) 938 if err != nil { 939 return fmt.Errorf("failed to parse string record: %w", err) 940 } 941 if err = str.Validate(); err != nil { 942 l.Error("invalid record", "err", err) 943 return err 944 } 945 946 g, gctx := errgroup.WithContext(ctx) 947 for idx, file := range str.Files { 948 if file.Gzip == nil { 949 continue 950 } 951 g.Go(func() error { 952 blob, err := i.BlobStore.GetBlob(gctx, str.Did, cid.Cid(file.Content.Ref)) 953 if err != nil { 954 return fmt.Errorf("files[%d]: failed to fetch blob: %w", idx, err) 955 } 956 defer blob.Close() 957 gzr, err := gzip.NewReader(blob) 958 if err != nil { 959 return fmt.Errorf("files[%d]: invalid gzip stream: %w", idx, err) 960 } 961 gzr.Close() 962 963 content, err := io.ReadAll(gzr) 964 if err != nil { 965 return fmt.Errorf("files[%d]: failed to read blob: %w", idx, err) 966 } 967 file.Gzip.Content = string(content) 968 str.Files[idx] = file 969 return nil 970 }) 971 } 972 if err := g.Wait(); err != nil { 973 return err 974 } 975 976 if err = db.AddString(i.Db, str); err != nil { 977 l.Error("failed to add string", "err", err) 978 return err 979 } 980 981 l.Info("ingested record") 982 return nil 983 984 case jmodels.CommitOperationDelete: 985 if err := db.DeleteString( 986 i.Db, 987 orm.FilterEq("did", did), 988 orm.FilterEq("rkey", rkey), 989 ); err != nil { 990 l.Error("failed to delete", "err", err) 991 return fmt.Errorf("failed to delete string record: %w", err) 992 } 993 994 l.Info("ingested record") 995 return nil 996 } 997 998 return nil 999} 1000 1001func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1002 did := e.Did 1003 var err error 1004 1005 l = l.With("handler", "ingestKnotMember") 1006 1007 switch e.Commit.Operation { 1008 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1009 raw := json.RawMessage(e.Commit.Record) 1010 record := tangled.KnotMember{} 1011 err = json.Unmarshal(raw, &record) 1012 if err != nil { 1013 l.Error("invalid record", "err", err) 1014 return err 1015 } 1016 1017 // only knot owner can invite to knots 1018 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 1019 if err != nil { 1020 return fmt.Errorf("failed to check invite permission: %w", err) 1021 } 1022 if !ok { 1023 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 1024 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 1025 } 1026 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 1027 if err != nil { 1028 return fmt.Errorf("failed to re-check invite permission: %w", err) 1029 } 1030 if !ok { 1031 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 1032 } 1033 } 1034 1035 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 1036 if err != nil { 1037 return err 1038 } 1039 1040 if memberId.Handle.IsInvalidHandle() { 1041 return fmt.Errorf("invalid handle for member %s", record.Subject) 1042 } 1043 1044 existing, err := db.GetKnotMembers(i.Db, 1045 orm.FilterEq("did", did), 1046 orm.FilterEq("rkey", e.Commit.RKey), 1047 ) 1048 if err != nil { 1049 return fmt.Errorf("failed to look up existing member: %w", err) 1050 } 1051 if len(existing) > 1 { 1052 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1053 } 1054 1055 tx, err := i.Db.Begin() 1056 if err != nil { 1057 return fmt.Errorf("failed to start txn: %w", err) 1058 } 1059 committed := false 1060 defer func() { 1061 if committed { 1062 return 1063 } 1064 tx.Rollback() 1065 i.Enforcer.E.LoadPolicy() 1066 }() 1067 1068 if len(existing) == 1 { 1069 prev := existing[0] 1070 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1071 if err = db.RemoveKnotMember(tx, 1072 orm.FilterEq("did", did), 1073 orm.FilterEq("rkey", e.Commit.RKey), 1074 ); err != nil { 1075 return fmt.Errorf("failed to remove stale row: %w", err) 1076 } 1077 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1078 return fmt.Errorf("failed to remove stale ACL: %w", err) 1079 } 1080 } 1081 } 1082 1083 if err = db.AddKnotMember(tx, models.KnotMember{ 1084 Did: syntax.DID(did), 1085 Rkey: e.Commit.RKey, 1086 Domain: record.Domain, 1087 Subject: memberId.DID, 1088 }); err != nil { 1089 return fmt.Errorf("failed to add to db: %w", err) 1090 } 1091 1092 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1093 return fmt.Errorf("failed to update ACLs: %w", err) 1094 } 1095 1096 if err = tx.Commit(); err != nil { 1097 return fmt.Errorf("failed to commit txn: %w", err) 1098 } 1099 1100 if err = i.Enforcer.E.SavePolicy(); err != nil { 1101 return fmt.Errorf("failed to save ACLs: %w", err) 1102 } 1103 committed = true 1104 1105 l.Info("upserted knot member") 1106 case jmodels.CommitOperationDelete: 1107 rkey := e.Commit.RKey 1108 1109 members, err := db.GetKnotMembers( 1110 i.Db, 1111 orm.FilterEq("did", did), 1112 orm.FilterEq("rkey", rkey), 1113 ) 1114 if err != nil { 1115 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1116 } 1117 if len(members) == 0 { 1118 l.Info("knot member already removed", "rkey", rkey) 1119 return nil 1120 } 1121 if len(members) > 1 { 1122 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1123 } 1124 member := members[0] 1125 1126 tx, err := i.Db.Begin() 1127 if err != nil { 1128 return fmt.Errorf("failed to start txn: %w", err) 1129 } 1130 committed := false 1131 defer func() { 1132 if committed { 1133 return 1134 } 1135 tx.Rollback() 1136 i.Enforcer.E.LoadPolicy() 1137 }() 1138 1139 if err = db.RemoveKnotMember( 1140 tx, 1141 orm.FilterEq("did", did), 1142 orm.FilterEq("rkey", rkey), 1143 ); err != nil { 1144 return fmt.Errorf("failed to remove from db: %w", err) 1145 } 1146 1147 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1148 return fmt.Errorf("failed to update ACLs: %w", err) 1149 } 1150 1151 if err = tx.Commit(); err != nil { 1152 return fmt.Errorf("failed to commit txn: %w", err) 1153 } 1154 1155 if err = i.Enforcer.E.SavePolicy(); err != nil { 1156 return fmt.Errorf("failed to save ACLs: %w", err) 1157 } 1158 committed = true 1159 1160 l.Info("removed knot member") 1161 } 1162 1163 return nil 1164} 1165 1166func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1167 did := e.Did 1168 var err error 1169 1170 l = l.With("handler", "ingestKnot") 1171 1172 switch e.Commit.Operation { 1173 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1174 raw := json.RawMessage(e.Commit.Record) 1175 record := tangled.Knot{} 1176 err = json.Unmarshal(raw, &record) 1177 if err != nil { 1178 l.Error("invalid record", "err", err) 1179 return err 1180 } 1181 1182 domain := e.Commit.RKey 1183 1184 err := db.AddKnot(i.Db, domain, did) 1185 if err != nil { 1186 l.Error("failed to add knot to db", "err", err, "domain", domain) 1187 return err 1188 } 1189 1190 if err := i.verifyKnot(ctx, domain, did); err != nil { 1191 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 1192 } 1193 1194 l.Info("ingested record", "domain", domain) 1195 return nil 1196 1197 case jmodels.CommitOperationDelete: 1198 domain := e.Commit.RKey 1199 1200 // get record from db first 1201 registrations, err := db.GetRegistrations( 1202 i.Db, 1203 orm.FilterEq("domain", domain), 1204 orm.FilterEq("did", did), 1205 ) 1206 if err != nil { 1207 return fmt.Errorf("failed to get registration: %w", err) 1208 } 1209 if len(registrations) != 1 { 1210 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 1211 } 1212 registration := registrations[0] 1213 1214 tx, err := i.Db.Begin() 1215 if err != nil { 1216 return fmt.Errorf("failed to start txn: %w", err) 1217 } 1218 defer func() { 1219 tx.Rollback() 1220 i.Enforcer.E.LoadPolicy() 1221 }() 1222 1223 err = db.RemoveKnotMember( 1224 tx, 1225 orm.FilterEq("did", did), 1226 orm.FilterEq("domain", domain), 1227 ) 1228 if err != nil { 1229 return fmt.Errorf("failed to remove knot members: %w", err) 1230 } 1231 1232 err = db.DeleteKnot( 1233 tx, 1234 orm.FilterEq("did", did), 1235 orm.FilterEq("domain", domain), 1236 ) 1237 if err != nil { 1238 return fmt.Errorf("failed to delete knot: %w", err) 1239 } 1240 1241 err = db.RemoveReposByKnot(tx, domain) 1242 if err != nil { 1243 return fmt.Errorf("failed to remove repos by knot: %w", err) 1244 } 1245 1246 if registration.Registered != nil { 1247 err = i.Enforcer.RemoveKnot(domain) 1248 if err != nil { 1249 return fmt.Errorf("failed to remove knot from enforcer: %w", err) 1250 } 1251 } 1252 1253 err = tx.Commit() 1254 if err != nil { 1255 return fmt.Errorf("failed to commit txn: %w", err) 1256 } 1257 1258 err = i.Enforcer.E.SavePolicy() 1259 if err != nil { 1260 return fmt.Errorf("failed to save ACLs: %w", err) 1261 } 1262 1263 l.Info("ingested record", "domain", domain) 1264 } 1265 1266 return nil 1267} 1268 1269const ( 1270 verifyAttempts = 4 1271 verifyMinDelay = 1 * time.Second 1272 verifyMaxDelay = 5 * time.Second 1273) 1274 1275func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1276 regs, err := db.GetRegistrations(i.Db, 1277 orm.FilterEq("domain", domain), 1278 orm.FilterEq("did", did), 1279 ) 1280 if err != nil { 1281 return fmt.Errorf("look up registration: %w", err) 1282 } 1283 if len(regs) != 1 { 1284 return fmt.Errorf("no registration for %s by %s", domain, did) 1285 } 1286 if regs[0].Registered != nil { 1287 return nil 1288 } 1289 1290 err = retry.Do( 1291 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1292 retry.Context(ctx), 1293 retry.Attempts(verifyAttempts), 1294 retry.Delay(verifyMinDelay), 1295 retry.MaxDelay(verifyMaxDelay), 1296 retry.DelayType(retry.BackOffDelay), 1297 retry.LastErrorOnly(true), 1298 ) 1299 if err != nil { 1300 return fmt.Errorf("verify: %w", err) 1301 } 1302 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1303} 1304 1305func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1306 spindles, err := db.GetSpindles(ctx, i.Db, 1307 orm.FilterEq("instance", instance), 1308 orm.FilterEq("owner", did), 1309 ) 1310 if err != nil { 1311 return fmt.Errorf("look up spindle: %w", err) 1312 } 1313 if len(spindles) != 1 { 1314 return fmt.Errorf("no spindle for %s by %s", instance, did) 1315 } 1316 if spindles[0].Verified != nil { 1317 return nil 1318 } 1319 1320 err = retry.Do( 1321 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1322 retry.Context(ctx), 1323 retry.Attempts(verifyAttempts), 1324 retry.Delay(verifyMinDelay), 1325 retry.MaxDelay(verifyMaxDelay), 1326 retry.DelayType(retry.BackOffDelay), 1327 retry.LastErrorOnly(true), 1328 ) 1329 if err != nil { 1330 return fmt.Errorf("verify: %w", err) 1331 } 1332 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1333 return err 1334} 1335 1336const sweepConcurrency = 4 1337 1338func (i *Ingester) SweepPendingVerifications() { 1339 l := i.Logger.With("handler", "SweepPendingVerifications") 1340 1341 var g errgroup.Group 1342 g.SetLimit(sweepConcurrency) 1343 1344 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1345 if err != nil { 1346 l.Error("failed to list unverified knots", "err", err) 1347 } else { 1348 for _, reg := range regs { 1349 g.Go(func() error { 1350 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1351 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1352 } 1353 return nil 1354 }) 1355 } 1356 } 1357 1358 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1359 if err != nil { 1360 l.Error("failed to list unverified spindles", "err", err) 1361 g.Wait() 1362 return 1363 } 1364 for _, s := range spindles { 1365 g.Go(func() error { 1366 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1367 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1368 } 1369 return nil 1370 }) 1371 } 1372 g.Wait() 1373} 1374 1375func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1376 did := e.Did 1377 rkey := e.Commit.RKey 1378 1379 var err error 1380 1381 l = l.With("handler", "ingestIssue") 1382 1383 switch e.Commit.Operation { 1384 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1385 raw := json.RawMessage(e.Commit.Record) 1386 record := tangled.RepoIssue{} 1387 err = json.Unmarshal(raw, &record) 1388 if err != nil { 1389 l.Error("invalid record", "err", err) 1390 return err 1391 } 1392 1393 issue := models.IssueFromRecord(did, rkey, record) 1394 1395 if issue.RepoDid == "" { 1396 return fmt.Errorf("issue record has no repo field") 1397 } 1398 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1399 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1400 } 1401 1402 if err := i.Validator.ValidateIssue(&issue); err != nil { 1403 return fmt.Errorf("failed to validate issue: %w", err) 1404 } 1405 1406 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1407 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1408 if repoErr == nil && repo.RepoDid != "" { 1409 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1410 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1411 } 1412 } 1413 } 1414 1415 tx, err := i.Db.BeginTx(ctx, nil) 1416 if err != nil { 1417 l.Error("failed to begin transaction", "err", err) 1418 return err 1419 } 1420 defer tx.Rollback() 1421 1422 err = db.PutIssue(tx, &issue) 1423 if err != nil { 1424 l.Error("failed to create issue", "err", err) 1425 return err 1426 } 1427 1428 err = tx.Commit() 1429 if err != nil { 1430 l.Error("failed to commit txn", "err", err) 1431 return err 1432 } 1433 1434 l.Info("ingested record") 1435 return nil 1436 1437 case jmodels.CommitOperationDelete: 1438 tx, err := i.Db.BeginTx(ctx, nil) 1439 if err != nil { 1440 l.Error("failed to begin transaction", "err", err) 1441 return err 1442 } 1443 defer tx.Rollback() 1444 1445 if err := db.DeleteIssues( 1446 tx, 1447 did, 1448 rkey, 1449 ); err != nil { 1450 l.Error("failed to delete", "err", err) 1451 return fmt.Errorf("failed to delete issue record: %w", err) 1452 } 1453 if err := tx.Commit(); err != nil { 1454 l.Error("failed to commit txn", "err", err) 1455 return err 1456 } 1457 1458 l.Info("ingested record") 1459 return nil 1460 } 1461 1462 return nil 1463} 1464 1465func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1466 did := e.Did 1467 rkey := e.Commit.RKey 1468 1469 var err error 1470 1471 l = l.With("handler", "ingestPull") 1472 1473 switch e.Commit.Operation { 1474 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1475 raw := json.RawMessage(e.Commit.Record) 1476 record := tangled.RepoPull{} 1477 err = json.Unmarshal(raw, &record) 1478 if err != nil { 1479 l.Error("invalid record", "err", err) 1480 return err 1481 } 1482 1483 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1484 if err != nil { 1485 l.Error("failed to resolve did", "err", err) 1486 return err 1487 } 1488 1489 // go through and fetch all blobs in parallel 1490 readers := make([]*io.ReadCloser, len(record.Rounds)) 1491 var mu sync.Mutex 1492 1493 g, gctx := errgroup.WithContext(ctx) 1494 1495 for idx, b := range record.Rounds { 1496 g.Go(func() error { 1497 // for some reason, a blob is empty 1498 if b.PatchBlob == nil { 1499 return fmt.Errorf("missing patchBlob in round %d", idx) 1500 } 1501 1502 ownerPds := ownerId.PDSEndpoint() 1503 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1504 q := url.Query() 1505 q.Set("cid", b.PatchBlob.Ref.String()) 1506 q.Set("did", did) 1507 url.RawQuery = q.Encode() 1508 1509 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1510 if err != nil { 1511 l.Error("failed to create request") 1512 return err 1513 } 1514 req.Header.Set("Content-Type", "application/json") 1515 1516 resp, err := http.DefaultClient.Do(req) 1517 if err != nil { 1518 l.Error("failed to make request") 1519 return err 1520 } 1521 1522 mu.Lock() 1523 readers[idx] = &resp.Body 1524 mu.Unlock() 1525 1526 return nil 1527 }) 1528 } 1529 1530 if err := g.Wait(); err != nil { 1531 for _, r := range readers { 1532 if r != nil && *r != nil { 1533 (*r).Close() 1534 } 1535 } 1536 return err 1537 } 1538 1539 defer func() { 1540 for _, r := range readers { 1541 if r != nil && *r != nil { 1542 (*r).Close() 1543 } 1544 } 1545 }() 1546 1547 pull, err := models.PullFromRecord(did, rkey, record, readers) 1548 if err != nil { 1549 return fmt.Errorf("failed to parse pull from record: %w", err) 1550 } 1551 if err := i.Validator.ValidatePull(pull); err != nil { 1552 return fmt.Errorf("failed to validate pull: %w", err) 1553 } 1554 1555 tx, err := i.Db.BeginTx(ctx, nil) 1556 if err != nil { 1557 l.Error("failed to begin transaction", "err", err) 1558 return err 1559 } 1560 defer tx.Rollback() 1561 1562 err = db.PutPull(tx, pull) 1563 if err != nil { 1564 l.Error("failed to create pull", "err", err) 1565 return err 1566 } 1567 1568 err = tx.Commit() 1569 if err != nil { 1570 l.Error("failed to commit txn", "err", err) 1571 return err 1572 } 1573 1574 l.Info("ingested record") 1575 return nil 1576 1577 case jmodels.CommitOperationDelete: 1578 tx, err := i.Db.BeginTx(ctx, nil) 1579 if err != nil { 1580 l.Error("failed to begin transaction", "err", err) 1581 return err 1582 } 1583 defer tx.Rollback() 1584 1585 if err := db.AbandonPulls( 1586 tx, 1587 orm.FilterEq("owner_did", did), 1588 orm.FilterEq("rkey", rkey), 1589 ); err != nil { 1590 l.Error("failed to abandon", "err", err) 1591 return fmt.Errorf("failed to abandon pull record: %w", err) 1592 } 1593 if err := tx.Commit(); err != nil { 1594 l.Error("failed to commit txn", "err", err) 1595 return err 1596 } 1597 1598 l.Info("ingested record") 1599 return nil 1600 } 1601 1602 return nil 1603} 1604 1605// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1606func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error { 1607 l = l.With("handler", "ingestIssueComment") 1608 1609 switch e.Commit.Operation { 1610 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1611 // no-op. sh.tangled.repo.issue.comment is deprecated 1612 1613 case jmodels.CommitOperationDelete: 1614 if err := db.PurgeComments( 1615 i.Db, 1616 orm.FilterEq("did", e.Did), 1617 orm.FilterEq("collection", e.Commit.Collection), 1618 orm.FilterEq("rkey", e.Commit.RKey), 1619 ); err != nil { 1620 return fmt.Errorf("failed to delete comment record: %w", err) 1621 } 1622 } 1623 1624 l.Info("ingested record") 1625 return nil 1626} 1627 1628// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1629func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error { 1630 l = l.With("handler", "ingestPullComment") 1631 1632 switch e.Commit.Operation { 1633 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1634 // no-op. sh.tangled.repo.pull.comment is deprecated 1635 1636 case jmodels.CommitOperationDelete: 1637 if err := db.PurgeComments( 1638 i.Db, 1639 orm.FilterEq("did", e.Did), 1640 orm.FilterEq("collection", e.Commit.Collection), 1641 orm.FilterEq("rkey", e.Commit.RKey), 1642 ); err != nil { 1643 return fmt.Errorf("failed to delete comment record: %w", err) 1644 } 1645 } 1646 1647 l.Info("ingested record") 1648 return nil 1649} 1650 1651func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error { 1652 did := e.Did 1653 rkey := e.Commit.RKey 1654 cid := e.Commit.CID 1655 1656 var err error 1657 1658 l = l.With("handler", "ingestComment") 1659 1660 ctx := context.Background() 1661 1662 switch e.Commit.Operation { 1663 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1664 raw := json.RawMessage(e.Commit.Record) 1665 record := tangled.FeedComment{} 1666 err = json.Unmarshal(raw, &record) 1667 if err != nil { 1668 return fmt.Errorf("invalid record: %w", err) 1669 } 1670 1671 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1672 if err != nil { 1673 return fmt.Errorf("failed to parse comment from record: %w", err) 1674 } 1675 1676 if err := comment.Validate(); err != nil { 1677 return fmt.Errorf("failed to validate comment: %w", err) 1678 } 1679 1680 var references []syntax.ATURI 1681 if comment.Body.Original != nil { 1682 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1683 } 1684 1685 tx, err := i.Db.Begin() 1686 if err != nil { 1687 return fmt.Errorf("failed to start transaction: %w", err) 1688 } 1689 defer tx.Rollback() 1690 1691 _, err = db.PutComment(tx, comment, references) 1692 if err != nil { 1693 return fmt.Errorf("failed to create comment: %w", err) 1694 } 1695 1696 if err := tx.Commit(); err != nil { 1697 return err 1698 } 1699 1700 case jmodels.CommitOperationDelete: 1701 if err := db.DeleteComments( 1702 i.Db, 1703 orm.FilterEq("did", did), 1704 orm.FilterEq("collection", e.Commit.Collection), 1705 orm.FilterEq("rkey", rkey), 1706 ); err != nil { 1707 return fmt.Errorf("failed to delete comment record: %w", err) 1708 } 1709 } 1710 1711 l.Info("ingested record") 1712 return nil 1713} 1714 1715func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error { 1716 did := e.Did 1717 rkey := e.Commit.RKey 1718 1719 l = l.With("handler", "ingestReaction") 1720 1721 switch e.Commit.Operation { 1722 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1723 raw := json.RawMessage(e.Commit.Record) 1724 record := tangled.FeedReaction{} 1725 if err := json.Unmarshal(raw, &record); err != nil { 1726 return fmt.Errorf("invalid record: %w", err) 1727 } 1728 1729 subjectUri, err := syntax.ParseATURI(record.Subject) 1730 if err != nil { 1731 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err) 1732 } 1733 subjectUri = models.NormalizeReactionSubject(subjectUri) 1734 1735 kind, ok := models.ParseReactionKind(record.Reaction) 1736 if !ok { 1737 return fmt.Errorf("invalid reaction kind: %q", record.Reaction) 1738 } 1739 1740 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 1741 if parseErr != nil { 1742 created = time.Now() 1743 } 1744 1745 tx, err := i.Db.Begin() 1746 if err != nil { 1747 return fmt.Errorf("failed to start transaction: %w", err) 1748 } 1749 defer tx.Rollback() 1750 1751 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil { 1752 return fmt.Errorf("failed to clear existing reaction: %w", err) 1753 } 1754 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil { 1755 return fmt.Errorf("failed to add reaction: %w", err) 1756 } 1757 1758 if err := tx.Commit(); err != nil { 1759 return err 1760 } 1761 1762 case jmodels.CommitOperationDelete: 1763 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil { 1764 return fmt.Errorf("failed to delete reaction record: %w", err) 1765 } 1766 } 1767 1768 l.Info("ingested record") 1769 return nil 1770} 1771 1772func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error { 1773 did := e.Did 1774 rkey := e.Commit.RKey 1775 1776 var err error 1777 1778 l = l.With("handler", "ingestLabelDefinition") 1779 1780 switch e.Commit.Operation { 1781 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1782 raw := json.RawMessage(e.Commit.Record) 1783 record := tangled.LabelDefinition{} 1784 err = json.Unmarshal(raw, &record) 1785 if err != nil { 1786 return fmt.Errorf("invalid record: %w", err) 1787 } 1788 1789 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1790 if err != nil { 1791 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1792 } 1793 1794 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1795 return fmt.Errorf("failed to validate labeldef: %w", err) 1796 } 1797 1798 _, err = db.AddLabelDefinition(i.Db, def) 1799 if err != nil { 1800 return fmt.Errorf("failed to create labeldef: %w", err) 1801 } 1802 1803 l.Info("ingested record") 1804 return nil 1805 1806 case jmodels.CommitOperationDelete: 1807 if err := db.DeleteLabelDefinition( 1808 i.Db, 1809 orm.FilterEq("did", did), 1810 orm.FilterEq("rkey", rkey), 1811 ); err != nil { 1812 return fmt.Errorf("failed to delete labeldef record: %w", err) 1813 } 1814 1815 l.Info("ingested record") 1816 return nil 1817 } 1818 1819 return nil 1820} 1821 1822func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1823 did := e.Did 1824 rkey := e.Commit.RKey 1825 1826 var err error 1827 1828 l = l.With("handler", "ingestLabelOp") 1829 1830 switch e.Commit.Operation { 1831 case jmodels.CommitOperationCreate: 1832 raw := json.RawMessage(e.Commit.Record) 1833 record := tangled.LabelOp{} 1834 err = json.Unmarshal(raw, &record) 1835 if err != nil { 1836 return fmt.Errorf("invalid record: %w", err) 1837 } 1838 1839 subject := syntax.ATURI(record.Subject) 1840 collection := subject.Collection() 1841 1842 var repo *models.Repo 1843 switch collection { 1844 case tangled.RepoIssueNSID: 1845 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1846 if err != nil || len(i) != 1 { 1847 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1848 } 1849 repo = i[0].Repo 1850 case tangled.RepoPullNSID: 1851 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject)) 1852 if err != nil || len(p) != 1 { 1853 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p)) 1854 } 1855 repo = p[0].Repo 1856 default: 1857 return fmt.Errorf("unsupported label subject: %s", collection) 1858 } 1859 1860 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1861 if err != nil { 1862 return fmt.Errorf("failed to build label application ctx: %w", err) 1863 } 1864 1865 ops := models.LabelOpsFromRecord(did, rkey, record) 1866 1867 for _, o := range ops { 1868 def, ok := actx.Defs[o.OperandKey] 1869 if !ok { 1870 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1871 } 1872 if err := i.Validator.ValidateLabelOp(ctx, def, repo, &o); err != nil { 1873 if !errors.Is(err, knotacl.ErrKnotUnreachable) { 1874 return fmt.Errorf("failed to validate labelop: %w", err) 1875 } 1876 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", err) 1877 } 1878 } 1879 1880 tx, err := i.Db.Begin() 1881 if err != nil { 1882 return err 1883 } 1884 defer tx.Rollback() 1885 1886 for _, o := range ops { 1887 _, err = db.AddLabelOp(tx, &o) 1888 if err != nil { 1889 return fmt.Errorf("failed to add labelop: %w", err) 1890 } 1891 } 1892 1893 if err = tx.Commit(); err != nil { 1894 return err 1895 } 1896 1897 l.Info("ingested record") 1898 } 1899 1900 return nil 1901}