Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/knotacl" 31 "tangled.org/core/appview/mentions" 32 "tangled.org/core/appview/models" 33 "tangled.org/core/appview/notify" 34 "tangled.org/core/appview/repoverify" 35 "tangled.org/core/appview/serververify" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/orm" 38 "tangled.org/core/rbac" 39) 40 41type Ingester struct { 42 Ctx context.Context 43 Db *db.DB 44 Enforcer *rbac.Enforcer 45 Acl *knotacl.Service 46 IdResolver *idresolver.Resolver 47 Cache *cache.Cache 48 Config *config.Config 49 Logger *slog.Logger 50 MentionsResolver *mentions.Resolver 51 Notifier notify.Notifier 52 Verifier repoverify.Verifier 53} 54 55type processFunc func(ctx context.Context, e *jmodels.Event) error 56 57func (i *Ingester) Ingest() processFunc { 58 return func(ctx context.Context, e *jmodels.Event) error { 59 var err error 60 61 l := i.Logger.With("kind", e.Kind) 62 switch e.Kind { 63 case jmodels.EventKindAccount: 64 // TODO: sync account state to db 65 if e.Account.Active { 66 break 67 } 68 // TODO: revoke sessions by DID 69 if *e.Account.Status == "deactivated" { 70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 71 } 72 case jmodels.EventKindIdentity: 73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 74 case jmodels.EventKindCommit: 75 l = l.With( 76 "nsid", e.Commit.Collection, 77 "did", e.Did, 78 "rkey", e.Commit.RKey, 79 "op", e.Commit.Operation, 80 ) 81 switch e.Commit.Collection { 82 case tangled.GraphFollowNSID: 83 err = i.ingestFollow(e, l) 84 case tangled.GraphVouchNSID: 85 err = i.ingestVouch(ctx, e, l) 86 case tangled.FeedStarNSID: 87 err = i.ingestStar(ctx, e, l) 88 case tangled.FeedReactionNSID: 89 err = i.ingestReaction(e, l) 90 case tangled.PublicKeyNSID: 91 err = i.ingestPublicKey(e, l) 92 case tangled.RepoArtifactNSID: 93 err = i.ingestArtifact(ctx, e, l) 94 case tangled.ActorProfileNSID: 95 err = i.ingestProfile(ctx, e, l) 96 case tangled.SpindleMemberNSID: 97 err = i.ingestSpindleMember(ctx, e, l) 98 case tangled.SpindleNSID: 99 err = i.ingestSpindle(ctx, e, l) 100 case tangled.KnotMemberNSID: 101 err = i.ingestKnotMember(ctx, e, l) 102 case tangled.KnotNSID: 103 err = i.ingestKnot(ctx, e, l) 104 case tangled.StringNSID: 105 err = i.ingestString(e, l) 106 case tangled.RepoIssueNSID: 107 err = i.ingestIssue(ctx, e, l) 108 case tangled.RepoPullNSID: 109 err = i.ingestPull(ctx, e, l) 110 case tangled.FeedCommentNSID: 111 err = i.ingestComment(e, l) 112 case tangled.RepoIssueCommentNSID: 113 err = i.ingestIssueComment(e, l) 114 case tangled.RepoPullCommentNSID: 115 err = i.ingestPullComment(e, l) 116 case tangled.LabelDefinitionNSID: 117 err = i.ingestLabelDefinition(e, l) 118 case tangled.LabelOpNSID: 119 err = i.ingestLabelOp(ctx, e, l) 120 case tangled.RepoNSID: 121 err = i.ingestRepo(ctx, e, l) 122 } 123 } 124 125 if err != nil { 126 l.Warn("failed to ingest record, skipping", "err", err) 127 } 128 129 lastTimeUs := e.TimeUS + 1 130 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 131 l.Error("failed to save cursor", "err", saveErr) 132 } 133 134 return nil 135 } 136} 137 138func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 139 if strings.HasPrefix(ref, "did:") { 140 return db.GetRepoByDid(i.Db, ref) 141 } 142 return db.GetRepoByAtUri(i.Db, ref) 143} 144 145func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 146 var legacy struct { 147 Subject *string `json:"subject"` 148 SubjectDid *string `json:"subjectDid"` 149 } 150 if err := json.Unmarshal(raw, &legacy); err != nil { 151 return false, err 152 } 153 154 switch { 155 case legacy.SubjectDid != nil: 156 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 157 if err != nil { 158 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 159 return false, nil 160 } 161 star.SubjectType = models.StarSubjectRepo 162 star.Subject = repo.RepoDid 163 return true, nil 164 165 case legacy.Subject != nil: 166 uri, err := syntax.ParseATURI(*legacy.Subject) 167 if err != nil { 168 return false, fmt.Errorf("invalid old-format star subject: %w", err) 169 } 170 switch uri.Collection().String() { 171 case tangled.RepoNSID: 172 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 173 if err != nil { 174 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 175 return false, nil 176 } 177 star.SubjectType = models.StarSubjectRepo 178 star.Subject = repo.RepoDid 179 return true, nil 180 default: 181 star.SubjectType = models.StarSubjectString 182 star.Subject = *legacy.Subject 183 return true, nil 184 } 185 186 default: 187 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 188 } 189} 190 191func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 192 var err error 193 did := e.Did 194 195 l = l.With("handler", "ingestStar") 196 197 switch e.Commit.Operation { 198 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 199 raw := json.RawMessage(e.Commit.Record) 200 record := tangled.FeedStar{} 201 unmarshalErr := json.Unmarshal(raw, &record) 202 203 star := &models.Star{ 204 Did: did, 205 Rkey: e.Commit.RKey, 206 } 207 208 switch { 209 case unmarshalErr != nil: 210 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 211 if resolveErr != nil { 212 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 213 return unmarshalErr 214 } 215 if !resolved { 216 return nil 217 } 218 219 case record.Subject == nil: 220 return fmt.Errorf("star record has nil subject") 221 222 case record.Subject.FeedStar_Repo != nil: 223 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 224 if repoErr != nil { 225 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 226 return nil 227 } 228 star.SubjectType = models.StarSubjectRepo 229 star.Subject = repo.RepoDid 230 231 case record.Subject.FeedStar_String != nil: 232 star.SubjectType = models.StarSubjectString 233 star.Subject = record.Subject.FeedStar_String.Uri 234 235 default: 236 return fmt.Errorf("star record has empty subject union") 237 } 238 239 err = db.AddStar(i.Db, star) 240 case jmodels.CommitOperationDelete: 241 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 242 } 243 244 if err != nil { 245 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 246 } 247 248 l.Info("ingested record") 249 return nil 250} 251 252func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error { 253 var err error 254 did := e.Did 255 256 l = l.With("handler", "ingestFollow") 257 258 switch e.Commit.Operation { 259 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 260 raw := json.RawMessage(e.Commit.Record) 261 record := tangled.GraphFollow{} 262 err = json.Unmarshal(raw, &record) 263 if err != nil { 264 l.Error("invalid record", "err", err) 265 return err 266 } 267 268 err = db.AddFollow(i.Db, &models.Follow{ 269 UserDid: did, 270 SubjectDid: record.Subject, 271 Rkey: e.Commit.RKey, 272 }) 273 case jmodels.CommitOperationDelete: 274 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 275 } 276 277 if err != nil { 278 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 279 } 280 281 l.Info("ingested record") 282 return nil 283} 284 285func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 286 var err error 287 did := e.Did 288 289 l = l.With("handler", "ingestVouch") 290 291 switch e.Commit.Operation { 292 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 293 raw := json.RawMessage(e.Commit.Record) 294 record := tangled.GraphVouch{} 295 err = json.Unmarshal(raw, &record) 296 if err != nil { 297 l.Error("invalid record", "err", err) 298 return err 299 } 300 301 // rkey is the subject_did being vouched for/denounced 302 subjectDID := e.Commit.RKey 303 304 _, err = syntax.ParseDID(subjectDID) 305 if err != nil { 306 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 307 return fmt.Errorf("invalid subject_did: %w", err) 308 } 309 310 if did == subjectDID { 311 l.Warn("attempted self-vouch", "did", did) 312 return fmt.Errorf("cannot vouch for self") 313 } 314 315 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 316 if err != nil { 317 return err 318 } 319 320 if subjectId.Handle.IsInvalidHandle() { 321 return err 322 } 323 324 kind, err := models.ParseVouchKind(record.Kind) 325 if err != nil { 326 l.Error("invalid kind", "kind", kind) 327 return fmt.Errorf("invalid kind: %s", kind) 328 } 329 330 recordCid, err := cid.Parse(e.Commit.CID) 331 if err != nil { 332 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 333 return fmt.Errorf("invalid cid: %w", err) 334 } 335 336 var evidences []syntax.ATURI 337 for _, raw := range record.Evidences { 338 uri, parseErr := syntax.ParseATURI(raw) 339 if parseErr != nil { 340 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 341 continue 342 } 343 evidences = append(evidences, uri) 344 } 345 346 tx, txErr := i.Db.Begin() 347 if txErr != nil { 348 return fmt.Errorf("failed to start transaction: %w", txErr) 349 } 350 351 addErr := db.AddVouch(tx, &models.Vouch{ 352 Did: syntax.DID(did), 353 SubjectDid: subjectId.DID, 354 Cid: recordCid, 355 Kind: kind, 356 Reason: record.Reason, 357 Evidences: evidences, 358 }) 359 if addErr != nil { 360 tx.Rollback() 361 err = addErr 362 } else { 363 err = tx.Commit() 364 } 365 366 case jmodels.CommitOperationDelete: 367 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 368 } 369 370 if err != nil { 371 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 372 } 373 374 l.Info("ingested record") 375 return nil 376} 377 378func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error { 379 did := e.Did 380 var err error 381 382 l = l.With("handler", "ingestPublicKey") 383 384 switch e.Commit.Operation { 385 case jmodels.CommitOperationCreate: 386 l.Debug("processing add of pubkey") 387 raw := json.RawMessage(e.Commit.Record) 388 record := tangled.PublicKey{} 389 err = json.Unmarshal(raw, &record) 390 if err != nil { 391 l.Error("invalid record", "err", err) 392 return err 393 } 394 395 name := record.Name 396 key := record.Key 397 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 398 case jmodels.CommitOperationUpdate: 399 l.Debug("processing update of pubkey") 400 raw := json.RawMessage(e.Commit.Record) 401 record := tangled.PublicKey{} 402 err = json.Unmarshal(raw, &record) 403 if err != nil { 404 l.Error("invalid record", "err", err) 405 return err 406 } 407 408 name := record.Name 409 key := record.Key 410 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 411 case jmodels.CommitOperationDelete: 412 l.Debug("processing delete of pubkey") 413 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 414 } 415 416 if err != nil { 417 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 418 } 419 420 l.Info("ingested record") 421 return nil 422} 423 424func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 425 did := e.Did 426 var err error 427 428 l = l.With("handler", "ingestArtifact") 429 430 switch e.Commit.Operation { 431 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 432 raw := json.RawMessage(e.Commit.Record) 433 record := tangled.RepoArtifact{} 434 err = json.Unmarshal(raw, &record) 435 if err != nil { 436 l.Error("invalid record", "err", err) 437 return err 438 } 439 440 var repo *models.Repo 441 if record.RepoDid != nil && *record.RepoDid != "" { 442 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 443 if err != nil && !errors.Is(err, sql.ErrNoRows) { 444 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 445 } 446 } 447 if repo == nil && record.Repo != nil { 448 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 449 if parseErr != nil { 450 return parseErr 451 } 452 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 453 if err != nil { 454 return err 455 } 456 } 457 if repo == nil { 458 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 459 } 460 461 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push") 462 if permErr != nil { 463 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr) 464 } else if !allowed { 465 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier()) 466 return nil 467 } 468 469 repoDid := repo.RepoDid 470 if repoDid == "" && record.RepoDid != nil { 471 repoDid = *record.RepoDid 472 } 473 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 474 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 475 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 476 } 477 } 478 479 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 480 if parseErr != nil { 481 createdAt = time.Now() 482 } 483 484 artifact := models.Artifact{ 485 Did: did, 486 Rkey: e.Commit.RKey, 487 RepoDid: syntax.DID(repo.RepoDid), 488 Tag: plumbing.Hash(record.Tag), 489 CreatedAt: createdAt, 490 BlobCid: cid.Cid(record.Artifact.Ref), 491 Name: record.Name, 492 Size: uint64(record.Artifact.Size), 493 MimeType: record.Artifact.MimeType, 494 } 495 496 err = db.AddArtifact(i.Db, artifact) 497 case jmodels.CommitOperationDelete: 498 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 499 } 500 501 if err != nil { 502 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 503 } 504 505 l.Info("ingested record") 506 return nil 507} 508 509func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 510 did := e.Did 511 var err error 512 513 l = l.With("handler", "ingestProfile") 514 515 if e.Commit.RKey != "self" { 516 return fmt.Errorf("ingestProfile only ingests `self` record") 517 } 518 519 switch e.Commit.Operation { 520 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 521 raw := json.RawMessage(e.Commit.Record) 522 record := tangled.ActorProfile{} 523 err = json.Unmarshal(raw, &record) 524 if err != nil { 525 l.Error("invalid record", "err", err) 526 return err 527 } 528 529 avatar := "" 530 if record.Avatar != nil { 531 avatar = record.Avatar.Ref.String() 532 } 533 534 description := "" 535 if record.Description != nil { 536 description = *record.Description 537 } 538 539 includeBluesky := record.Bluesky 540 541 pronouns := "" 542 if record.Pronouns != nil { 543 pronouns = *record.Pronouns 544 } 545 546 location := "" 547 if record.Location != nil { 548 location = *record.Location 549 } 550 551 var links [5]string 552 for i, l := range record.Links { 553 if i < 5 { 554 links[i] = l 555 } 556 } 557 558 var stats [2]models.VanityStat 559 for i, s := range record.Stats { 560 if i < 2 { 561 stats[i].Kind = models.ParseVanityStatKind(s) 562 } 563 } 564 565 var pinned [6]string 566 for i, r := range record.PinnedRepositories { 567 if i < 6 { 568 pinned[i] = r 569 } 570 } 571 572 var preferredHandle syntax.Handle 573 if record.PreferredHandle != nil { 574 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 575 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 576 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 577 preferredHandle = h 578 } 579 } 580 } 581 582 profile := models.Profile{ 583 Did: did, 584 Avatar: avatar, 585 Description: description, 586 IncludeBluesky: includeBluesky, 587 Location: location, 588 Links: links, 589 Stats: stats, 590 PinnedRepos: pinned, 591 Pronouns: pronouns, 592 PreferredHandle: preferredHandle, 593 } 594 595 tx, err := i.Db.Begin() 596 if err != nil { 597 return fmt.Errorf("failed to start transaction: %w", err) 598 } 599 600 err = db.ValidateProfile(tx, &profile) 601 if err != nil { 602 return fmt.Errorf("invalid profile record") 603 } 604 605 err = db.UpsertProfile(tx, &profile) 606 if err == nil && i.Cache != nil { 607 pipe := i.Cache.Pipeline() 608 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 609 if preferredHandle != "" { 610 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 611 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 612 } else { 613 pipe.Del(ctx, didKey) 614 } 615 if _, execErr := pipe.Exec(ctx); execErr != nil { 616 l.Warn("failed to update preferred handle cache", "err", execErr) 617 } 618 } 619 case jmodels.CommitOperationDelete: 620 tx, beginErr := i.Db.Begin() 621 if beginErr != nil { 622 return fmt.Errorf("failed to start transaction: %w", beginErr) 623 } 624 625 priorHandle, phErr := db.GetPreferredHandle(tx, did) 626 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 627 l.Warn("failed to read prior preferred handle", "err", phErr) 628 } 629 630 err = db.DeleteProfile(tx, did) 631 if err == nil && i.Cache != nil { 632 pipe := i.Cache.Pipeline() 633 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 634 if priorHandle != "" { 635 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 636 } 637 if _, execErr := pipe.Exec(ctx); execErr != nil { 638 l.Warn("failed to evict preferred handle cache", "err", execErr) 639 } 640 } 641 } 642 643 if err != nil { 644 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 645 } 646 647 l.Info("ingested record") 648 return nil 649} 650 651func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 652 did := e.Did 653 var err error 654 655 l = l.With("handler", "ingestSpindleMember") 656 657 switch e.Commit.Operation { 658 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 659 raw := json.RawMessage(e.Commit.Record) 660 record := tangled.SpindleMember{} 661 err = json.Unmarshal(raw, &record) 662 if err != nil { 663 l.Error("invalid record", "err", err) 664 return err 665 } 666 667 // only spindle owner can invite to spindles 668 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 669 if err != nil { 670 return fmt.Errorf("failed to check invite permission: %w", err) 671 } 672 if !ok { 673 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 674 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 675 } 676 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 677 if err != nil { 678 return fmt.Errorf("failed to re-check invite permission: %w", err) 679 } 680 if !ok { 681 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 682 } 683 } 684 685 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 686 if err != nil { 687 return err 688 } 689 690 if memberId.Handle.IsInvalidHandle() { 691 return fmt.Errorf("invalid handle for member %s", record.Subject) 692 } 693 694 existing, err := db.GetSpindleMembers(i.Db, 695 orm.FilterEq("did", did), 696 orm.FilterEq("rkey", e.Commit.RKey), 697 ) 698 if err != nil { 699 return fmt.Errorf("failed to look up existing member: %w", err) 700 } 701 if len(existing) > 1 { 702 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 703 } 704 705 tx, err := i.Db.Begin() 706 if err != nil { 707 return fmt.Errorf("failed to start txn: %w", err) 708 } 709 committed := false 710 defer func() { 711 if committed { 712 return 713 } 714 tx.Rollback() 715 i.Enforcer.E.LoadPolicy() 716 }() 717 718 if len(existing) == 1 { 719 prev := existing[0] 720 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 721 if err = db.RemoveSpindleMember(tx, 722 orm.FilterEq("did", did), 723 orm.FilterEq("rkey", e.Commit.RKey), 724 ); err != nil { 725 return fmt.Errorf("failed to remove stale row: %w", err) 726 } 727 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 728 return fmt.Errorf("failed to remove stale ACL: %w", err) 729 } 730 } 731 } 732 733 if err = db.AddSpindleMember(tx, models.SpindleMember{ 734 Did: syntax.DID(did), 735 Rkey: e.Commit.RKey, 736 Instance: record.Instance, 737 Subject: memberId.DID, 738 }); err != nil { 739 return fmt.Errorf("failed to add to db: %w", err) 740 } 741 742 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 743 return fmt.Errorf("failed to update ACLs: %w", err) 744 } 745 746 if err = tx.Commit(); err != nil { 747 return fmt.Errorf("failed to commit txn: %w", err) 748 } 749 750 if err = i.Enforcer.E.SavePolicy(); err != nil { 751 return fmt.Errorf("failed to save ACLs: %w", err) 752 } 753 committed = true 754 755 l.Info("upserted spindle member") 756 case jmodels.CommitOperationDelete: 757 rkey := e.Commit.RKey 758 759 // get record from db first 760 members, err := db.GetSpindleMembers( 761 i.Db, 762 orm.FilterEq("did", did), 763 orm.FilterEq("rkey", rkey), 764 ) 765 if err != nil || len(members) != 1 { 766 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 767 } 768 member := members[0] 769 770 tx, err := i.Db.Begin() 771 if err != nil { 772 return fmt.Errorf("failed to start txn: %w", err) 773 } 774 committed := false 775 defer func() { 776 if committed { 777 return 778 } 779 tx.Rollback() 780 i.Enforcer.E.LoadPolicy() 781 }() 782 783 // remove record by rkey && update enforcer 784 if err = db.RemoveSpindleMember( 785 tx, 786 orm.FilterEq("did", did), 787 orm.FilterEq("rkey", rkey), 788 ); err != nil { 789 return fmt.Errorf("failed to remove from db: %w", err) 790 } 791 792 // update enforcer 793 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 794 if err != nil { 795 return fmt.Errorf("failed to update ACLs: %w", err) 796 } 797 798 if err = tx.Commit(); err != nil { 799 return fmt.Errorf("failed to commit txn: %w", err) 800 } 801 802 if err = i.Enforcer.E.SavePolicy(); err != nil { 803 return fmt.Errorf("failed to save ACLs: %w", err) 804 } 805 committed = true 806 807 l.Info("removed spindle member") 808 } 809 810 return nil 811} 812 813func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 814 did := e.Did 815 var err error 816 817 l = l.With("handler", "ingestSpindle") 818 819 switch e.Commit.Operation { 820 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 821 raw := json.RawMessage(e.Commit.Record) 822 record := tangled.Spindle{} 823 err = json.Unmarshal(raw, &record) 824 if err != nil { 825 l.Error("invalid record", "err", err) 826 return err 827 } 828 829 instance := e.Commit.RKey 830 831 err := db.AddSpindle(i.Db, models.Spindle{ 832 Owner: syntax.DID(did), 833 Instance: instance, 834 }) 835 if err != nil { 836 l.Error("failed to add spindle to db", "err", err, "instance", instance) 837 return err 838 } 839 840 if err := i.verifySpindle(ctx, instance, did); err != nil { 841 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 842 } 843 844 l.Info("ingested record", "instance", instance) 845 return nil 846 847 case jmodels.CommitOperationDelete: 848 instance := e.Commit.RKey 849 850 // get record from db first 851 spindles, err := db.GetSpindles( 852 ctx, 853 i.Db, 854 orm.FilterEq("owner", did), 855 orm.FilterEq("instance", instance), 856 ) 857 if err != nil || len(spindles) != 1 { 858 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 859 } 860 spindle := spindles[0] 861 862 tx, err := i.Db.Begin() 863 if err != nil { 864 return fmt.Errorf("failed to start txn: %w", err) 865 } 866 defer func() { 867 tx.Rollback() 868 i.Enforcer.E.LoadPolicy() 869 }() 870 871 // remove spindle members first 872 err = db.RemoveSpindleMember( 873 tx, 874 orm.FilterEq("owner", did), 875 orm.FilterEq("instance", instance), 876 ) 877 if err != nil { 878 return fmt.Errorf("failed to remove spindle members: %w", err) 879 } 880 881 err = db.DeleteSpindle( 882 tx, 883 orm.FilterEq("owner", did), 884 orm.FilterEq("instance", instance), 885 ) 886 if err != nil { 887 return fmt.Errorf("failed to delete spindle: %w", err) 888 } 889 890 if spindle.Verified != nil { 891 err = i.Enforcer.RemoveSpindle(instance) 892 if err != nil { 893 return fmt.Errorf("failed to remove spindle from enforcer: %w", err) 894 } 895 } 896 897 err = tx.Commit() 898 if err != nil { 899 return fmt.Errorf("failed to commit txn: %w", err) 900 } 901 902 err = i.Enforcer.E.SavePolicy() 903 if err != nil { 904 return fmt.Errorf("failed to save ACLs: %w", err) 905 } 906 907 l.Info("ingested record", "instance", instance) 908 } 909 910 return nil 911} 912 913func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error { 914 did := e.Did 915 rkey := e.Commit.RKey 916 917 var err error 918 919 l = l.With("handler", "ingestString") 920 921 switch e.Commit.Operation { 922 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 923 raw := json.RawMessage(e.Commit.Record) 924 record := tangled.String{} 925 err = json.Unmarshal(raw, &record) 926 if err != nil { 927 l.Error("invalid record", "err", err) 928 return err 929 } 930 931 string := models.StringFromRecord(did, rkey, record) 932 933 if err = string.Validate(); err != nil { 934 l.Error("invalid record", "err", err) 935 return err 936 } 937 938 if err = db.AddString(i.Db, string); err != nil { 939 l.Error("failed to add string", "err", err) 940 return err 941 } 942 943 l.Info("ingested record") 944 return nil 945 946 case jmodels.CommitOperationDelete: 947 if err := db.DeleteString( 948 i.Db, 949 orm.FilterEq("did", did), 950 orm.FilterEq("rkey", rkey), 951 ); err != nil { 952 l.Error("failed to delete", "err", err) 953 return fmt.Errorf("failed to delete string record: %w", err) 954 } 955 956 l.Info("ingested record") 957 return nil 958 } 959 960 return nil 961} 962 963func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 964 did := e.Did 965 var err error 966 967 l = l.With("handler", "ingestKnotMember") 968 969 switch e.Commit.Operation { 970 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 971 raw := json.RawMessage(e.Commit.Record) 972 record := tangled.KnotMember{} 973 err = json.Unmarshal(raw, &record) 974 if err != nil { 975 l.Error("invalid record", "err", err) 976 return err 977 } 978 979 // only knot owner can invite to knots 980 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 981 if err != nil { 982 return fmt.Errorf("failed to check invite permission: %w", err) 983 } 984 if !ok { 985 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 986 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 987 } 988 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 989 if err != nil { 990 return fmt.Errorf("failed to re-check invite permission: %w", err) 991 } 992 if !ok { 993 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 994 } 995 } 996 997 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 998 if err != nil { 999 return err 1000 } 1001 1002 if memberId.Handle.IsInvalidHandle() { 1003 return fmt.Errorf("invalid handle for member %s", record.Subject) 1004 } 1005 1006 existing, err := db.GetKnotMembers(i.Db, 1007 orm.FilterEq("did", did), 1008 orm.FilterEq("rkey", e.Commit.RKey), 1009 ) 1010 if err != nil { 1011 return fmt.Errorf("failed to look up existing member: %w", err) 1012 } 1013 if len(existing) > 1 { 1014 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 1015 } 1016 1017 tx, err := i.Db.Begin() 1018 if err != nil { 1019 return fmt.Errorf("failed to start txn: %w", err) 1020 } 1021 committed := false 1022 defer func() { 1023 if committed { 1024 return 1025 } 1026 tx.Rollback() 1027 i.Enforcer.E.LoadPolicy() 1028 }() 1029 1030 if len(existing) == 1 { 1031 prev := existing[0] 1032 if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1033 if err = db.RemoveKnotMember(tx, 1034 orm.FilterEq("did", did), 1035 orm.FilterEq("rkey", e.Commit.RKey), 1036 ); err != nil { 1037 return fmt.Errorf("failed to remove stale row: %w", err) 1038 } 1039 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1040 return fmt.Errorf("failed to remove stale ACL: %w", err) 1041 } 1042 } 1043 } 1044 1045 if err = db.AddKnotMember(tx, models.KnotMember{ 1046 Did: syntax.DID(did), 1047 Rkey: e.Commit.RKey, 1048 Domain: record.Domain, 1049 Subject: memberId.DID, 1050 }); err != nil { 1051 return fmt.Errorf("failed to add to db: %w", err) 1052 } 1053 1054 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1055 return fmt.Errorf("failed to update ACLs: %w", err) 1056 } 1057 1058 if err = tx.Commit(); err != nil { 1059 return fmt.Errorf("failed to commit txn: %w", err) 1060 } 1061 1062 if err = i.Enforcer.E.SavePolicy(); err != nil { 1063 return fmt.Errorf("failed to save ACLs: %w", err) 1064 } 1065 committed = true 1066 1067 l.Info("upserted knot member") 1068 case jmodels.CommitOperationDelete: 1069 rkey := e.Commit.RKey 1070 1071 members, err := db.GetKnotMembers( 1072 i.Db, 1073 orm.FilterEq("did", did), 1074 orm.FilterEq("rkey", rkey), 1075 ) 1076 if err != nil { 1077 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1078 } 1079 if len(members) == 0 { 1080 l.Info("knot member already removed", "rkey", rkey) 1081 return nil 1082 } 1083 if len(members) > 1 { 1084 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1085 } 1086 member := members[0] 1087 1088 tx, err := i.Db.Begin() 1089 if err != nil { 1090 return fmt.Errorf("failed to start txn: %w", err) 1091 } 1092 committed := false 1093 defer func() { 1094 if committed { 1095 return 1096 } 1097 tx.Rollback() 1098 i.Enforcer.E.LoadPolicy() 1099 }() 1100 1101 if err = db.RemoveKnotMember( 1102 tx, 1103 orm.FilterEq("did", did), 1104 orm.FilterEq("rkey", rkey), 1105 ); err != nil { 1106 return fmt.Errorf("failed to remove from db: %w", err) 1107 } 1108 1109 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1110 return fmt.Errorf("failed to update ACLs: %w", err) 1111 } 1112 1113 if err = tx.Commit(); err != nil { 1114 return fmt.Errorf("failed to commit txn: %w", err) 1115 } 1116 1117 if err = i.Enforcer.E.SavePolicy(); err != nil { 1118 return fmt.Errorf("failed to save ACLs: %w", err) 1119 } 1120 committed = true 1121 1122 l.Info("removed knot member") 1123 } 1124 1125 return nil 1126} 1127 1128func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1129 did := e.Did 1130 var err error 1131 1132 l = l.With("handler", "ingestKnot") 1133 1134 switch e.Commit.Operation { 1135 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1136 raw := json.RawMessage(e.Commit.Record) 1137 record := tangled.Knot{} 1138 err = json.Unmarshal(raw, &record) 1139 if err != nil { 1140 l.Error("invalid record", "err", err) 1141 return err 1142 } 1143 1144 domain := e.Commit.RKey 1145 1146 err := db.AddKnot(i.Db, domain, did) 1147 if err != nil { 1148 l.Error("failed to add knot to db", "err", err, "domain", domain) 1149 return err 1150 } 1151 1152 if err := i.verifyKnot(ctx, domain, did); err != nil { 1153 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 1154 } 1155 1156 l.Info("ingested record", "domain", domain) 1157 return nil 1158 1159 case jmodels.CommitOperationDelete: 1160 domain := e.Commit.RKey 1161 1162 // get record from db first 1163 registrations, err := db.GetRegistrations( 1164 i.Db, 1165 orm.FilterEq("domain", domain), 1166 orm.FilterEq("did", did), 1167 ) 1168 if err != nil { 1169 return fmt.Errorf("failed to get registration: %w", err) 1170 } 1171 if len(registrations) != 1 { 1172 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 1173 } 1174 registration := registrations[0] 1175 1176 tx, err := i.Db.Begin() 1177 if err != nil { 1178 return fmt.Errorf("failed to start txn: %w", err) 1179 } 1180 defer func() { 1181 tx.Rollback() 1182 i.Enforcer.E.LoadPolicy() 1183 }() 1184 1185 err = db.RemoveKnotMember( 1186 tx, 1187 orm.FilterEq("did", did), 1188 orm.FilterEq("domain", domain), 1189 ) 1190 if err != nil { 1191 return fmt.Errorf("failed to remove knot members: %w", err) 1192 } 1193 1194 err = db.DeleteKnot( 1195 tx, 1196 orm.FilterEq("did", did), 1197 orm.FilterEq("domain", domain), 1198 ) 1199 if err != nil { 1200 return fmt.Errorf("failed to delete knot: %w", err) 1201 } 1202 1203 err = db.RemoveReposByKnot(tx, domain) 1204 if err != nil { 1205 return fmt.Errorf("failed to remove repos by knot: %w", err) 1206 } 1207 1208 if registration.Registered != nil { 1209 err = i.Enforcer.RemoveKnot(domain) 1210 if err != nil { 1211 return fmt.Errorf("failed to remove knot from enforcer: %w", err) 1212 } 1213 } 1214 1215 err = tx.Commit() 1216 if err != nil { 1217 return fmt.Errorf("failed to commit txn: %w", err) 1218 } 1219 1220 err = i.Enforcer.E.SavePolicy() 1221 if err != nil { 1222 return fmt.Errorf("failed to save ACLs: %w", err) 1223 } 1224 1225 l.Info("ingested record", "domain", domain) 1226 } 1227 1228 return nil 1229} 1230 1231const ( 1232 verifyAttempts = 4 1233 verifyMinDelay = 1 * time.Second 1234 verifyMaxDelay = 5 * time.Second 1235) 1236 1237func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1238 regs, err := db.GetRegistrations(i.Db, 1239 orm.FilterEq("domain", domain), 1240 orm.FilterEq("did", did), 1241 ) 1242 if err != nil { 1243 return fmt.Errorf("look up registration: %w", err) 1244 } 1245 if len(regs) != 1 { 1246 return fmt.Errorf("no registration for %s by %s", domain, did) 1247 } 1248 if regs[0].Registered != nil { 1249 return nil 1250 } 1251 1252 err = retry.Do( 1253 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1254 retry.Context(ctx), 1255 retry.Attempts(verifyAttempts), 1256 retry.Delay(verifyMinDelay), 1257 retry.MaxDelay(verifyMaxDelay), 1258 retry.DelayType(retry.BackOffDelay), 1259 retry.LastErrorOnly(true), 1260 ) 1261 if err != nil { 1262 return fmt.Errorf("verify: %w", err) 1263 } 1264 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1265} 1266 1267func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1268 spindles, err := db.GetSpindles(ctx, i.Db, 1269 orm.FilterEq("instance", instance), 1270 orm.FilterEq("owner", did), 1271 ) 1272 if err != nil { 1273 return fmt.Errorf("look up spindle: %w", err) 1274 } 1275 if len(spindles) != 1 { 1276 return fmt.Errorf("no spindle for %s by %s", instance, did) 1277 } 1278 if spindles[0].Verified != nil { 1279 return nil 1280 } 1281 1282 err = retry.Do( 1283 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1284 retry.Context(ctx), 1285 retry.Attempts(verifyAttempts), 1286 retry.Delay(verifyMinDelay), 1287 retry.MaxDelay(verifyMaxDelay), 1288 retry.DelayType(retry.BackOffDelay), 1289 retry.LastErrorOnly(true), 1290 ) 1291 if err != nil { 1292 return fmt.Errorf("verify: %w", err) 1293 } 1294 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1295 return err 1296} 1297 1298const sweepConcurrency = 4 1299 1300func (i *Ingester) SweepPendingVerifications() { 1301 l := i.Logger.With("handler", "SweepPendingVerifications") 1302 1303 var g errgroup.Group 1304 g.SetLimit(sweepConcurrency) 1305 1306 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1307 if err != nil { 1308 l.Error("failed to list unverified knots", "err", err) 1309 } else { 1310 for _, reg := range regs { 1311 g.Go(func() error { 1312 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1313 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1314 } 1315 return nil 1316 }) 1317 } 1318 } 1319 1320 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1321 if err != nil { 1322 l.Error("failed to list unverified spindles", "err", err) 1323 g.Wait() 1324 return 1325 } 1326 for _, s := range spindles { 1327 g.Go(func() error { 1328 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1329 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1330 } 1331 return nil 1332 }) 1333 } 1334 g.Wait() 1335} 1336 1337func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1338 did := e.Did 1339 rkey := e.Commit.RKey 1340 1341 var err error 1342 1343 l = l.With("handler", "ingestIssue") 1344 1345 switch e.Commit.Operation { 1346 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1347 raw := json.RawMessage(e.Commit.Record) 1348 record := tangled.RepoIssue{} 1349 err = json.Unmarshal(raw, &record) 1350 if err != nil { 1351 l.Error("invalid record", "err", err) 1352 return err 1353 } 1354 1355 issue := models.IssueFromRecord(did, rkey, record) 1356 1357 if issue.RepoDid == "" { 1358 return fmt.Errorf("issue record has no repo field") 1359 } 1360 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1361 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1362 } 1363 1364 if err := issue.Validate(); err != nil { 1365 return fmt.Errorf("failed to validate issue: %w", err) 1366 } 1367 1368 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1369 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1370 if repoErr == nil && repo.RepoDid != "" { 1371 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1372 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1373 } 1374 } 1375 } 1376 1377 tx, err := i.Db.BeginTx(ctx, nil) 1378 if err != nil { 1379 l.Error("failed to begin transaction", "err", err) 1380 return err 1381 } 1382 defer tx.Rollback() 1383 1384 err = db.PutIssue(tx, &issue) 1385 if err != nil { 1386 l.Error("failed to create issue", "err", err) 1387 return err 1388 } 1389 1390 err = tx.Commit() 1391 if err != nil { 1392 l.Error("failed to commit txn", "err", err) 1393 return err 1394 } 1395 1396 l.Info("ingested record") 1397 return nil 1398 1399 case jmodels.CommitOperationDelete: 1400 tx, err := i.Db.BeginTx(ctx, nil) 1401 if err != nil { 1402 l.Error("failed to begin transaction", "err", err) 1403 return err 1404 } 1405 defer tx.Rollback() 1406 1407 if err := db.DeleteIssues( 1408 tx, 1409 did, 1410 rkey, 1411 ); err != nil { 1412 l.Error("failed to delete", "err", err) 1413 return fmt.Errorf("failed to delete issue record: %w", err) 1414 } 1415 if err := tx.Commit(); err != nil { 1416 l.Error("failed to commit txn", "err", err) 1417 return err 1418 } 1419 1420 l.Info("ingested record") 1421 return nil 1422 } 1423 1424 return nil 1425} 1426 1427func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1428 did := e.Did 1429 rkey := e.Commit.RKey 1430 1431 var err error 1432 1433 l = l.With("handler", "ingestPull") 1434 1435 switch e.Commit.Operation { 1436 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1437 raw := json.RawMessage(e.Commit.Record) 1438 record := tangled.RepoPull{} 1439 err = json.Unmarshal(raw, &record) 1440 if err != nil { 1441 l.Error("invalid record", "err", err) 1442 return err 1443 } 1444 1445 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1446 if err != nil { 1447 l.Error("failed to resolve did", "err", err) 1448 return err 1449 } 1450 1451 // go through and fetch all blobs in parallel 1452 readers := make([]*io.ReadCloser, len(record.Rounds)) 1453 var mu sync.Mutex 1454 1455 g, gctx := errgroup.WithContext(ctx) 1456 1457 for idx, b := range record.Rounds { 1458 g.Go(func() error { 1459 // for some reason, a blob is empty 1460 if b.PatchBlob == nil { 1461 return fmt.Errorf("missing patchBlob in round %d", idx) 1462 } 1463 1464 ownerPds := ownerId.PDSEndpoint() 1465 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1466 q := url.Query() 1467 q.Set("cid", b.PatchBlob.Ref.String()) 1468 q.Set("did", did) 1469 url.RawQuery = q.Encode() 1470 1471 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1472 if err != nil { 1473 l.Error("failed to create request") 1474 return err 1475 } 1476 req.Header.Set("Content-Type", "application/json") 1477 1478 resp, err := http.DefaultClient.Do(req) 1479 if err != nil { 1480 l.Error("failed to make request") 1481 return err 1482 } 1483 1484 mu.Lock() 1485 readers[idx] = &resp.Body 1486 mu.Unlock() 1487 1488 return nil 1489 }) 1490 } 1491 1492 if err := g.Wait(); err != nil { 1493 for _, r := range readers { 1494 if r != nil && *r != nil { 1495 (*r).Close() 1496 } 1497 } 1498 return err 1499 } 1500 1501 defer func() { 1502 for _, r := range readers { 1503 if r != nil && *r != nil { 1504 (*r).Close() 1505 } 1506 } 1507 }() 1508 1509 pull, err := models.PullFromRecord(did, rkey, record, readers) 1510 if err != nil { 1511 return fmt.Errorf("failed to parse pull from record: %w", err) 1512 } 1513 if err := pull.Validate(); err != nil { 1514 return fmt.Errorf("failed to validate pull: %w", err) 1515 } 1516 if pull.DependentOn != nil { 1517 if err := func() error { 1518 dependentPull, err := db.GetPull( 1519 i.Db, 1520 orm.FilterEq("dependent_on", pull.DependentOn.String()), 1521 ) 1522 if errors.Is(err, sql.ErrNoRows) { 1523 return nil 1524 } 1525 if err != nil { 1526 return fmt.Errorf("failed to fetch pulls with same dependency: %w", err) 1527 } 1528 if dependentPull.AtUri() == pull.AtUri() { 1529 return nil 1530 } 1531 return fmt.Errorf("another pull already depends on %s, which would form a DAG, this is presently disallowed", pull.DependentOn.String()) 1532 }(); err != nil { 1533 return fmt.Errorf("failed to validate pull stack: %w", err) 1534 } 1535 } 1536 1537 tx, err := i.Db.BeginTx(ctx, nil) 1538 if err != nil { 1539 l.Error("failed to begin transaction", "err", err) 1540 return err 1541 } 1542 defer tx.Rollback() 1543 1544 err = db.PutPull(tx, pull) 1545 if err != nil { 1546 l.Error("failed to create pull", "err", err) 1547 return err 1548 } 1549 1550 err = tx.Commit() 1551 if err != nil { 1552 l.Error("failed to commit txn", "err", err) 1553 return err 1554 } 1555 1556 l.Info("ingested record") 1557 return nil 1558 1559 case jmodels.CommitOperationDelete: 1560 tx, err := i.Db.BeginTx(ctx, nil) 1561 if err != nil { 1562 l.Error("failed to begin transaction", "err", err) 1563 return err 1564 } 1565 defer tx.Rollback() 1566 1567 if err := db.AbandonPulls( 1568 tx, 1569 orm.FilterEq("owner_did", did), 1570 orm.FilterEq("rkey", rkey), 1571 ); err != nil { 1572 l.Error("failed to abandon", "err", err) 1573 return fmt.Errorf("failed to abandon pull record: %w", err) 1574 } 1575 if err := tx.Commit(); err != nil { 1576 l.Error("failed to commit txn", "err", err) 1577 return err 1578 } 1579 1580 l.Info("ingested record") 1581 return nil 1582 } 1583 1584 return nil 1585} 1586 1587// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions 1588func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error { 1589 l = l.With("handler", "ingestIssueComment") 1590 1591 switch e.Commit.Operation { 1592 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1593 // no-op. sh.tangled.repo.issue.comment is deprecated 1594 1595 case jmodels.CommitOperationDelete: 1596 if err := db.PurgeComments( 1597 i.Db, 1598 orm.FilterEq("did", e.Did), 1599 orm.FilterEq("collection", e.Commit.Collection), 1600 orm.FilterEq("rkey", e.Commit.RKey), 1601 ); err != nil { 1602 return fmt.Errorf("failed to delete comment record: %w", err) 1603 } 1604 } 1605 1606 l.Info("ingested record") 1607 return nil 1608} 1609 1610// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1611func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error { 1612 l = l.With("handler", "ingestPullComment") 1613 1614 switch e.Commit.Operation { 1615 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1616 // no-op. sh.tangled.repo.pull.comment is deprecated 1617 1618 case jmodels.CommitOperationDelete: 1619 if err := db.PurgeComments( 1620 i.Db, 1621 orm.FilterEq("did", e.Did), 1622 orm.FilterEq("collection", e.Commit.Collection), 1623 orm.FilterEq("rkey", e.Commit.RKey), 1624 ); err != nil { 1625 return fmt.Errorf("failed to delete comment record: %w", err) 1626 } 1627 } 1628 1629 l.Info("ingested record") 1630 return nil 1631} 1632 1633func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error { 1634 did := e.Did 1635 rkey := e.Commit.RKey 1636 cid := e.Commit.CID 1637 1638 var err error 1639 1640 l = l.With("handler", "ingestComment") 1641 1642 ctx := context.Background() 1643 1644 switch e.Commit.Operation { 1645 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1646 raw := json.RawMessage(e.Commit.Record) 1647 record := tangled.FeedComment{} 1648 err = json.Unmarshal(raw, &record) 1649 if err != nil { 1650 return fmt.Errorf("invalid record: %w", err) 1651 } 1652 1653 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1654 if err != nil { 1655 return fmt.Errorf("failed to parse comment from record: %w", err) 1656 } 1657 1658 if err := comment.Validate(); err != nil { 1659 return fmt.Errorf("failed to validate comment: %w", err) 1660 } 1661 1662 var references []syntax.ATURI 1663 if comment.Body.Original != nil { 1664 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1665 } 1666 1667 tx, err := i.Db.Begin() 1668 if err != nil { 1669 return fmt.Errorf("failed to start transaction: %w", err) 1670 } 1671 defer tx.Rollback() 1672 1673 _, err = db.PutComment(tx, comment, references) 1674 if err != nil { 1675 return fmt.Errorf("failed to create comment: %w", err) 1676 } 1677 1678 if err := tx.Commit(); err != nil { 1679 return err 1680 } 1681 1682 case jmodels.CommitOperationDelete: 1683 if err := db.DeleteComments( 1684 i.Db, 1685 orm.FilterEq("did", did), 1686 orm.FilterEq("collection", e.Commit.Collection), 1687 orm.FilterEq("rkey", rkey), 1688 ); err != nil { 1689 return fmt.Errorf("failed to delete comment record: %w", err) 1690 } 1691 } 1692 1693 l.Info("ingested record") 1694 return nil 1695} 1696 1697func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error { 1698 did := e.Did 1699 rkey := e.Commit.RKey 1700 1701 l = l.With("handler", "ingestReaction") 1702 1703 switch e.Commit.Operation { 1704 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1705 raw := json.RawMessage(e.Commit.Record) 1706 record := tangled.FeedReaction{} 1707 if err := json.Unmarshal(raw, &record); err != nil { 1708 return fmt.Errorf("invalid record: %w", err) 1709 } 1710 1711 subjectUri, err := syntax.ParseATURI(record.Subject) 1712 if err != nil { 1713 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err) 1714 } 1715 subjectUri = models.NormalizeReactionSubject(subjectUri) 1716 1717 kind, ok := models.ParseReactionKind(record.Reaction) 1718 if !ok { 1719 return fmt.Errorf("invalid reaction kind: %q", record.Reaction) 1720 } 1721 1722 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt) 1723 if parseErr != nil { 1724 created = time.Now() 1725 } 1726 1727 tx, err := i.Db.Begin() 1728 if err != nil { 1729 return fmt.Errorf("failed to start transaction: %w", err) 1730 } 1731 defer tx.Rollback() 1732 1733 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil { 1734 return fmt.Errorf("failed to clear existing reaction: %w", err) 1735 } 1736 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil { 1737 return fmt.Errorf("failed to add reaction: %w", err) 1738 } 1739 1740 if err := tx.Commit(); err != nil { 1741 return err 1742 } 1743 1744 case jmodels.CommitOperationDelete: 1745 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil { 1746 return fmt.Errorf("failed to delete reaction record: %w", err) 1747 } 1748 } 1749 1750 l.Info("ingested record") 1751 return nil 1752} 1753 1754func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error { 1755 did := e.Did 1756 rkey := e.Commit.RKey 1757 1758 var err error 1759 1760 l = l.With("handler", "ingestLabelDefinition") 1761 1762 switch e.Commit.Operation { 1763 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1764 raw := json.RawMessage(e.Commit.Record) 1765 record := tangled.LabelDefinition{} 1766 err = json.Unmarshal(raw, &record) 1767 if err != nil { 1768 return fmt.Errorf("invalid record: %w", err) 1769 } 1770 1771 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1772 if err != nil { 1773 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1774 } 1775 1776 if err := def.Validate(); err != nil { 1777 return fmt.Errorf("failed to validate labeldef: %w", err) 1778 } 1779 1780 _, err = db.AddLabelDefinition(i.Db, def) 1781 if err != nil { 1782 return fmt.Errorf("failed to create labeldef: %w", err) 1783 } 1784 1785 l.Info("ingested record") 1786 return nil 1787 1788 case jmodels.CommitOperationDelete: 1789 if err := db.DeleteLabelDefinition( 1790 i.Db, 1791 orm.FilterEq("did", did), 1792 orm.FilterEq("rkey", rkey), 1793 ); err != nil { 1794 return fmt.Errorf("failed to delete labeldef record: %w", err) 1795 } 1796 1797 l.Info("ingested record") 1798 return nil 1799 } 1800 1801 return nil 1802} 1803 1804func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 1805 did := e.Did 1806 rkey := e.Commit.RKey 1807 1808 var err error 1809 1810 l = l.With("handler", "ingestLabelOp") 1811 1812 switch e.Commit.Operation { 1813 case jmodels.CommitOperationCreate: 1814 raw := json.RawMessage(e.Commit.Record) 1815 record := tangled.LabelOp{} 1816 err = json.Unmarshal(raw, &record) 1817 if err != nil { 1818 return fmt.Errorf("invalid record: %w", err) 1819 } 1820 1821 subject := syntax.ATURI(record.Subject) 1822 collection := subject.Collection() 1823 1824 var repo *models.Repo 1825 switch collection { 1826 case tangled.RepoIssueNSID: 1827 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1828 if err != nil || len(i) != 1 { 1829 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1830 } 1831 repo = i[0].Repo 1832 case tangled.RepoPullNSID: 1833 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject)) 1834 if err != nil || len(p) != 1 { 1835 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p)) 1836 } 1837 repo = p[0].Repo 1838 default: 1839 return fmt.Errorf("unsupported label subject: %s", collection) 1840 } 1841 1842 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1843 if err != nil { 1844 return fmt.Errorf("failed to build label application ctx: %w", err) 1845 } 1846 1847 ops := models.LabelOpsFromRecord(did, rkey, record) 1848 1849 for _, o := range ops { 1850 def, ok := actx.Defs[o.OperandKey] 1851 if !ok { 1852 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1853 } 1854 // validate permissions: only collaborators can apply labels currently 1855 // 1856 // TODO: introduce a repo:triage permission 1857 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, o.Did, "repo:push") 1858 if permErr != nil { 1859 if !errors.Is(permErr, knotacl.ErrKnotUnreachable) { 1860 return fmt.Errorf("enforcing permission: %w", permErr) 1861 } 1862 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", permErr) 1863 } else if !allowed { 1864 return fmt.Errorf("unauthorized label operation") 1865 } 1866 1867 if err := def.ValidateOperandValue(&o); err != nil { 1868 return fmt.Errorf("failed to validate labelop: %w", err) 1869 } 1870 } 1871 1872 tx, err := i.Db.Begin() 1873 if err != nil { 1874 return err 1875 } 1876 defer tx.Rollback() 1877 1878 for _, o := range ops { 1879 _, err = db.AddLabelOp(tx, &o) 1880 if err != nil { 1881 return fmt.Errorf("failed to add labelop: %w", err) 1882 } 1883 } 1884 1885 if err = tx.Commit(); err != nil { 1886 return err 1887 } 1888 1889 l.Info("ingested record") 1890 } 1891 1892 return nil 1893}