Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/knotacl" 31 "tangled.org/core/appview/mentions" 32 "tangled.org/core/appview/models" 33 "tangled.org/core/appview/notify" 34 "tangled.org/core/appview/repoverify" 35 "tangled.org/core/appview/serververify" 36 "tangled.org/core/appview/validator" 37 "tangled.org/core/idresolver" 38 "tangled.org/core/orm" 39 "tangled.org/core/rbac" 40) 41 42type Ingester struct { 43 Ctx context.Context 44 Db *db.DB 45 Enforcer *rbac.Enforcer 46 Acl *knotacl.Service 47 IdResolver *idresolver.Resolver 48 Cache *cache.Cache 49 Config *config.Config 50 Logger *slog.Logger 51 Validator *validator.Validator 52 MentionsResolver *mentions.Resolver 53 Notifier notify.Notifier 54 Verifier repoverify.Verifier 55} 56 57type processFunc func(ctx context.Context, e *jmodels.Event) error 58 59func (i *Ingester) Ingest() processFunc { 60 return func(ctx context.Context, e *jmodels.Event) error { 61 var err error 62 63 l := i.Logger.With("kind", e.Kind) 64 switch e.Kind { 65 case jmodels.EventKindAccount: 66 // TODO: sync account state to db 67 if e.Account.Active { 68 break 69 } 70 // TODO: revoke sessions by DID 71 if *e.Account.Status == "deactivated" { 72 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 73 } 74 case jmodels.EventKindIdentity: 75 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 76 case jmodels.EventKindCommit: 77 l = l.With( 78 "nsid", e.Commit.Collection, 
79 "did", e.Did, 80 "rkey", e.Commit.RKey, 81 "op", e.Commit.Operation, 82 ) 83 switch e.Commit.Collection { 84 case tangled.GraphFollowNSID: 85 err = i.ingestFollow(e, l) 86 case tangled.GraphVouchNSID: 87 err = i.ingestVouch(ctx, e, l) 88 case tangled.FeedStarNSID: 89 err = i.ingestStar(ctx, e, l) 90 case tangled.FeedReactionNSID: 91 err = i.ingestReaction(e, l) 92 case tangled.PublicKeyNSID: 93 err = i.ingestPublicKey(e, l) 94 case tangled.RepoArtifactNSID: 95 err = i.ingestArtifact(ctx, e, l) 96 case tangled.ActorProfileNSID: 97 err = i.ingestProfile(ctx, e, l) 98 case tangled.SpindleMemberNSID: 99 err = i.ingestSpindleMember(ctx, e, l) 100 case tangled.SpindleNSID: 101 err = i.ingestSpindle(ctx, e, l) 102 case tangled.KnotMemberNSID: 103 err = i.ingestKnotMember(ctx, e, l) 104 case tangled.KnotNSID: 105 err = i.ingestKnot(ctx, e, l) 106 case tangled.StringNSID: 107 err = i.ingestString(e, l) 108 case tangled.RepoIssueNSID: 109 err = i.ingestIssue(ctx, e, l) 110 case tangled.RepoPullNSID: 111 err = i.ingestPull(ctx, e, l) 112 case tangled.FeedCommentNSID: 113 err = i.ingestComment(e, l) 114 case tangled.RepoIssueCommentNSID: 115 err = i.ingestIssueComment(e, l) 116 case tangled.RepoPullCommentNSID: 117 err = i.ingestPullComment(e, l) 118 case tangled.LabelDefinitionNSID: 119 err = i.ingestLabelDefinition(e, l) 120 case tangled.LabelOpNSID: 121 err = i.ingestLabelOp(ctx, e, l) 122 case tangled.RepoNSID: 123 err = i.ingestRepo(ctx, e, l) 124 } 125 } 126 127 if err != nil { 128 l.Warn("failed to ingest record, skipping", "err", err) 129 } 130 131 lastTimeUs := e.TimeUS + 1 132 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 133 l.Error("failed to save cursor", "err", saveErr) 134 } 135 136 return nil 137 } 138} 139 140func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 141 if strings.HasPrefix(ref, "did:") { 142 return db.GetRepoByDid(i.Db, ref) 143 } 144 return db.GetRepoByAtUri(i.Db, ref) 145} 146 147func (i *Ingester) 
resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 148 var legacy struct { 149 Subject *string `json:"subject"` 150 SubjectDid *string `json:"subjectDid"` 151 } 152 if err := json.Unmarshal(raw, &legacy); err != nil { 153 return false, err 154 } 155 156 switch { 157 case legacy.SubjectDid != nil: 158 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 159 if err != nil { 160 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 161 return false, nil 162 } 163 star.SubjectType = models.StarSubjectRepo 164 star.Subject = repo.RepoDid 165 return true, nil 166 167 case legacy.Subject != nil: 168 uri, err := syntax.ParseATURI(*legacy.Subject) 169 if err != nil { 170 return false, fmt.Errorf("invalid old-format star subject: %w", err) 171 } 172 switch uri.Collection().String() { 173 case tangled.RepoNSID: 174 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 175 if err != nil { 176 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 177 return false, nil 178 } 179 star.SubjectType = models.StarSubjectRepo 180 star.Subject = repo.RepoDid 181 return true, nil 182 default: 183 star.SubjectType = models.StarSubjectString 184 star.Subject = *legacy.Subject 185 return true, nil 186 } 187 188 default: 189 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 190 } 191} 192 193func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 194 var err error 195 did := e.Did 196 197 l = l.With("handler", "ingestStar") 198 199 switch e.Commit.Operation { 200 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 201 raw := json.RawMessage(e.Commit.Record) 202 record := tangled.FeedStar{} 203 unmarshalErr := json.Unmarshal(raw, &record) 204 205 star := &models.Star{ 206 Did: did, 207 Rkey: e.Commit.RKey, 208 } 209 210 switch { 211 case unmarshalErr != nil: 212 resolved, resolveErr := i.resolveOldFormatStar(raw, star, 
l) 213 if resolveErr != nil { 214 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 215 return unmarshalErr 216 } 217 if !resolved { 218 return nil 219 } 220 221 case record.Subject == nil: 222 return fmt.Errorf("star record has nil subject") 223 224 case record.Subject.FeedStar_Repo != nil: 225 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 226 if repoErr != nil { 227 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 228 return nil 229 } 230 star.SubjectType = models.StarSubjectRepo 231 star.Subject = repo.RepoDid 232 233 case record.Subject.FeedStar_String != nil: 234 star.SubjectType = models.StarSubjectString 235 star.Subject = record.Subject.FeedStar_String.Uri 236 237 default: 238 return fmt.Errorf("star record has empty subject union") 239 } 240 241 err = db.AddStar(i.Db, star) 242 case jmodels.CommitOperationDelete: 243 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 244 } 245 246 if err != nil { 247 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 248 } 249 250 l.Info("ingested record") 251 return nil 252} 253 254func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error { 255 var err error 256 did := e.Did 257 258 l = l.With("handler", "ingestFollow") 259 260 switch e.Commit.Operation { 261 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 262 raw := json.RawMessage(e.Commit.Record) 263 record := tangled.GraphFollow{} 264 err = json.Unmarshal(raw, &record) 265 if err != nil { 266 l.Error("invalid record", "err", err) 267 return err 268 } 269 270 err = db.AddFollow(i.Db, &models.Follow{ 271 UserDid: did, 272 SubjectDid: record.Subject, 273 Rkey: e.Commit.RKey, 274 }) 275 case jmodels.CommitOperationDelete: 276 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 277 } 278 279 if err != nil { 280 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 281 } 282 283 l.Info("ingested record") 284 
return nil 285} 286 287func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 288 var err error 289 did := e.Did 290 291 l = l.With("handler", "ingestVouch") 292 293 switch e.Commit.Operation { 294 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 295 raw := json.RawMessage(e.Commit.Record) 296 record := tangled.GraphVouch{} 297 err = json.Unmarshal(raw, &record) 298 if err != nil { 299 l.Error("invalid record", "err", err) 300 return err 301 } 302 303 // rkey is the subject_did being vouched for/denounced 304 subjectDID := e.Commit.RKey 305 306 _, err = syntax.ParseDID(subjectDID) 307 if err != nil { 308 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 309 return fmt.Errorf("invalid subject_did: %w", err) 310 } 311 312 if did == subjectDID { 313 l.Warn("attempted self-vouch", "did", did) 314 return fmt.Errorf("cannot vouch for self") 315 } 316 317 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 318 if err != nil { 319 return err 320 } 321 322 if subjectId.Handle.IsInvalidHandle() { 323 return err 324 } 325 326 kind, err := models.ParseVouchKind(record.Kind) 327 if err != nil { 328 l.Error("invalid kind", "kind", kind) 329 return fmt.Errorf("invalid kind: %s", kind) 330 } 331 332 recordCid, err := cid.Parse(e.Commit.CID) 333 if err != nil { 334 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 335 return fmt.Errorf("invalid cid: %w", err) 336 } 337 338 var evidences []syntax.ATURI 339 for _, raw := range record.Evidences { 340 uri, parseErr := syntax.ParseATURI(raw) 341 if parseErr != nil { 342 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 343 continue 344 } 345 evidences = append(evidences, uri) 346 } 347 348 tx, txErr := i.Db.Begin() 349 if txErr != nil { 350 return fmt.Errorf("failed to start transaction: %w", txErr) 351 } 352 353 addErr := db.AddVouch(tx, &models.Vouch{ 354 Did: syntax.DID(did), 355 SubjectDid: subjectId.DID, 356 Cid: 
recordCid, 357 Kind: kind, 358 Reason: record.Reason, 359 Evidences: evidences, 360 }) 361 if addErr != nil { 362 tx.Rollback() 363 err = addErr 364 } else { 365 err = tx.Commit() 366 } 367 368 case jmodels.CommitOperationDelete: 369 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 370 } 371 372 if err != nil { 373 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 374 } 375 376 l.Info("ingested record") 377 return nil 378} 379 380func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error { 381 did := e.Did 382 var err error 383 384 l = l.With("handler", "ingestPublicKey") 385 386 switch e.Commit.Operation { 387 case jmodels.CommitOperationCreate: 388 l.Debug("processing add of pubkey") 389 raw := json.RawMessage(e.Commit.Record) 390 record := tangled.PublicKey{} 391 err = json.Unmarshal(raw, &record) 392 if err != nil { 393 l.Error("invalid record", "err", err) 394 return err 395 } 396 397 name := record.Name 398 key := record.Key 399 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 400 case jmodels.CommitOperationUpdate: 401 l.Debug("processing update of pubkey") 402 raw := json.RawMessage(e.Commit.Record) 403 record := tangled.PublicKey{} 404 err = json.Unmarshal(raw, &record) 405 if err != nil { 406 l.Error("invalid record", "err", err) 407 return err 408 } 409 410 name := record.Name 411 key := record.Key 412 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 413 case jmodels.CommitOperationDelete: 414 l.Debug("processing delete of pubkey") 415 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 416 } 417 418 if err != nil { 419 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 420 } 421 422 l.Info("ingested record") 423 return nil 424} 425 426func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 427 did := e.Did 428 var err error 429 430 l = l.With("handler", "ingestArtifact") 431 432 switch e.Commit.Operation { 433 case 
jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 434 raw := json.RawMessage(e.Commit.Record) 435 record := tangled.RepoArtifact{} 436 err = json.Unmarshal(raw, &record) 437 if err != nil { 438 l.Error("invalid record", "err", err) 439 return err 440 } 441 442 var repo *models.Repo 443 if record.RepoDid != nil && *record.RepoDid != "" { 444 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 445 if err != nil && !errors.Is(err, sql.ErrNoRows) { 446 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 447 } 448 } 449 if repo == nil && record.Repo != nil { 450 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 451 if parseErr != nil { 452 return parseErr 453 } 454 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 455 if err != nil { 456 return err 457 } 458 } 459 if repo == nil { 460 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 461 } 462 463 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push") 464 if permErr != nil { 465 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr) 466 } else if !allowed { 467 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier()) 468 return nil 469 } 470 471 repoDid := repo.RepoDid 472 if repoDid == "" && record.RepoDid != nil { 473 repoDid = *record.RepoDid 474 } 475 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 476 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 477 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 478 } 479 } 480 481 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 482 if parseErr != nil { 483 createdAt = time.Now() 484 } 485 486 artifact := models.Artifact{ 487 Did: did, 488 Rkey: e.Commit.RKey, 489 
RepoDid: syntax.DID(repo.RepoDid), 490 Tag: plumbing.Hash(record.Tag), 491 CreatedAt: createdAt, 492 BlobCid: cid.Cid(record.Artifact.Ref), 493 Name: record.Name, 494 Size: uint64(record.Artifact.Size), 495 MimeType: record.Artifact.MimeType, 496 } 497 498 err = db.AddArtifact(i.Db, artifact) 499 case jmodels.CommitOperationDelete: 500 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 501 } 502 503 if err != nil { 504 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 505 } 506 507 l.Info("ingested record") 508 return nil 509} 510 511func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 512 did := e.Did 513 var err error 514 515 l = l.With("handler", "ingestProfile") 516 517 if e.Commit.RKey != "self" { 518 return fmt.Errorf("ingestProfile only ingests `self` record") 519 } 520 521 switch e.Commit.Operation { 522 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 523 raw := json.RawMessage(e.Commit.Record) 524 record := tangled.ActorProfile{} 525 err = json.Unmarshal(raw, &record) 526 if err != nil { 527 l.Error("invalid record", "err", err) 528 return err 529 } 530 531 avatar := "" 532 if record.Avatar != nil { 533 avatar = record.Avatar.Ref.String() 534 } 535 536 description := "" 537 if record.Description != nil { 538 description = *record.Description 539 } 540 541 includeBluesky := record.Bluesky 542 543 pronouns := "" 544 if record.Pronouns != nil { 545 pronouns = *record.Pronouns 546 } 547 548 location := "" 549 if record.Location != nil { 550 location = *record.Location 551 } 552 553 var links [5]string 554 for i, l := range record.Links { 555 if i < 5 { 556 links[i] = l 557 } 558 } 559 560 var stats [2]models.VanityStat 561 for i, s := range record.Stats { 562 if i < 2 { 563 stats[i].Kind = models.ParseVanityStatKind(s) 564 } 565 } 566 567 var pinned [6]string 568 for i, r := range record.PinnedRepositories { 569 if i < 6 { 570 
pinned[i] = r 571 } 572 } 573 574 var preferredHandle syntax.Handle 575 if record.PreferredHandle != nil { 576 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 577 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 578 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 579 preferredHandle = h 580 } 581 } 582 } 583 584 profile := models.Profile{ 585 Did: did, 586 Avatar: avatar, 587 Description: description, 588 IncludeBluesky: includeBluesky, 589 Location: location, 590 Links: links, 591 Stats: stats, 592 PinnedRepos: pinned, 593 Pronouns: pronouns, 594 PreferredHandle: preferredHandle, 595 } 596 597 tx, err := i.Db.Begin() 598 if err != nil { 599 return fmt.Errorf("failed to start transaction: %w", err) 600 } 601 602 err = db.ValidateProfile(tx, &profile) 603 if err != nil { 604 return fmt.Errorf("invalid profile record") 605 } 606 607 err = db.UpsertProfile(tx, &profile) 608 if err == nil && i.Cache != nil { 609 pipe := i.Cache.Pipeline() 610 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 611 if preferredHandle != "" { 612 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 613 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 614 } else { 615 pipe.Del(ctx, didKey) 616 } 617 if _, execErr := pipe.Exec(ctx); execErr != nil { 618 l.Warn("failed to update preferred handle cache", "err", execErr) 619 } 620 } 621 case jmodels.CommitOperationDelete: 622 tx, beginErr := i.Db.Begin() 623 if beginErr != nil { 624 return fmt.Errorf("failed to start transaction: %w", beginErr) 625 } 626 627 priorHandle, phErr := db.GetPreferredHandle(tx, did) 628 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 629 l.Warn("failed to read prior preferred handle", "err", phErr) 630 } 631 632 err = db.DeleteProfile(tx, did) 633 if err == nil && i.Cache != nil { 634 pipe := i.Cache.Pipeline() 635 pipe.Del(ctx, 
fmt.Sprintf(cache.PreferredHandleByDid, did)) 636 if priorHandle != "" { 637 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 638 } 639 if _, execErr := pipe.Exec(ctx); execErr != nil { 640 l.Warn("failed to evict preferred handle cache", "err", execErr) 641 } 642 } 643 } 644 645 if err != nil { 646 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 647 } 648 649 l.Info("ingested record") 650 return nil 651} 652 653func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 654 did := e.Did 655 var err error 656 657 l = l.With("handler", "ingestSpindleMember") 658 659 switch e.Commit.Operation { 660 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 661 raw := json.RawMessage(e.Commit.Record) 662 record := tangled.SpindleMember{} 663 err = json.Unmarshal(raw, &record) 664 if err != nil { 665 l.Error("invalid record", "err", err) 666 return err 667 } 668 669 // only spindle owner can invite to spindles 670 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 671 if err != nil { 672 return fmt.Errorf("failed to check invite permission: %w", err) 673 } 674 if !ok { 675 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 676 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 677 } 678 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 679 if err != nil { 680 return fmt.Errorf("failed to re-check invite permission: %w", err) 681 } 682 if !ok { 683 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 684 } 685 } 686 687 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 688 if err != nil { 689 return err 690 } 691 692 if memberId.Handle.IsInvalidHandle() { 693 return fmt.Errorf("invalid handle for member %s", record.Subject) 694 } 695 696 existing, err := db.GetSpindleMembers(i.Db, 697 orm.FilterEq("did", did), 698 orm.FilterEq("rkey", 
e.Commit.RKey), 699 ) 700 if err != nil { 701 return fmt.Errorf("failed to look up existing member: %w", err) 702 } 703 if len(existing) > 1 { 704 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 705 } 706 707 tx, err := i.Db.Begin() 708 if err != nil { 709 return fmt.Errorf("failed to start txn: %w", err) 710 } 711 committed := false 712 defer func() { 713 if committed { 714 return 715 } 716 tx.Rollback() 717 i.Enforcer.E.LoadPolicy() 718 }() 719 720 if len(existing) == 1 { 721 prev := existing[0] 722 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 723 if err = db.RemoveSpindleMember(tx, 724 orm.FilterEq("did", did), 725 orm.FilterEq("rkey", e.Commit.RKey), 726 ); err != nil { 727 return fmt.Errorf("failed to remove stale row: %w", err) 728 } 729 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 730 return fmt.Errorf("failed to remove stale ACL: %w", err) 731 } 732 } 733 } 734 735 if err = db.AddSpindleMember(tx, models.SpindleMember{ 736 Did: syntax.DID(did), 737 Rkey: e.Commit.RKey, 738 Instance: record.Instance, 739 Subject: memberId.DID, 740 }); err != nil { 741 return fmt.Errorf("failed to add to db: %w", err) 742 } 743 744 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 745 return fmt.Errorf("failed to update ACLs: %w", err) 746 } 747 748 if err = tx.Commit(); err != nil { 749 return fmt.Errorf("failed to commit txn: %w", err) 750 } 751 752 if err = i.Enforcer.E.SavePolicy(); err != nil { 753 return fmt.Errorf("failed to save ACLs: %w", err) 754 } 755 committed = true 756 757 l.Info("upserted spindle member") 758 case jmodels.CommitOperationDelete: 759 rkey := e.Commit.RKey 760 761 // get record from db first 762 members, err := db.GetSpindleMembers( 763 i.Db, 764 orm.FilterEq("did", did), 765 orm.FilterEq("rkey", rkey), 766 ) 767 if err != nil || len(members) != 1 { 768 return fmt.Errorf("failed to get member: %w, len(members) 
= %d", err, len(members)) 769 } 770 member := members[0] 771 772 tx, err := i.Db.Begin() 773 if err != nil { 774 return fmt.Errorf("failed to start txn: %w", err) 775 } 776 committed := false 777 defer func() { 778 if committed { 779 return 780 } 781 tx.Rollback() 782 i.Enforcer.E.LoadPolicy() 783 }() 784 785 // remove record by rkey && update enforcer 786 if err = db.RemoveSpindleMember( 787 tx, 788 orm.FilterEq("did", did), 789 orm.FilterEq("rkey", rkey), 790 ); err != nil { 791 return fmt.Errorf("failed to remove from db: %w", err) 792 } 793 794 // update enforcer 795 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 796 if err != nil { 797 return fmt.Errorf("failed to update ACLs: %w", err) 798 } 799 800 if err = tx.Commit(); err != nil { 801 return fmt.Errorf("failed to commit txn: %w", err) 802 } 803 804 if err = i.Enforcer.E.SavePolicy(); err != nil { 805 return fmt.Errorf("failed to save ACLs: %w", err) 806 } 807 committed = true 808 809 l.Info("removed spindle member") 810 } 811 812 return nil 813} 814 815func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 816 did := e.Did 817 var err error 818 819 l = l.With("handler", "ingestSpindle") 820 821 switch e.Commit.Operation { 822 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 823 raw := json.RawMessage(e.Commit.Record) 824 record := tangled.Spindle{} 825 err = json.Unmarshal(raw, &record) 826 if err != nil { 827 l.Error("invalid record", "err", err) 828 return err 829 } 830 831 instance := e.Commit.RKey 832 833 err := db.AddSpindle(i.Db, models.Spindle{ 834 Owner: syntax.DID(did), 835 Instance: instance, 836 }) 837 if err != nil { 838 l.Error("failed to add spindle to db", "err", err, "instance", instance) 839 return err 840 } 841 842 if err := i.verifySpindle(ctx, instance, did); err != nil { 843 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 844 } 845 846 l.Info("ingested record", 
"instance", instance) 847 return nil 848 849 case jmodels.CommitOperationDelete: 850 instance := e.Commit.RKey 851 852 // get record from db first 853 spindles, err := db.GetSpindles( 854 ctx, 855 i.Db, 856 orm.FilterEq("owner", did), 857 orm.FilterEq("instance", instance), 858 ) 859 if err != nil || len(spindles) != 1 { 860 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 861 } 862 spindle := spindles[0] 863 864 tx, err := i.Db.Begin() 865 if err != nil { 866 return fmt.Errorf("failed to start txn: %w", err) 867 } 868 defer func() { 869 tx.Rollback() 870 i.Enforcer.E.LoadPolicy() 871 }() 872 873 // remove spindle members first 874 err = db.RemoveSpindleMember( 875 tx, 876 orm.FilterEq("owner", did), 877 orm.FilterEq("instance", instance), 878 ) 879 if err != nil { 880 return fmt.Errorf("failed to remove spindle members: %w", err) 881 } 882 883 err = db.DeleteSpindle( 884 tx, 885 orm.FilterEq("owner", did), 886 orm.FilterEq("instance", instance), 887 ) 888 if err != nil { 889 return fmt.Errorf("failed to delete spindle: %w", err) 890 } 891 892 if spindle.Verified != nil { 893 err = i.Enforcer.RemoveSpindle(instance) 894 if err != nil { 895 return fmt.Errorf("failed to remove spindle from enforcer: %w", err) 896 } 897 } 898 899 err = tx.Commit() 900 if err != nil { 901 return fmt.Errorf("failed to commit txn: %w", err) 902 } 903 904 err = i.Enforcer.E.SavePolicy() 905 if err != nil { 906 return fmt.Errorf("failed to save ACLs: %w", err) 907 } 908 909 l.Info("ingested record", "instance", instance) 910 } 911 912 return nil 913} 914 915func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error { 916 did := e.Did 917 rkey := e.Commit.RKey 918 919 var err error 920 921 l = l.With("handler", "ingestString") 922 923 switch e.Commit.Operation { 924 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 925 raw := json.RawMessage(e.Commit.Record) 926 record := tangled.String{} 927 err = json.Unmarshal(raw, 
&record) 928 if err != nil { 929 l.Error("invalid record", "err", err) 930 return err 931 } 932 933 string := models.StringFromRecord(did, rkey, record) 934 935 if err = i.Validator.ValidateString(&string); err != nil { 936 l.Error("invalid record", "err", err) 937 return err 938 } 939 940 if err = db.AddString(i.Db, string); err != nil { 941 l.Error("failed to add string", "err", err) 942 return err 943 } 944 945 l.Info("ingested record") 946 return nil 947 948 case jmodels.CommitOperationDelete: 949 if err := db.DeleteString( 950 i.Db, 951 orm.FilterEq("did", did), 952 orm.FilterEq("rkey", rkey), 953 ); err != nil { 954 l.Error("failed to delete", "err", err) 955 return fmt.Errorf("failed to delete string record: %w", err) 956 } 957 958 l.Info("ingested record") 959 return nil 960 } 961 962 return nil 963} 964 965func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 966 did := e.Did 967 var err error 968 969 l = l.With("handler", "ingestKnotMember") 970 971 switch e.Commit.Operation { 972 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 973 raw := json.RawMessage(e.Commit.Record) 974 record := tangled.KnotMember{} 975 err = json.Unmarshal(raw, &record) 976 if err != nil { 977 l.Error("invalid record", "err", err) 978 return err 979 } 980 981 // only knot owner can invite to knots 982 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 983 if err != nil { 984 return fmt.Errorf("failed to check invite permission: %w", err) 985 } 986 if !ok { 987 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 988 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 989 } 990 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 991 if err != nil { 992 return fmt.Errorf("failed to re-check invite permission: %w", err) 993 } 994 if !ok { 995 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 996 } 997 } 998 999 memberId, err := 
i.IdResolver.ResolveIdent(ctx, record.Subject) 1000 if err != nil { 1001 return err 1002 } 1003 1004 if memberId.Handle.IsInvalidHandle() { 1005 return fmt.Errorf("invalid handle for member %s", record.Subject) 1006 } 1007 1008 existing, err := db.GetKnotMembers(i.Db, 1009 orm.FilterEq("did", did), 1010 orm.FilterEq("rkey", e.Commit.RKey), 1011 ) 1012 if err != nil { 1013 return fmt.Errorf("failed to look up existing member: %w", err) 1014 } 1015 if len(existing) > 1 { 1016 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1017 } 1018 1019 tx, err := i.Db.Begin() 1020 if err != nil { 1021 return fmt.Errorf("failed to start txn: %w", err) 1022 } 1023 committed := false 1024 defer func() { 1025 if committed { 1026 return 1027 } 1028 tx.Rollback() 1029 i.Enforcer.E.LoadPolicy() 1030 }() 1031 1032 if len(existing) == 1 { 1033 prev := existing[0] 1034 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1035 if err = db.RemoveKnotMember(tx, 1036 orm.FilterEq("did", did), 1037 orm.FilterEq("rkey", e.Commit.RKey), 1038 ); err != nil { 1039 return fmt.Errorf("failed to remove stale row: %w", err) 1040 } 1041 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1042 return fmt.Errorf("failed to remove stale ACL: %w", err) 1043 } 1044 } 1045 } 1046 1047 if err = db.AddKnotMember(tx, models.KnotMember{ 1048 Did: syntax.DID(did), 1049 Rkey: e.Commit.RKey, 1050 Domain: record.Domain, 1051 Subject: memberId.DID, 1052 }); err != nil { 1053 return fmt.Errorf("failed to add to db: %w", err) 1054 } 1055 1056 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1057 return fmt.Errorf("failed to update ACLs: %w", err) 1058 } 1059 1060 if err = tx.Commit(); err != nil { 1061 return fmt.Errorf("failed to commit txn: %w", err) 1062 } 1063 1064 if err = i.Enforcer.E.SavePolicy(); err != nil { 1065 return fmt.Errorf("failed to save ACLs: %w", err) 1066 } 1067 committed = true 1068 1069 
l.Info("upserted knot member")
	case jmodels.CommitOperationDelete:
		rkey := e.Commit.RKey

		members, err := db.GetKnotMembers(
			i.Db,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		)
		if err != nil {
			return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
		}
		if len(members) == 0 {
			l.Info("knot member already removed", "rkey", rkey)
			return nil
		}
		if len(members) > 1 {
			return fmt.Errorf("multiple knot members with rkey %s", rkey)
		}
		member := members[0]

		tx, err := i.Db.Begin()
		if err != nil {
			return fmt.Errorf("failed to start txn: %w", err)
		}
		committed := false
		defer func() {
			if committed {
				return
			}
			tx.Rollback()
			i.Enforcer.E.LoadPolicy()
		}()

		if err = db.RemoveKnotMember(
			tx,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to remove from db: %w", err)
		}

		if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
			return fmt.Errorf("failed to update ACLs: %w", err)
		}

		if err = tx.Commit(); err != nil {
			return fmt.Errorf("failed to commit txn: %w", err)
		}

		if err = i.Enforcer.E.SavePolicy(); err != nil {
			return fmt.Errorf("failed to save ACLs: %w", err)
		}
		committed = true

		l.Info("removed knot member")
	}

	return nil
}

// ingestKnot applies a knot record commit to the local database and enforcer.
// The record key is used as the knot's domain and e.Did as its owner.
//
// Create/update: the record must decode as tangled.Knot; the knot is stored
// with db.AddKnot and verification is then attempted. A verification failure
// is logged as a warning and does not fail the ingest.
//
// Delete: exactly one registration must exist for (domain, did). Within a
// single transaction the knot's members, the knot itself and the repos on that
// knot are removed; if the registration had been marked registered, the knot
// is also removed from the enforcer. The enforcer policy is saved after the
// transaction commits. On any failure the transaction is rolled back and the
// enforcer policy is reloaded to discard in-memory changes.
func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
	did := e.Did
	domain := e.Commit.RKey

	l = l.With("handler", "ingestKnot")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.Knot{}
		if err := json.Unmarshal(raw, &record); err != nil {
			l.Error("invalid record", "err", err)
			return err
		}

		if err := db.AddKnot(i.Db, domain, did); err != nil {
			l.Error("failed to add knot to db", "err", err, "domain", domain)
			return err
		}

		// Best effort: an unverified knot is still recorded.
		if err := i.verifyKnot(ctx, domain, did); err != nil {
			l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
		}

		l.Info("ingested record", "domain", domain)
		return nil

	case jmodels.CommitOperationDelete:
		// get record from db first
		registrations, err := db.GetRegistrations(
			i.Db,
			orm.FilterEq("domain", domain),
			orm.FilterEq("did", did),
		)
		if err != nil {
			return fmt.Errorf("failed to get registration: %w", err)
		}
		if len(registrations) != 1 {
			return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
		}
		registration := registrations[0]

		tx, err := i.Db.Begin()
		if err != nil {
			return fmt.Errorf("failed to start txn: %w", err)
		}
		// Only undo work when something failed; once the transaction is
		// committed and the policy saved there is nothing to roll back and no
		// reason to reload the policy.
		committed := false
		defer func() {
			if committed {
				return
			}
			tx.Rollback()
			i.Enforcer.E.LoadPolicy()
		}()

		if err := db.RemoveKnotMember(
			tx,
			orm.FilterEq("did", did),
			orm.FilterEq("domain", domain),
		); err != nil {
			return fmt.Errorf("failed to remove knot members: %w", err)
		}

		if err := db.DeleteKnot(
			tx,
			orm.FilterEq("did", did),
			orm.FilterEq("domain", domain),
		); err != nil {
			return fmt.Errorf("failed to delete knot: %w", err)
		}

		if err := db.RemoveReposByKnot(tx, domain); err != nil {
			return fmt.Errorf("failed to remove repos by knot: %w", err)
		}

		// The enforcer is only touched for knots that were marked registered.
		if registration.Registered != nil {
			if err := i.Enforcer.RemoveKnot(domain); err != nil {
				return fmt.Errorf("failed to remove knot from enforcer: %w", err)
			}
		}

		if err := tx.Commit(); err != nil {
			return fmt.Errorf("failed to commit txn: %w", err)
		}

		if err := i.Enforcer.E.SavePolicy(); err != nil {
			return fmt.Errorf("failed to save ACLs: %w", err)
		}
		committed = true

		l.Info("ingested record", "domain", domain)
	}

	return nil
}

// Retry policy used by verifyKnot and verifySpindle.
const (
	verifyAttempts = 4               // passed to retry.Attempts
	verifyMinDelay = 1 * time.Second // passed to retry.Delay
	verifyMaxDelay = 5 * time.Second // passed to retry.MaxDelay
)

// verifyKnot runs server verification for the knot registered as domain by did.
//
// It returns an error unless exactly one registration exists for the pair, and
// returns nil without doing anything when that registration is already marked
// registered. Otherwise serververify.RunVerification is retried with back-off
// and, once it succeeds, the knot is marked verified through
// serververify.MarkKnotVerified.
func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
	regs, err := db.GetRegistrations(i.Db,
		orm.FilterEq("domain", domain),
		orm.FilterEq("did", did),
	)
	if err != nil {
		return fmt.Errorf("look up registration: %w", err)
	}
	if len(regs) != 1 {
		return fmt.Errorf("no registration for %s by %s", domain, did)
	}
	if regs[0].Registered != nil {
		return nil
	}

	err = retry.Do(
		func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
		retry.Context(ctx),
		retry.Attempts(verifyAttempts),
		retry.Delay(verifyMinDelay),
		retry.MaxDelay(verifyMaxDelay),
		retry.DelayType(retry.BackOffDelay),
		retry.LastErrorOnly(true),
	)
	if err != nil {
		return fmt.Errorf("verify: %w", err)
	}
	return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
}

// verifySpindle runs server verification for the spindle instance owned by
// did. It requires exactly one matching spindle and does nothing when that
// spindle is already marked verified.
func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
	spindles, err := db.GetSpindles(ctx, i.Db,
		orm.FilterEq("instance", instance),
		orm.FilterEq("owner", did),
	)
	if err != nil {
		return fmt.Errorf("look up spindle: %w", err)
	}
	if len(spindles) != 1 {
		return fmt.Errorf("no spindle for %s by %s", instance, did)
	}
	if spindles[0].Verified != nil {
		return nil
	}

	err = retry.Do(
		func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
		retry.Context(ctx),
		retry.Attempts(verifyAttempts),
		retry.Delay(verifyMinDelay),
retry.MaxDelay(verifyMaxDelay), 1290 retry.DelayType(retry.BackOffDelay), 1291 retry.LastErrorOnly(true), 1292 ) 1293 if err != nil { 1294 return fmt.Errorf("verify: %w", err) 1295 } 1296 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1297 return err 1298} 1299 1300const sweepConcurrency = 4 1301 1302func (i *Ingester) SweepPendingVerifications() { 1303 l := i.Logger.With("handler", "SweepPendingVerifications") 1304 1305 var g errgroup.Group 1306 g.SetLimit(sweepConcurrency) 1307 1308 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1309 if err != nil { 1310 l.Error("failed to list unverified knots", "err", err) 1311 } else { 1312 for _, reg := range regs { 1313 g.Go(func() error { 1314 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1315 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1316 } 1317 return nil 1318 }) 1319 } 1320 } 1321 1322 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1323 if err != nil { 1324 l.Error("failed to list unverified spindles", "err", err) 1325 g.Wait() 1326 return 1327 } 1328 for _, s := range spindles { 1329 g.Go(func() error { 1330 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1331 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1332 } 1333 return nil 1334 }) 1335 } 1336 g.Wait() 1337} 1338 1339func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1340 did := e.Did 1341 rkey := e.Commit.RKey 1342 1343 var err error 1344 1345 l = l.With("handler", "ingestIssue") 1346 1347 switch e.Commit.Operation { 1348 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1349 raw := json.RawMessage(e.Commit.Record) 1350 record := tangled.RepoIssue{} 1351 err = json.Unmarshal(raw, &record) 1352 if err != nil { 1353 l.Error("invalid record", "err", err) 1354 return err 1355 } 1356 1357 issue := 
models.IssueFromRecord(did, rkey, record)

		if issue.RepoDid == "" {
			return fmt.Errorf("issue record has no repo field")
		}
		if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
			return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
		}

		if err := i.Validator.ValidateIssue(&issue); err != nil {
			return fmt.Errorf("failed to validate issue: %w", err)
		}

		// The record's repo field is not a DID but the repo it names has one:
		// queue a rewrite of the PDS record. Failure to enqueue is not fatal.
		if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
			repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
			if repoErr == nil && repo.RepoDid != "" {
				if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
					l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
				}
			}
		}

		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		err = db.PutIssue(tx, &issue)
		if err != nil {
			l.Error("failed to create issue", "err", err)
			return err
		}

		err = tx.Commit()
		if err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		l.Info("ingested record")
		return nil

	case jmodels.CommitOperationDelete:
		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		if err := db.DeleteIssues(
			tx,
			did,
			rkey,
		); err != nil {
			l.Error("failed to delete", "err", err)
			return fmt.Errorf("failed to delete issue record: %w", err)
		}
		if err := tx.Commit(); err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		l.Info("ingested record")
		return nil
	}

	return nil
}

// fetchPullPatchBlob requests the blob blobCid of repo did from the
// com.atproto.sync.getBlob endpoint under pdsEndpoint and returns the open
// response body. The caller must close the body. ctx governs the request and
// the reading of the body, so it must stay alive until the body has been
// consumed. A response status other than 200 is reported as an error.
func fetchPullPatchBlob(ctx context.Context, pdsEndpoint, did, blobCid string) (io.ReadCloser, error) {
	endpoint, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", pdsEndpoint))
	if err != nil {
		return nil, fmt.Errorf("invalid PDS endpoint %q: %w", pdsEndpoint, err)
	}
	q := endpoint.Query()
	q.Set("cid", blobCid)
	q.Set("did", did)
	endpoint.RawQuery = q.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make request: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		// Do not hand an error payload to the patch parser.
		resp.Body.Close()
		return nil, fmt.Errorf("unexpected status %s fetching blob %s", resp.Status, blobCid)
	}
	return resp.Body, nil
}

// ingestPull applies a pull record commit.
//
// Create/update: the record must decode as tangled.RepoPull. The owner's
// identity is resolved to find its PDS, the patch blob of every round is
// requested concurrently, and the resulting readers are handed to
// models.PullFromRecord. The pull is validated and stored in one transaction.
// A round without a patch blob, or any failed blob request, fails the ingest.
//
// Delete: pulls matching the committing DID and record key are abandoned via
// db.AbandonPulls.
func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
	did := e.Did
	rkey := e.Commit.RKey

	l = l.With("handler", "ingestPull")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.RepoPull{}
		if err := json.Unmarshal(raw, &record); err != nil {
			l.Error("invalid record", "err", err)
			return err
		}

		ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
		if err != nil {
			l.Error("failed to resolve did", "err", err)
			return err
		}
		ownerPds := ownerId.PDSEndpoint()

		// go through and fetch all blobs in parallel
		readers := make([]*io.ReadCloser, len(record.Rounds))
		var mu sync.Mutex

		// Close whatever was opened on every exit path, success or failure.
		defer func() {
			for _, r := range readers {
				if r != nil && *r != nil {
					(*r).Close()
				}
			}
		}()

		// The requests use ctx rather than a context derived by
		// errgroup.WithContext: that context is cancelled as soon as Wait
		// returns, which would abort the response bodies before
		// PullFromRecord gets to read them.
		var g errgroup.Group

		for idx, b := range record.Rounds {
			g.Go(func() error {
				// for some reason, a blob is empty
				if b.PatchBlob == nil {
					return fmt.Errorf("missing patchBlob in round %d", idx)
				}

				body, err := fetchPullPatchBlob(ctx, ownerPds, did, b.PatchBlob.Ref.String())
				if err != nil {
					l.Error("failed to fetch patch blob", "round", idx, "err", err)
					return err
				}

				mu.Lock()
				readers[idx] = &body
				mu.Unlock()

				return nil
			})
		}

		if err := g.Wait(); err != nil {
			return err
		}

		pull, err := models.PullFromRecord(did, rkey, record, readers)
		if err != nil {
			return fmt.Errorf("failed to parse pull from record: %w", err)
		}
		if err := i.Validator.ValidatePull(pull); err != nil {
			return fmt.Errorf("failed to validate pull: %w", err)
		}

		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		err = db.PutPull(tx, pull)
		if err != nil {
			l.Error("failed to create pull", "err", err)
			return err
		}

		err = tx.Commit()
		if err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		l.Info("ingested record")
		return nil

	case jmodels.CommitOperationDelete:
		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		if err := db.AbandonPulls(
			tx,
			orm.FilterEq("owner_did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			l.Error("failed to abandon", "err", err)
			return fmt.Errorf("failed to abandon pull record: %w", err)
		}
		if err := tx.Commit(); err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		l.Info("ingested record")
		return nil
	}

	return nil
}

// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions.
// Creates and updates are ignored; a delete purges the comment identified by
// the committing DID, collection and record key.
func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error {
	l = l.With("handler", "ingestIssueComment")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		// no-op. sh.tangled.repo.issue.comment is deprecated

	case jmodels.CommitOperationDelete:
		if err := db.PurgeComments(
			i.Db,
			orm.FilterEq("did", e.Did),
			orm.FilterEq("collection", e.Commit.Collection),
			orm.FilterEq("rkey", e.Commit.RKey),
		); err != nil {
			return fmt.Errorf("failed to delete comment record: %w", err)
		}
	}

	l.Info("ingested record")
	return nil
}

// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions.
// Creates and updates are ignored; a delete purges the comment identified by
// the committing DID, collection and record key.
func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error {
	l = l.With("handler", "ingestPullComment")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		// no-op. sh.tangled.repo.pull.comment is deprecated

	case jmodels.CommitOperationDelete:
		if err := db.PurgeComments(
			i.Db,
			orm.FilterEq("did", e.Did),
			orm.FilterEq("collection", e.Commit.Collection),
			orm.FilterEq("rkey", e.Commit.RKey),
		); err != nil {
			return fmt.Errorf("failed to delete comment record: %w", err)
		}
	}

	l.Info("ingested record")
	return nil
}

// ingestComment applies a comment record commit: create/update decodes the
// record as tangled.FeedComment, validates it and stores it together with the
// references resolved from its body; delete removes the matching comment.
func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error {
	did := e.Did
	rkey := e.Commit.RKey
	cid := e.Commit.CID

	var err error

	l = l.With("handler", "ingestComment")

	ctx := context.Background()

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.FeedComment{}
		err = json.Unmarshal(raw, &record)
		if err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
		if err != nil {
			return fmt.Errorf("failed to parse comment from record: %w", err)
		}

		if err :=
comment.Validate(); err != nil {
			return fmt.Errorf("failed to validate comment: %w", err)
		}

		var references []syntax.ATURI
		if comment.Body.Original != nil {
			_, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
		}

		tx, err := i.Db.Begin()
		if err != nil {
			return fmt.Errorf("failed to start transaction: %w", err)
		}
		defer tx.Rollback()

		_, err = db.PutComment(tx, comment, references)
		if err != nil {
			return fmt.Errorf("failed to create comment: %w", err)
		}

		if err := tx.Commit(); err != nil {
			return err
		}

	case jmodels.CommitOperationDelete:
		if err := db.DeleteComments(
			i.Db,
			orm.FilterEq("did", did),
			orm.FilterEq("collection", e.Commit.Collection),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to delete comment record: %w", err)
		}
	}

	l.Info("ingested record")
	return nil
}

// ingestReaction applies a reaction record commit.
//
// Create/update: the record must decode as tangled.FeedReaction with a
// parseable AT-URI subject and a known reaction kind. Any reaction already
// stored under the same DID and record key is replaced in one transaction. An
// unparseable createdAt falls back to the current time.
//
// Delete: the reaction stored under the DID and record key is removed.
func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error {
	did := e.Did
	rkey := e.Commit.RKey

	l = l.With("handler", "ingestReaction")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.FeedReaction{}
		if err := json.Unmarshal(raw, &record); err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		subjectUri, err := syntax.ParseATURI(record.Subject)
		if err != nil {
			return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err)
		}
		subjectUri = models.NormalizeReactionSubject(subjectUri)

		kind, ok := models.ParseReactionKind(record.Reaction)
		if !ok {
			return fmt.Errorf("invalid reaction kind: %q", record.Reaction)
		}

		created, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
		if parseErr != nil {
			created = time.Now()
		}

		tx, err := i.Db.Begin()
		if err != nil {
			return fmt.Errorf("failed to start transaction: %w", err)
		}
		defer tx.Rollback()

		// Delete-then-add makes an update replace the stored reaction.
		if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil {
			return fmt.Errorf("failed to clear existing reaction: %w", err)
		}
		if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil {
			return fmt.Errorf("failed to add reaction: %w", err)
		}

		if err := tx.Commit(); err != nil {
			return err
		}

	case jmodels.CommitOperationDelete:
		if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil {
			return fmt.Errorf("failed to delete reaction record: %w", err)
		}
	}

	l.Info("ingested record")
	return nil
}

// ingestLabelDefinition applies a label definition record commit:
// create/update decodes the record as tangled.LabelDefinition, validates it
// and stores it; delete removes the definition matching the DID and record
// key.
func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error {
	did := e.Did
	rkey := e.Commit.RKey

	var err error

	l = l.With("handler", "ingestLabelDefinition")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.LabelDefinition{}
		err = json.Unmarshal(raw, &record)
		if err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		def, err := models.LabelDefinitionFromRecord(did, rkey, record)
		if err != nil {
			return fmt.Errorf("failed to parse labeldef from record: %w", err)
		}

		if err := i.Validator.ValidateLabelDefinition(def); err != nil {
			return fmt.Errorf("failed to validate labeldef: %w", err)
		}

		_, err = db.AddLabelDefinition(i.Db, def)
		if err != nil {
			return fmt.Errorf("failed to create labeldef: %w", err)
		}

		l.Info("ingested record")
		return nil

	case jmodels.CommitOperationDelete:
		if err := db.DeleteLabelDefinition(
			i.Db,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to delete labeldef record: %w", err)
		}

		l.Info("ingested record")
		return nil
	}

	return nil
}

// ingestLabelOp applies a label operation record commit. Only creates are
// handled; every other operation returns nil without doing anything.
//
// The record must decode as tangled.LabelOp and its subject must be an issue
// or pull known to the database (exactly one match by at_uri). Each operation
// derived from the record must refer to a label definition among the subject
// repo's labels and must pass validation; a validation error that wraps
// knotacl.ErrKnotUnreachable is tolerated and logged as a warning. All
// operations are then stored in a single transaction.
func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
	did := e.Did
	rkey := e.Commit.RKey

	l = l.With("handler", "ingestLabelOp")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.LabelOp{}
		if err := json.Unmarshal(raw, &record); err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		subject := syntax.ATURI(record.Subject)
		collection := subject.Collection()

		// Lookup failures and unexpected match counts are reported
		// separately so a nil error is never formatted with %w.
		var repo *models.Repo
		switch collection {
		case tangled.RepoIssueNSID:
			issues, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
			if err != nil {
				return fmt.Errorf("failed to find subject: %w", err)
			}
			if len(issues) != 1 {
				return fmt.Errorf("failed to find subject: subject count %d", len(issues))
			}
			repo = issues[0].Repo
		case tangled.RepoPullNSID:
			pulls, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
			if err != nil {
				return fmt.Errorf("failed to find subject: %w", err)
			}
			if len(pulls) != 1 {
				return fmt.Errorf("failed to find subject: subject count %d", len(pulls))
			}
			repo = pulls[0].Repo
		default:
			return fmt.Errorf("unsupported label subject: %s", collection)
		}
		if repo == nil {
			// repo.Labels is dereferenced below; fail cleanly instead of panicking.
			return fmt.Errorf("label subject %s has no associated repo", subject)
		}

		actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
		if err != nil {
			return fmt.Errorf("failed to build label application ctx: %w", err)
		}

		ops := models.LabelOpsFromRecord(did, rkey, record)

		for _, o := range ops {
			def, ok := actx.Defs[o.OperandKey]
			if !ok {
				return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
			}
			if err := i.Validator.ValidateLabelOp(ctx, def, repo, &o); err != nil {
				if !errors.Is(err, knotacl.ErrKnotUnreachable) {
					return fmt.Errorf("failed to validate labelop: %w", err)
				}
				// The knot could not be reached: accept the op without the
				// permission check rather than dropping it.
				l.Warn("ingesting labelop without permission check", "did", o.Did, "err", err)
			}
		}

		tx, err := i.Db.Begin()
		if err != nil {
			return err
		}
		defer tx.Rollback()

		for _, o := range ops {
			if _, err := db.AddLabelOp(tx, &o); err != nil {
				return fmt.Errorf("failed to add labelop: %w", err)
			}
		}

		if err := tx.Commit(); err != nil {
			return err
		}

		l.Info("ingested record")
	}

	return nil
}