Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/knotacl" 31 "tangled.org/core/appview/mentions" 32 "tangled.org/core/appview/models" 33 "tangled.org/core/appview/notify" 34 "tangled.org/core/appview/repoverify" 35 "tangled.org/core/appview/serververify" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/orm" 38 "tangled.org/core/rbac" 39) 40 41type Ingester struct { 42 Ctx context.Context 43 Db *db.DB 44 Enforcer *rbac.Enforcer 45 Acl *knotacl.Service 46 IdResolver *idresolver.Resolver 47 Cache *cache.Cache 48 Config *config.Config 49 Logger *slog.Logger 50 MentionsResolver *mentions.Resolver 51 Notifier notify.Notifier 52 Verifier repoverify.Verifier 53} 54 55type processFunc func(ctx context.Context, e *jmodels.Event) error 56 57func (i *Ingester) Ingest() processFunc { 58 return func(ctx context.Context, e *jmodels.Event) error { 59 var err error 60 61 l := i.Logger.With("kind", e.Kind) 62 switch e.Kind { 63 case jmodels.EventKindAccount: 64 // TODO: sync account state to db 65 if e.Account.Active { 66 break 67 } 68 // TODO: revoke sessions by DID 69 if *e.Account.Status == "deactivated" { 70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 71 } 72 case jmodels.EventKindIdentity: 73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 74 case jmodels.EventKindCommit: 75 l = l.With( 76 "nsid", e.Commit.Collection, 77 "did", e.Did, 78 "rkey", e.Commit.RKey, 79 "op", e.Commit.Operation, 80 ) 81 switch e.Commit.Collection { 82 case tangled.GraphFollowNSID: 83 err = i.ingestFollow(e, l) 84 case tangled.GraphVouchNSID: 85 err = i.ingestVouch(ctx, e, l) 86 case tangled.FeedStarNSID: 87 err = i.ingestStar(ctx, e, l) 88 case tangled.FeedReactionNSID: 89 err = i.ingestReaction(e, l) 90 case tangled.PublicKeyNSID: 91 err = i.ingestPublicKey(e, l) 92 case tangled.RepoArtifactNSID: 93 err = i.ingestArtifact(ctx, e, l) 94 case tangled.ActorProfileNSID: 95 err = i.ingestProfile(ctx, e, l) 96 case tangled.SpindleMemberNSID: 97 err = i.ingestSpindleMember(ctx, e, l) 98 case tangled.SpindleNSID: 99 err = i.ingestSpindle(ctx, e, l) 100 case tangled.KnotMemberNSID: 101 err = i.ingestKnotMember(ctx, e, l) 102 case tangled.KnotNSID: 103 err = i.ingestKnot(ctx, e, l) 104 case tangled.StringNSID: 105 err = i.ingestString(e, l) 106 case tangled.RepoIssueNSID: 107 err = i.ingestIssue(ctx, e, l) 108 case tangled.RepoPullNSID: 109 err = i.ingestPull(ctx, e, l) 110 case tangled.FeedCommentNSID: 111 err = i.ingestComment(e, l) 112 case tangled.RepoIssueCommentNSID: 113 err = i.ingestIssueComment(e, l) 114 case tangled.RepoPullCommentNSID: 115 err = i.ingestPullComment(e, l) 116 case tangled.LabelDefinitionNSID: 117 err = i.ingestLabelDefinition(e, l) 118 case tangled.LabelOpNSID: 119 err = i.ingestLabelOp(ctx, e, l) 120 case tangled.RepoNSID: 121 err = i.ingestRepo(ctx, e, l) 122 } 123 } 124 125 if err != nil { 126 l.Warn("failed to ingest record, skipping", "err", err) 127 } 128 129 lastTimeUs := e.TimeUS + 1 130 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 131 l.Error("failed to save cursor", "err", saveErr) 132 } 133 134 return nil 135 } 136} 137 138func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 139 if strings.HasPrefix(ref, "did:") { 140 return db.GetRepoByDid(i.Db, ref) 141 } 142 return db.GetRepoByAtUri(i.Db, ref) 143} 144 145func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 146 var legacy struct { 147 Subject *string `json:"subject"` 148 SubjectDid *string `json:"subjectDid"` 149 } 150 if err := json.Unmarshal(raw, &legacy); err != nil { 151 return false, err 152 } 153 154 switch { 155 case legacy.SubjectDid != nil: 156 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 157 if err != nil { 158 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 159 return false, nil 160 } 161 star.SubjectType = models.StarSubjectRepo 162 star.Subject = repo.RepoDid 163 return true, nil 164 165 case legacy.Subject != nil: 166 uri, err := syntax.ParseATURI(*legacy.Subject) 167 if err != nil { 168 return false, fmt.Errorf("invalid old-format star subject: %w", err) 169 } 170 switch uri.Collection().String() { 171 case tangled.RepoNSID: 172 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 173 if err != nil { 174 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 175 return false, nil 176 } 177 star.SubjectType = models.StarSubjectRepo 178 star.Subject = repo.RepoDid 179 return true, nil 180 default: 181 star.SubjectType = models.StarSubjectString 182 star.Subject = *legacy.Subject 183 return true, nil 184 } 185 186 default: 187 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 188 } 189} 190 191func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 192 var err error 193 did := e.Did 194 195 l = l.With("handler", "ingestStar") 196 197 switch e.Commit.Operation { 198 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 199 raw := json.RawMessage(e.Commit.Record) 200 record := tangled.FeedStar{} 201 unmarshalErr := json.Unmarshal(raw, &record) 202 203 star := models.Star{ 204 Did: did, 205 Rkey: e.Commit.RKey, 206 } 207 208 switch { 209 case unmarshalErr != nil: 210 resolved, resolveErr := i.resolveOldFormatStar(raw, &star, l) 211 if resolveErr != nil { 212 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 213 return unmarshalErr 214 } 215 if !resolved { 216 return nil 217 } 218 219 case record.Subject == nil: 220 return fmt.Errorf("star record has nil subject") 221 222 case record.Subject.FeedStar_Repo != nil: 223 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 224 if repoErr != nil { 225 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 226 return nil 227 } 228 star.SubjectType = models.StarSubjectRepo 229 star.Subject = repo.RepoDid 230 231 case record.Subject.FeedStar_String != nil: 232 star.SubjectType = models.StarSubjectString 233 star.Subject = record.Subject.FeedStar_String.Uri 234 235 default: 236 return fmt.Errorf("star record has empty subject union") 237 } 238 239 err = db.UpsertStar(i.Db, star) 240 case jmodels.CommitOperationDelete: 241 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 242 } 243 244 if err != nil { 245 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 246 } 247 l.Info("processed star", "operation", e.Commit.Operation, "rkey", e.Commit.RKey) 248 249 l.Info("ingested record") 250 return nil 251} 252 253func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error { 254 var err error 255 did := e.Did 256 257 l = l.With("handler", "ingestFollow") 258 259 switch e.Commit.Operation { 260 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 261 raw := json.RawMessage(e.Commit.Record) 262 record := tangled.GraphFollow{} 263 err = json.Unmarshal(raw, &record) 264 if err != nil { 265 l.Error("invalid record", "err", err) 266 return err 267 } 268 269 err = db.UpsertFollow(i.Db, models.Follow{ 270 UserDid: did, 271 SubjectDid: record.Subject, 272 Rkey: e.Commit.RKey, 273 }) 274 case jmodels.CommitOperationDelete: 275 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 276 } 277 278 if err != nil { 279 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 280 } 281 l.Info("processed follow", "operation", e.Commit.Operation, "rkey", e.Commit.RKey) 282 283 l.Info("ingested record") 284 return nil 285} 286 287func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 288 var err error 289 did := e.Did 290 291 l = l.With("handler", "ingestVouch") 292 293 switch e.Commit.Operation { 294 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 295 raw := json.RawMessage(e.Commit.Record) 296 record := tangled.GraphVouch{} 297 err = json.Unmarshal(raw, &record) 298 if err != nil { 299 l.Error("invalid record", "err", err) 300 return err 301 } 302 303 // rkey is the subject_did being vouched for/denounced 304 subjectDID := e.Commit.RKey 305 306 _, err = syntax.ParseDID(subjectDID) 307 if err != nil { 308 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 309 return fmt.Errorf("invalid subject_did: %w", err) 310 } 311 312 if did == subjectDID { 313 l.Warn("attempted self-vouch", "did", did) 314 return fmt.Errorf("cannot vouch for self") 315 } 316 317 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 318 if err != nil { 319 return err 320 } 321 322 if subjectId.Handle.IsInvalidHandle() { 323 return err 324 } 325 326 kind, err := models.ParseVouchKind(record.Kind) 327 if err != nil { 328 l.Error("invalid kind", "kind", kind) 329 return fmt.Errorf("invalid kind: %s", kind) 330 } 331 332 recordCid, err := cid.Parse(e.Commit.CID) 333 if err != nil { 334 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 335 return fmt.Errorf("invalid cid: %w", err) 336 } 337 338 var evidences []syntax.ATURI 339 for _, raw := range record.Evidences { 340 uri, parseErr := syntax.ParseATURI(raw) 341 if parseErr != nil { 342 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 343 continue 344 } 345 evidences = append(evidences, uri) 346 } 347 348 tx, txErr := i.Db.Begin() 349 if txErr != nil { 350 return fmt.Errorf("failed to start transaction: %w", txErr) 351 } 352 353 addErr := db.AddVouch(tx, &models.Vouch{ 354 Did: syntax.DID(did), 355 SubjectDid: subjectId.DID, 356 Cid: recordCid, 357 Kind: kind, 358 Reason: record.Reason, 359 Evidences: evidences, 360 }) 361 if addErr != nil { 362 tx.Rollback() 363 err = addErr 364 } else { 365 err = tx.Commit() 366 } 367 368 case jmodels.CommitOperationDelete: 369 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 370 } 371 372 if err != nil { 373 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 374 } 375 376 l.Info("ingested record") 377 return nil 378} 379 380func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error { 381 did := e.Did 382 var err error 383 384 l = l.With("handler", "ingestPublicKey") 385 386 switch e.Commit.Operation { 387 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 388 l.Debug("processing add of pubkey") 389 raw := json.RawMessage(e.Commit.Record) 390 record := tangled.PublicKey{} 391 err = json.Unmarshal(raw, &record) 392 if err != nil { 393 l.Error("invalid record", "err", err) 394 return err 395 } 396 pubKey, err := models.PublicKeyFromRecord(syntax.DID(did), syntax.RecordKey(e.Commit.RKey), record) 397 if err != nil { 398 l.Error("invalid record", "err", err) 399 return err 400 } 401 if err := pubKey.Validate(); err != nil { 402 l.Error("invalid record", "err", err) 403 return err 404 } 405 406 err = db.UpsertPublicKey(i.Db, pubKey) 407 case jmodels.CommitOperationDelete: 408 l.Debug("processing delete of pubkey") 409 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 410 } 411 412 if err != nil { 413 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 414 } 415 l.Info("processed pubkey", "operation", e.Commit.Operation, "rkey", e.Commit.RKey) 416 417 l.Info("ingested record") 418 return nil 419} 420 421func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 422 did := e.Did 423 var err error 424 425 l = l.With("handler", "ingestArtifact") 426 427 switch e.Commit.Operation { 428 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 429 raw := json.RawMessage(e.Commit.Record) 430 record := tangled.RepoArtifact{} 431 err = json.Unmarshal(raw, &record) 432 if err != nil { 433 l.Error("invalid record", "err", err) 434 return err 435 } 436 437 var repo *models.Repo 438 if record.RepoDid != nil && *record.RepoDid != "" { 439 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 440 if err != nil && !errors.Is(err, sql.ErrNoRows) { 441 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 442 } 443 } 444 if repo == nil && record.Repo != nil { 445 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 446 if parseErr != nil { 447 return parseErr 448 } 449 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 450 if err != nil { 451 return err 452 } 453 } 454 if repo == nil { 455 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 456 } 457 458 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push") 459 if permErr != nil { 460 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr) 461 } else if !allowed { 462 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier()) 463 return nil 464 } 465 466 repoDid := repo.RepoDid 467 if repoDid == "" && record.RepoDid != nil { 468 repoDid = *record.RepoDid 469 } 470 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 471 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 472 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 473 } 474 } 475 476 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 477 if parseErr != nil { 478 createdAt = time.Now() 479 } 480 481 artifact := models.Artifact{ 482 Did: did, 483 Rkey: e.Commit.RKey, 484 RepoDid: syntax.DID(repo.RepoDid), 485 Tag: plumbing.Hash(record.Tag), 486 CreatedAt: createdAt, 487 BlobCid: cid.Cid(record.Artifact.Ref), 488 Name: record.Name, 489 Size: uint64(record.Artifact.Size), 490 MimeType: record.Artifact.MimeType, 491 } 492 493 err = db.AddArtifact(i.Db, artifact) 494 case jmodels.CommitOperationDelete: 495 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 496 } 497 498 if err != nil { 499 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 500 } 501 502 l.Info("ingested record") 503 return nil 504} 505 506func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 507 did := e.Did 508 var err error 509 510 l = l.With("handler", "ingestProfile") 511 512 if e.Commit.RKey != "self" { 513 return fmt.Errorf("ingestProfile only ingests `self` record") 514 } 515 516 switch e.Commit.Operation { 517 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 518 raw := json.RawMessage(e.Commit.Record) 519 record := tangled.ActorProfile{} 520 err = json.Unmarshal(raw, &record) 521 if err != nil { 522 l.Error("invalid record", "err", err) 523 return err 524 } 525 526 avatar := "" 527 if record.Avatar != nil { 528 avatar = record.Avatar.Ref.String() 529 } 530 531 description := "" 532 if record.Description != nil { 533 description = *record.Description 534 } 535 536 includeBluesky := record.Bluesky 537 538 pronouns := "" 539 if record.Pronouns != nil { 540 pronouns = *record.Pronouns 541 } 542 543 location := "" 544 if record.Location != nil { 545 location = *record.Location 546 } 547 548 var links [5]string 549 for i, l := range record.Links { 550 if i < 5 { 551 links[i] = l 552 } 553 } 554 555 var stats [2]models.VanityStat 556 for i, s := range record.Stats { 557 if i < 2 { 558 stats[i].Kind = models.ParseVanityStatKind(s) 559 } 560 } 561 562 var pinned [6]string 563 for i, r := range record.PinnedRepositories { 564 if i < 6 { 565 pinned[i] = r 566 } 567 } 568 569 var preferredHandle syntax.Handle 570 if record.PreferredHandle != nil { 571 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 572 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 573 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 574 preferredHandle = h 575 } 576 } 577 } 578 579 profile := models.Profile{ 580 Did: did, 581 Avatar: avatar, 582 Description: description, 583 IncludeBluesky: includeBluesky, 584 Location: location, 585 Links: links, 586 Stats: stats, 587 PinnedRepos: pinned, 588 Pronouns: pronouns, 589 PreferredHandle: preferredHandle, 590 } 591 592 tx, err := i.Db.Begin() 593 if err != nil { 594 return fmt.Errorf("failed to start transaction: %w", err) 595 } 596 defer tx.Rollback() 597 598 err = db.ValidateProfile(tx, &profile) 599 if err != nil { 600 return fmt.Errorf("invalid profile record") 601 } 602 603 err = db.UpsertProfile(tx, &profile) 604 if err != nil { 605 return fmt.Errorf("upserting profile: %w", err) 606 } 607 608 err = tx.Commit() 609 if err != nil { 610 return fmt.Errorf("tx.Commit: %w", err) 611 } 612 if i.Cache != nil { 613 pipe := i.Cache.Pipeline() 614 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 615 if preferredHandle != "" { 616 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 617 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 618 } else { 619 pipe.Del(ctx, didKey) 620 } 621 if _, execErr := pipe.Exec(ctx); execErr != nil { 622 l.Warn("failed to update preferred handle cache", "err", execErr) 623 } 624 } 625 case jmodels.CommitOperationDelete: 626 tx, beginErr := i.Db.Begin() 627 if beginErr != nil { 628 return fmt.Errorf("failed to start transaction: %w", beginErr) 629 } 630 631 priorHandle, phErr := db.GetPreferredHandle(tx, did) 632 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 633 l.Warn("failed to read prior preferred handle", "err", phErr) 634 } 635 636 err = db.DeleteProfile(tx, did) 637 if err == nil && i.Cache != nil { 638 pipe := i.Cache.Pipeline() 639 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 640 if priorHandle != "" { 641 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 642 } 643 if _, execErr := pipe.Exec(ctx); execErr != nil { 644 l.Warn("failed to evict preferred handle cache", "err", execErr) 645 } 646 } 647 } 648 649 if err != nil { 650 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 651 } 652 653 l.Info("ingested record") 654 return nil 655} 656 657func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 658 did := e.Did 659 var err error 660 661 l = l.With("handler", "ingestSpindleMember") 662 663 switch e.Commit.Operation { 664 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 665 raw := json.RawMessage(e.Commit.Record) 666 record := tangled.SpindleMember{} 667 err = json.Unmarshal(raw, &record) 668 if err != nil { 669 l.Error("invalid record", "err", err) 670 return err 671 } 672 673 // only spindle owner can invite to spindles 674 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 675 if err != nil { 676 return fmt.Errorf("failed to check invite permission: %w", err) 677 } 678 if !ok { 679 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 680 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 681 } 682 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 683 if err != nil { 684 return fmt.Errorf("failed to re-check invite permission: %w", err) 685 } 686 if !ok { 687 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 688 } 689 } 690 691 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 692 if err != nil { 693 return err 694 } 695 696 if memberId.Handle.IsInvalidHandle() { 697 return fmt.Errorf("invalid handle for member %s", record.Subject) 698 } 699 700 existing, err := db.GetSpindleMembers(i.Db, 701 orm.FilterEq("did", did), 702 orm.FilterEq("rkey", e.Commit.RKey), 703 ) 704 if err != nil { 705 return fmt.Errorf("failed to look up existing member: %w", err) 706 } 707 if len(existing) > 1 { 708 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 709 } 710 711 tx, err := i.Db.Begin() 712 if err != nil { 713 return fmt.Errorf("failed to start txn: %w", err) 714 } 715 committed := false 716 defer func() { 717 if committed { 718 return 719 } 720 tx.Rollback() 721 i.Enforcer.E.LoadPolicy() 722 }() 723 724 if len(existing) == 1 { 725 prev := existing[0] 726 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 727 if err = db.RemoveSpindleMember(tx, 728 orm.FilterEq("did", did), 729 orm.FilterEq("rkey", e.Commit.RKey), 730 ); err != nil { 731 return fmt.Errorf("failed to remove stale row: %w", err) 732 } 733 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 734 return fmt.Errorf("failed to remove stale ACL: %w", err) 735 } 736 } 737 } 738 739 if err = db.AddSpindleMember(tx, models.SpindleMember{ 740 Did: syntax.DID(did), 741 Rkey: e.Commit.RKey, 742 Instance: record.Instance, 743 Subject: memberId.DID, 744 }); err != nil { 745 return fmt.Errorf("failed to add to db: %w", err) 746 } 747 748 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 749 return fmt.Errorf("failed to update ACLs: %w", err) 750 } 751 752 if err = tx.Commit(); err != nil { 753 return fmt.Errorf("failed to commit txn: %w", err) 754 } 755 756 if err = i.Enforcer.E.SavePolicy(); err != nil { 757 return fmt.Errorf("failed to save ACLs: %w", err) 758 } 759 committed = true 760 761 l.Info("upserted spindle member") 762 case jmodels.CommitOperationDelete: 763 rkey := e.Commit.RKey 764 765 // get record from db first 766 members, err := db.GetSpindleMembers( 767 i.Db, 768 orm.FilterEq("did", did), 769 orm.FilterEq("rkey", rkey), 770 ) 771 if err != nil || len(members) != 1 { 772 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 773 } 774 member := members[0] 775 776 tx, err := i.Db.Begin() 777 if err != nil { 778 return fmt.Errorf("failed to start txn: %w", err) 779 } 780 committed := false 781 defer func() { 782 if committed { 783 return 784 } 785 tx.Rollback() 786 i.Enforcer.E.LoadPolicy() 787 }() 788 789 // remove record by rkey && update enforcer 790 if err = db.RemoveSpindleMember( 791 tx, 792 orm.FilterEq("did", did), 793 orm.FilterEq("rkey", rkey), 794 ); err != nil { 795 return fmt.Errorf("failed to remove from db: %w", err) 796 } 797 798 // update enforcer 799 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 800 if err != nil { 801 return fmt.Errorf("failed to update ACLs: %w", err) 802 } 803 804 if err = tx.Commit(); err != nil { 805 return fmt.Errorf("failed to commit txn: %w", err) 806 } 807 808 if err = i.Enforcer.E.SavePolicy(); err != nil { 809 return fmt.Errorf("failed to save ACLs: %w", err) 810 } 811 committed = true 812 813 l.Info("removed spindle member") 814 } 815 816 return nil 817} 818 819func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 820 did := e.Did 821 var err error 822 823 l = l.With("handler", "ingestSpindle") 824 825 switch e.Commit.Operation { 826 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 827 raw := json.RawMessage(e.Commit.Record) 828 record := tangled.Spindle{} 829 err = json.Unmarshal(raw, &record) 830 if err != nil { 831 l.Error("invalid record", "err", err) 832 return err 833 } 834 835 instance := e.Commit.RKey 836 837 err := db.AddSpindle(i.Db, models.Spindle{ 838 Owner: syntax.DID(did), 839 Instance: instance, 840 }) 841 if err != nil { 842 l.Error("failed to add spindle to db", "err", err, "instance", instance) 843 return err 844 } 845 846 if err := i.verifySpindle(ctx, instance, did); err != nil { 847 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 848 } 849 850 l.Info("ingested record", "instance", instance) 851 return nil 852 853 case jmodels.CommitOperationDelete: 854 instance := e.Commit.RKey 855 856 // get record from db first 857 spindles, err := db.GetSpindles( 858 ctx, 859 i.Db, 860 orm.FilterEq("owner", did), 861 orm.FilterEq("instance", instance), 862 ) 863 if err != nil || len(spindles) != 1 { 864 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 865 } 866 spindle := spindles[0] 867 868 tx, err := i.Db.Begin() 869 if err != nil { 870 return fmt.Errorf("failed to start txn: %w", err) 871 } 872 defer func() { 873 tx.Rollback() 874 i.Enforcer.E.LoadPolicy() 875 }() 876 877 // remove spindle members first 878 err = db.RemoveSpindleMember( 879 tx, 880 orm.FilterEq("owner", did), 881 orm.FilterEq("instance", instance), 882 ) 883 if err != nil { 884 return fmt.Errorf("failed to remove spindle members: %w", err) 885 } 886 887 err = db.DeleteSpindle( 888 tx, 889 orm.FilterEq("owner", did), 890 orm.FilterEq("instance", instance), 891 ) 892 if err != nil { 893 return fmt.Errorf("failed to delete spindle: %w", err) 894 } 895 896 if spindle.Verified != nil { 897 err = i.Enforcer.RemoveSpindle(instance) 898 if err != nil { 899 return fmt.Errorf("failed to remove spindle from enforcer: %w", err) 900 } 901 } 902 903 err = tx.Commit() 904 if err != nil { 905 return fmt.Errorf("failed to commit txn: %w", err) 906 } 907 908 err = i.Enforcer.E.SavePolicy() 909 if err != nil { 910 return fmt.Errorf("failed to save ACLs: %w", err) 911 } 912 913 l.Info("ingested record", "instance", instance) 914 } 915 916 return nil 917} 918 919func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error { 920 did := e.Did 921 rkey := e.Commit.RKey 922 923 var err error 924 925 l = l.With("handler", "ingestString") 926 927 switch e.Commit.Operation { 928 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 929 raw := json.RawMessage(e.Commit.Record) 930 record := tangled.String{} 931 err = json.Unmarshal(raw, &record) 932 if err != nil { 933 l.Error("invalid record", "err", err) 934 return err 935 } 936 937 string := models.StringFromRecord(did, rkey, record) 938 939 if err = string.Validate(); err != nil { 940 l.Error("invalid record", "err", err) 941 return err 942 } 943 944 if err = db.AddString(i.Db, string); err != nil { 945 l.Error("failed to add string", "err", err) 946 return err 947 } 948 949 l.Info("ingested record") 950 return nil 951 952 case jmodels.CommitOperationDelete: 953 if err := db.DeleteString( 954 i.Db, 955 orm.FilterEq("did", did), 956 orm.FilterEq("rkey", rkey), 957 ); err != nil { 958 l.Error("failed to delete", "err", err) 959 return fmt.Errorf("failed to delete string record: %w", err) 960 } 961 962 l.Info("ingested record") 963 return nil 964 } 965 966 return nil 967} 968 969func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 970 did := e.Did 971 var err error 972 973 l = l.With("handler", "ingestKnotMember") 974 975 switch e.Commit.Operation { 976 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 977 raw := json.RawMessage(e.Commit.Record) 978 record := tangled.KnotMember{} 979 err = json.Unmarshal(raw, &record) 980 if err != nil { 981 l.Error("invalid record", "err", err) 982 return err 983 } 984 985 // only knot owner can invite to knots 986 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 987 if err != nil { 988 return fmt.Errorf("failed to check invite permission: %w", err) 989 } 990 if !ok { 991 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 992 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 993 } 994 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 995 if err != nil { 996 return fmt.Errorf("failed to re-check invite permission: %w", err) 997 } 998 if !ok { 999 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 1000 } 1001 } 1002 1003 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 1004 if err != nil { 1005 return err 1006 } 1007 1008 if memberId.Handle.IsInvalidHandle() { 1009 return fmt.Errorf("invalid handle for member %s", record.Subject) 1010 } 1011 1012 existing, err := db.GetKnotMembers(i.Db, 1013 orm.FilterEq("did", did), 1014 orm.FilterEq("rkey", e.Commit.RKey), 1015 ) 1016 if err != nil { 1017 return fmt.Errorf("failed to look up existing member: %w", err) 1018 } 1019 if len(existing) > 1 { 1020 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1021 } 1022 1023 tx, err := i.Db.Begin() 1024 if err != nil { 1025 return fmt.Errorf("failed to start txn: %w", err) 1026 } 1027 committed := false 1028 defer func() { 1029 if committed { 1030 return 1031 } 1032 tx.Rollback() 1033 i.Enforcer.E.LoadPolicy() 1034 }() 1035 1036 if len(existing) == 1 { 1037 prev := existing[0] 1038 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1039 if err = db.RemoveKnotMember(tx, 1040 orm.FilterEq("did", did), 1041 orm.FilterEq("rkey", e.Commit.RKey), 1042 ); err != nil { 1043 return fmt.Errorf("failed to remove stale row: %w", err) 1044 } 1045 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1046 return fmt.Errorf("failed to remove stale ACL: %w", err) 1047 } 1048 } 1049 } 1050 1051 if err = db.AddKnotMember(tx, models.KnotMember{ 1052 Did: syntax.DID(did), 1053 Rkey: e.Commit.RKey, 1054 Domain: record.Domain, 1055 Subject: memberId.DID, 1056 }); err != nil { 1057 return fmt.Errorf("failed to add to db: %w", err) 1058 } 1059 1060 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1061 return fmt.Errorf("failed to update ACLs: %w", err) 1062 } 1063 1064 if err = tx.Commit(); err != nil { 1065 return fmt.Errorf("failed to commit txn: %w", err) 1066 } 1067 1068 if err = i.Enforcer.E.SavePolicy(); err != nil { 1069 return fmt.Errorf("failed to save ACLs: %w", err) 1070 } 1071 committed = true 1072 1073 l.Info("upserted knot member") 1074 case jmodels.CommitOperationDelete: 1075 rkey := e.Commit.RKey 1076 1077 members, err := db.GetKnotMembers( 1078 i.Db, 1079 orm.FilterEq("did", did), 1080 orm.FilterEq("rkey", rkey), 1081 ) 1082 if err != nil { 1083 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1084 } 1085 if len(members) == 0 { 1086 l.Info("knot member already removed", "rkey", rkey) 1087 return nil 1088 } 1089 if len(members) > 1 { 1090 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1091 } 1092 member := members[0] 1093 1094 tx, err := i.Db.Begin() 1095 if err != nil { 1096 return fmt.Errorf("failed to start txn: %w", err) 1097 } 1098 committed := false 1099 defer func() { 1100 if committed { 1101 return 1102 } 1103 tx.Rollback() 1104 i.Enforcer.E.LoadPolicy() 1105 }() 1106 1107 if err = db.RemoveKnotMember( 1108 tx, 1109 orm.FilterEq("did", did), 1110 orm.FilterEq("rkey", rkey), 1111 ); err != nil { 1112 return fmt.Errorf("failed to remove from db: %w", err) 1113 } 1114 1115 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1116 return fmt.Errorf("failed to update ACLs: %w", err) 1117 } 1118 1119 if err = tx.Commit(); err != nil { 1120 return fmt.Errorf("failed to commit txn: %w", err) 1121 } 1122 1123 if err = i.Enforcer.E.SavePolicy(); err != nil { 1124 return fmt.Errorf("failed to save ACLs: %w", err) 1125 } 1126 committed = true 1127 1128 l.Info("removed knot member") 1129 } 1130 1131 return nil 1132} 1133 1134func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1135 did := e.Did 1136 var err error 1137 1138 l = l.With("handler", "ingestKnot") 1139 1140 switch e.Commit.Operation { 1141 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1142 raw := json.RawMessage(e.Commit.Record) 1143 record := tangled.Knot{} 1144 err = json.Unmarshal(raw, &record) 1145 if err != nil { 1146 l.Error("invalid record", "err", err) 1147 return err 1148 } 1149 1150 domain := e.Commit.RKey 1151 1152 err := db.AddKnot(i.Db, domain, did) 1153 if err != nil { 1154 l.Error("failed to add knot to db", "err", err, "domain", domain) 1155 return err 1156 } 1157 1158 if err := i.verifyKnot(ctx, domain, did); err != nil { 1159 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 1160 } 1161 1162 l.Info("ingested record", "domain", domain) 1163 return nil 1164 1165 case jmodels.CommitOperationDelete: 1166 domain := e.Commit.RKey 1167 1168 // get record from db first 1169 registrations, err := db.GetRegistrations( 1170 i.Db, 1171 orm.FilterEq("domain", domain), 1172 orm.FilterEq("did", did), 1173 ) 1174 if err != nil { 1175 return fmt.Errorf("failed to get registration: %w", err) 1176 } 1177 if len(registrations) != 1 { 1178 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 1179 } 1180 registration := registrations[0] 1181 1182 tx, err := i.Db.Begin() 1183 if err != nil { 1184 return fmt.Errorf("failed to start txn: %w", err) 1185 } 1186 defer func() { 1187 tx.Rollback() 1188 i.Enforcer.E.LoadPolicy() 1189 }() 1190 1191 err = db.RemoveKnotMember( 1192 tx, 1193 orm.FilterEq("did", did), 1194 orm.FilterEq("domain", domain), 1195 ) 1196 if err != nil { 1197 return fmt.Errorf("failed to remove knot members: %w", err) 1198 } 1199 1200 err = db.DeleteKnot( 1201 tx, 1202 orm.FilterEq("did", did), 1203 orm.FilterEq("domain", domain), 1204 ) 1205 if err != nil { 1206 return fmt.Errorf("failed to delete knot: %w", err) 1207 } 1208 1209 err = db.RemoveReposByKnot(tx, domain) 1210 if err != nil { 1211 return fmt.Errorf("failed to remove repos by knot: %w", err) 1212 } 1213 1214 if registration.Registered != nil { 1215 err = i.Enforcer.RemoveKnot(domain) 1216 if err != nil { 1217 return fmt.Errorf("failed to remove knot from enforcer: %w", err) 1218 } 1219 } 1220 1221 err = tx.Commit() 1222 if err != nil { 1223 return fmt.Errorf("failed to commit txn: %w", err) 1224 } 1225 1226 err = i.Enforcer.E.SavePolicy() 1227 if err != nil { 1228 return fmt.Errorf("failed to save ACLs: %w", err) 1229 } 1230 1231 l.Info("ingested record", "domain", domain) 1232 } 1233 1234 return nil 1235} 1236 1237const ( 1238 verifyAttempts = 4 1239 verifyMinDelay = 1 * time.Second 1240 verifyMaxDelay = 5 * time.Second 1241) 1242 1243func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1244 regs, err := db.GetRegistrations(i.Db, 1245 orm.FilterEq("domain", domain), 1246 orm.FilterEq("did", did), 1247 ) 1248 if err != nil { 1249 return fmt.Errorf("look up registration: %w", err) 1250 } 1251 if len(regs) != 1 { 1252 return fmt.Errorf("no registration for %s by %s", domain, did) 1253 } 1254 if regs[0].Registered != nil { 1255 return nil 1256 } 1257 1258 err = retry.Do( 1259 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1260 retry.Context(ctx), 1261 retry.Attempts(verifyAttempts), 1262 retry.Delay(verifyMinDelay), 1263 retry.MaxDelay(verifyMaxDelay), 1264 retry.DelayType(retry.BackOffDelay), 1265 retry.LastErrorOnly(true), 1266 ) 1267 if err != nil { 1268 return fmt.Errorf("verify: %w", err) 1269 } 1270 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1271} 1272 1273func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1274 spindles, err := db.GetSpindles(ctx, i.Db, 1275 orm.FilterEq("instance", instance), 1276 orm.FilterEq("owner", did), 1277 ) 1278 if err != nil { 1279 return fmt.Errorf("look up spindle: %w", err) 1280 } 1281 if len(spindles) != 1 { 1282 return fmt.Errorf("no spindle for %s by %s", instance, did) 1283 } 1284 if spindles[0].Verified != nil { 1285 return nil 1286 } 1287 1288 err = retry.Do( 1289 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1290 retry.Context(ctx), 1291 retry.Attempts(verifyAttempts), 1292 retry.Delay(verifyMinDelay), 1293 retry.MaxDelay(verifyMaxDelay), 1294 retry.DelayType(retry.BackOffDelay), 1295 retry.LastErrorOnly(true), 1296 ) 1297 if err != nil { 1298 return fmt.Errorf("verify: %w", err) 1299 } 1300 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1301 return err 1302} 1303 1304const sweepConcurrency = 4 1305 1306func (i *Ingester) SweepPendingVerifications() { 1307 l := i.Logger.With("handler", "SweepPendingVerifications") 1308 1309 var g errgroup.Group 1310 g.SetLimit(sweepConcurrency) 1311 1312 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1313 if err != nil { 1314 l.Error("failed to list unverified knots", "err", err) 1315 } else { 1316 for _, reg := range regs { 1317 g.Go(func() error { 1318 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1319 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1320 } 1321 return nil 1322 }) 1323 } 1324 } 1325 1326 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1327 if err != nil { 1328 l.Error("failed to list unverified spindles", "err", err) 1329 g.Wait() 1330 return 1331 } 1332 for _, s := range spindles { 1333 g.Go(func() error { 1334 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1335 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1336 } 1337 return nil 1338 }) 1339 } 1340 g.Wait() 1341} 1342 1343func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1344 did := e.Did 1345 rkey := e.Commit.RKey 1346 1347 var err error 1348 1349 l = l.With("handler", "ingestIssue") 1350 1351 switch e.Commit.Operation { 1352 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1353 raw := json.RawMessage(e.Commit.Record) 1354 record := tangled.RepoIssue{} 1355 err = json.Unmarshal(raw, &record) 1356 if err != nil { 1357 l.Error("invalid record", "err", err) 1358 return err 1359 } 1360 1361 issue := models.IssueFromRecord(did, rkey, record) 1362 1363 if issue.RepoDid == "" { 1364 return fmt.Errorf("issue record has no repo field") 1365 } 1366 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1367 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1368 } 1369 1370 if err := issue.Validate(); err != nil { 1371 return fmt.Errorf("failed to validate issue: %w", err) 1372 } 1373 1374 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1375 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1376 if repoErr == nil && repo.RepoDid != "" { 1377 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1378 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1379 } 1380 } 1381 } 1382 1383 tx, err := i.Db.BeginTx(ctx, nil) 1384 if err != nil { 1385 l.Error("failed to begin transaction", "err", err) 1386 return err 1387 } 1388 defer tx.Rollback() 1389 1390 err = db.PutIssue(tx, &issue) 1391 if err != nil { 1392 l.Error("failed to create issue", "err", err) 1393 return err 1394 } 1395 1396 err = tx.Commit() 1397 if err != nil { 1398 l.Error("failed to commit txn", "err", err) 1399 return err 1400 } 1401 1402 l.Info("ingested record") 1403 return nil 1404 1405 case jmodels.CommitOperationDelete: 1406 tx, err := i.Db.BeginTx(ctx, nil) 1407 if err != nil { 1408 l.Error("failed to begin transaction", "err", err) 1409 return err 1410 } 1411 defer tx.Rollback() 1412 1413 if err := db.DeleteIssues( 1414 tx, 1415 did, 1416 rkey, 1417 ); err != nil { 1418 l.Error("failed to delete", "err", err) 1419 return fmt.Errorf("failed to delete issue record: %w", err) 1420 } 1421 if err := tx.Commit(); err != nil { 1422 l.Error("failed to commit txn", "err", err) 1423 return err 1424 } 1425 1426 l.Info("ingested record") 1427 return nil 1428 } 1429 1430 return nil 1431} 1432 1433func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1434 did := e.Did 1435 rkey := e.Commit.RKey 1436 1437 var err error 1438 1439 l = l.With("handler", "ingestPull") 1440 1441 switch e.Commit.Operation { 1442 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1443 raw := json.RawMessage(e.Commit.Record) 1444 record := tangled.RepoPull{} 1445 err = json.Unmarshal(raw, &record) 1446 if err != nil { 1447 l.Error("invalid record", "err", err) 1448 return err 1449 } 1450 1451 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1452 if err != nil { 1453 l.Error("failed to resolve did", "err", err) 1454 return err 1455 } 1456 1457 // go through and fetch all blobs in parallel 1458 readers := make([]*io.ReadCloser, len(record.Rounds)) 1459 var mu sync.Mutex 1460 1461 g, gctx := errgroup.WithContext(ctx) 1462 1463 for idx, b := range record.Rounds { 1464 g.Go(func() error { 1465 // for some reason, a blob is empty 1466 if b.PatchBlob == nil { 1467 return fmt.Errorf("missing patchBlob in round %d", idx) 1468 } 1469 1470 ownerPds := ownerId.PDSEndpoint() 1471 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1472 q := url.Query() 1473 q.Set("cid", b.PatchBlob.Ref.String()) 1474 q.Set("did", did) 1475 url.RawQuery = q.Encode() 1476 1477 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1478 if err != nil { 1479 l.Error("failed to create request") 1480 return err 1481 } 1482 req.Header.Set("Content-Type", "application/json") 1483 1484 resp, err := http.DefaultClient.Do(req) 1485 if err != nil { 1486 l.Error("failed to make request") 1487 return err 1488 } 1489 1490 mu.Lock() 1491 readers[idx] = &resp.Body 1492 mu.Unlock() 1493 1494 return nil 1495 }) 1496 } 1497 1498 if err := g.Wait(); err != nil { 1499 for _, r := range readers { 1500 if r != nil && *r != nil { 1501 (*r).Close() 1502 } 1503 } 1504 return err 1505 } 1506 1507 defer func() { 1508 for _, r := range readers { 1509 if r != nil && *r != nil { 1510 (*r).Close() 1511 } 1512 } 1513 }() 1514 1515 pull, err := models.PullFromRecord(did, rkey, record, readers) 1516 if err != nil { 1517 return fmt.Errorf("failed to parse pull from record: %w", err) 1518 } 1519 if err := pull.Validate(); err != nil { 1520 return fmt.Errorf("failed to validate pull: %w", err) 1521 } 1522 if pull.DependentOn != nil { 1523 if err := func() error { 1524 dependentPull, err := db.GetPull( 1525 i.Db, 1526 orm.FilterEq("dependent_on", pull.DependentOn.String()), 1527 ) 1528 if errors.Is(err, sql.ErrNoRows) { 1529 return nil 1530 } 1531 if err != nil { 1532 return fmt.Errorf("failed to fetch pulls with same dependency: %w", err) 1533 } 1534 if dependentPull.AtUri() == pull.AtUri() { 1535 return nil 1536 } 1537 return fmt.Errorf("another pull already depends on %s, which would form a DAG, this is presently disallowed", pull.DependentOn.String()) 1538 }(); err != nil { 1539 return fmt.Errorf("failed to validate pull stack: %w", err) 1540 } 1541 } 1542 1543 tx, err := i.Db.BeginTx(ctx, nil) 1544 if err != nil { 1545 l.Error("failed to begin transaction", "err", err) 1546 return err 1547 } 1548 defer tx.Rollback() 1549 1550 err = db.PutPull(tx, pull) 1551 if err != nil { 1552 l.Error("failed to create pull", "err", err) 1553 return err 1554 } 1555 1556 err = tx.Commit() 1557 if err != nil { 1558 l.Error("failed to commit txn", "err", err) 1559 return err 1560 } 1561 1562 l.Info("ingested record") 1563 return nil 1564 1565 case jmodels.CommitOperationDelete: 1566 tx, err := i.Db.BeginTx(ctx, nil) 1567 if err != nil { 1568 l.Error("failed to begin transaction", "err", err) 1569 return err 1570 } 1571 defer tx.Rollback() 1572 1573 if err := db.AbandonPulls( 1574 tx, 1575 orm.FilterEq("owner_did", did), 1576 orm.FilterEq("rkey", rkey), 1577 ); err != nil { 1578 l.Error("failed to abandon", "err", err) 1579 return fmt.Errorf("failed to abandon pull record: %w", err) 1580 } 1581 if err := tx.Commit(); err != nil { 1582 l.Error("failed to commit txn", "err", err) 1583 return err 1584 } 1585 1586 l.Info("ingested record") 1587 return nil 1588 } 1589 1590 return nil 1591} 1592 1593// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1594func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error { 1595 l = l.With("handler", "ingestIssueComment") 1596 1597 switch e.Commit.Operation { 1598 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1599 // no-op. sh.tangled.repo.issue.comment is deprecated 1600 1601 case jmodels.CommitOperationDelete: 1602 if err := db.PurgeComments( 1603 i.Db, 1604 orm.FilterEq("did", e.Did), 1605 orm.FilterEq("collection", e.Commit.Collection), 1606 orm.FilterEq("rkey", e.Commit.RKey), 1607 ); err != nil { 1608 return fmt.Errorf("failed to delete comment record: %w", err) 1609 } 1610 } 1611 1612 l.Info("ingested record") 1613 return nil 1614} 1615 1616// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1617func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error { 1618 l = l.With("handler", "ingestPullComment") 1619 1620 switch e.Commit.Operation { 1621 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1622 // no-op. sh.tangled.repo.pull.comment is deprecated 1623 1624 case jmodels.CommitOperationDelete: 1625 if err := db.PurgeComments( 1626 i.Db, 1627 orm.FilterEq("did", e.Did), 1628 orm.FilterEq("collection", e.Commit.Collection), 1629 orm.FilterEq("rkey", e.Commit.RKey), 1630 ); err != nil { 1631 return fmt.Errorf("failed to delete comment record: %w", err) 1632 } 1633 } 1634 1635 l.Info("ingested record") 1636 return nil 1637} 1638 1639func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error { 1640 did := e.Did 1641 rkey := e.Commit.RKey 1642 cid := e.Commit.CID 1643 1644 var err error 1645 1646 l = l.With("handler", "ingestComment") 1647 1648 ctx := context.Background() 1649 1650 switch e.Commit.Operation { 1651 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1652 raw := json.RawMessage(e.Commit.Record) 1653 record := tangled.FeedComment{} 1654 err = json.Unmarshal(raw, &record) 1655 if err != nil { 1656 return fmt.Errorf("invalid record: %w", err) 1657 } 1658 1659 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1660 if err != nil { 1661 return fmt.Errorf("failed to parse comment from record: %w", err) 1662 } 1663 1664 if err := comment.Validate(); err != nil { 1665 return fmt.Errorf("failed to validate comment: %w", err) 1666 } 1667 1668 var references []syntax.ATURI 1669 if comment.Body.Original != nil { 1670 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1671 } 1672 1673 tx, err := i.Db.Begin() 1674 if err != nil { 1675 return fmt.Errorf("failed to start transaction: %w", err) 1676 } 1677 defer tx.Rollback() 1678 1679 _, err = db.PutComment(tx, comment, references) 1680 if err != nil { 1681 return fmt.Errorf("failed to create comment: %w", err) 1682 } 1683 1684 if err := tx.Commit(); err != nil { 1685 return err 1686 } 1687 1688 case jmodels.CommitOperationDelete: 1689 if err := db.DeleteComments( 1690 i.Db, 1691 orm.FilterEq("did", did), 1692 orm.FilterEq("collection", e.Commit.Collection), 1693 orm.FilterEq("rkey", rkey), 1694 ); err != nil { 1695 return fmt.Errorf("failed to delete comment record: %w", err) 1696 } 1697 } 1698 1699 l.Info("ingested record") 1700 return nil 1701} 1702 1703func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error { 1704 did := e.Did 1705 rkey := e.Commit.RKey 1706 1707 l = l.With("handler", "ingestReaction") 1708 1709 switch e.Commit.Operation { 1710 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1711 raw := json.RawMessage(e.Commit.Record) 1712 record := tangled.FeedReaction{} 1713 if err := json.Unmarshal(raw, &record); err != nil { 1714 return fmt.Errorf("invalid record: %w", err) 1715 } 1716 1717 subjectUri, err := syntax.ParseATURI(record.Subject) 1718 if err != nil { 1719 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err) 1720 } 1721 subjectUri = models.NormalizeReactionSubject(subjectUri) 1722 1723 kind, ok := models.ParseReactionKind(record.Reaction) 1724 if !ok { 1725 return fmt.Errorf("invalid reaction kind: %q", record.Reaction) 1726 } 1727 1728 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 1729 if parseErr != nil { 1730 created = time.Now() 1731 } 1732 1733 reaction := models.Reaction{ 1734 ReactedByDid: did, 1735 Rkey: rkey, 1736 ThreadAt: subjectUri, 1737 Kind: kind, 1738 Created: created, 1739 } 1740 if err := db.UpsertReaction(i.Db, reaction); err != nil { 1741 return fmt.Errorf("failed to upsert reaction: %w", err) 1742 } 1743 1744 case jmodels.CommitOperationDelete: 1745 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil { 1746 return fmt.Errorf("failed to delete reaction record: %w", err) 1747 } 1748 } 1749 1750 l.Info("ingested record") 1751 return nil 1752} 1753 1754func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error { 1755 did := e.Did 1756 rkey := e.Commit.RKey 1757 1758 var err error 1759 1760 l = l.With("handler", "ingestLabelDefinition") 1761 1762 switch e.Commit.Operation { 1763 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1764 raw := json.RawMessage(e.Commit.Record) 1765 record := tangled.LabelDefinition{} 1766 err = json.Unmarshal(raw, &record) 1767 if err != nil { 1768 return fmt.Errorf("invalid record: %w", err) 1769 } 1770 1771 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1772 if err != nil { 1773 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1774 } 1775 1776 if err := def.Validate(); err != nil { 1777 return fmt.Errorf("failed to validate labeldef: %w", err) 1778 } 1779 1780 _, err = db.AddLabelDefinition(i.Db, def) 1781 if err != nil { 1782 return fmt.Errorf("failed to create labeldef: %w", err) 1783 } 1784 1785 l.Info("ingested record") 1786 return nil 1787 1788 case jmodels.CommitOperationDelete: 1789 if err := db.DeleteLabelDefinition( 1790 i.Db, 1791 orm.FilterEq("did", did), 1792 orm.FilterEq("rkey", rkey), 1793 ); err != nil { 1794 return fmt.Errorf("failed to delete labeldef record: %w", err) 1795 } 1796 1797 l.Info("ingested record") 1798 return nil 1799 } 1800 1801 return nil 1802} 1803 1804func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1805 did := e.Did 1806 rkey := e.Commit.RKey 1807 1808 var err error 1809 1810 l = l.With("handler", "ingestLabelOp") 1811 1812 switch e.Commit.Operation { 1813 case jmodels.CommitOperationCreate: 1814 raw := json.RawMessage(e.Commit.Record) 1815 record := tangled.LabelOp{} 1816 err = json.Unmarshal(raw, &record) 1817 if err != nil { 1818 return fmt.Errorf("invalid record: %w", err) 1819 } 1820 1821 subject := syntax.ATURI(record.Subject) 1822 collection := subject.Collection() 1823 1824 var repo *models.Repo 1825 switch collection { 1826 case tangled.RepoIssueNSID: 1827 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1828 if err != nil || len(i) != 1 { 1829 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1830 } 1831 repo = i[0].Repo 1832 case tangled.RepoPullNSID: 1833 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject)) 1834 if err != nil || len(p) != 1 { 1835 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p)) 1836 } 1837 repo = p[0].Repo 1838 default: 1839 return fmt.Errorf("unsupported label subject: %s", collection) 1840 } 1841 1842 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1843 if err != nil { 1844 return fmt.Errorf("failed to build label application ctx: %w", err) 1845 } 1846 1847 ops := models.LabelOpsFromRecord(did, rkey, record) 1848 1849 for _, o := range ops { 1850 def, ok := actx.Defs[o.OperandKey] 1851 if !ok { 1852 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1853 } 1854 // validate permissions: only collaborators can apply labels currently 1855 // 1856 // TODO: introduce a repo:triage permission 1857 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, o.Did, "repo:push") 1858 if permErr != nil { 1859 if !errors.Is(permErr, knotacl.ErrKnotUnreachable) { 1860 return fmt.Errorf("enforcing permission: %w", permErr) 1861 } 1862 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", permErr) 1863 } else if !allowed { 1864 return fmt.Errorf("unauthorized label operation") 1865 } 1866 1867 if err := def.ValidateOperandValue(&o); err != nil { 1868 return fmt.Errorf("failed to validate labelop: %w", err) 1869 } 1870 } 1871 1872 tx, err := i.Db.Begin() 1873 if err != nil { 1874 return err 1875 } 1876 defer tx.Rollback() 1877 1878 for _, o := range ops { 1879 _, err = db.AddLabelOp(tx, &o) 1880 if err != nil { 1881 return fmt.Errorf("failed to add labelop: %w", err) 1882 } 1883 } 1884 1885 if err = tx.Commit(); err != nil { 1886 return err 1887 } 1888 1889 l.Info("ingested record") 1890 } 1891 1892 return nil 1893}