Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/models" 31 "tangled.org/core/appview/notify" 32 "tangled.org/core/appview/repoverify" 33 "tangled.org/core/appview/serververify" 34 "tangled.org/core/appview/validator" 35 "tangled.org/core/idresolver" 36 "tangled.org/core/orm" 37 "tangled.org/core/rbac" 38) 39 40type Ingester struct { 41 Db *db.DB 42 Enforcer *rbac.Enforcer 43 IdResolver *idresolver.Resolver 44 Cache *cache.Cache 45 Config *config.Config 46 Logger *slog.Logger 47 Validator *validator.Validator 48 Notifier notify.Notifier 49 Verifier repoverify.Verifier 50} 51 52type processFunc func(ctx context.Context, e *jmodels.Event) error 53 54func (i *Ingester) Ingest() processFunc { 55 return func(ctx context.Context, e *jmodels.Event) error { 56 var err error 57 58 l := i.Logger.With("kind", e.Kind) 59 switch e.Kind { 60 case jmodels.EventKindAccount: 61 // TODO: sync account state to db 62 if e.Account.Active { 63 break 64 } 65 // TODO: revoke sessions by DID 66 if *e.Account.Status == "deactivated" { 67 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 68 } 69 case jmodels.EventKindIdentity: 70 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 71 case jmodels.EventKindCommit: 72 switch e.Commit.Collection { 73 case tangled.GraphFollowNSID: 74 err = i.ingestFollow(e) 75 case tangled.GraphVouchNSID: 76 err = i.ingestVouch(ctx, e) 77 case tangled.FeedStarNSID: 78 err = i.ingestStar(ctx, e) 79 case tangled.PublicKeyNSID: 80 err = i.ingestPublicKey(e) 81 case tangled.RepoArtifactNSID: 82 err = i.ingestArtifact(ctx, e) 83 case tangled.ActorProfileNSID: 84 err = i.ingestProfile(ctx, e) 85 case tangled.SpindleMemberNSID: 86 err = i.ingestSpindleMember(ctx, e) 87 case tangled.SpindleNSID: 88 err = i.ingestSpindle(ctx, e) 89 case tangled.KnotMemberNSID: 90 err = i.ingestKnotMember(e) 91 case tangled.KnotNSID: 92 err = i.ingestKnot(e) 93 case tangled.StringNSID: 94 err = i.ingestString(e) 95 case tangled.RepoIssueNSID: 96 err = i.ingestIssue(ctx, e) 97 case tangled.RepoPullNSID: 98 err = i.ingestPull(ctx, e) 99 case tangled.RepoIssueCommentNSID: 100 err = i.ingestIssueComment(e) 101 case tangled.LabelDefinitionNSID: 102 err = i.ingestLabelDefinition(e) 103 case tangled.LabelOpNSID: 104 err = i.ingestLabelOp(e) 105 case tangled.RepoNSID: 106 err = i.ingestRepo(ctx, e) 107 } 108 l = i.Logger.With("nsid", e.Commit.Collection) 109 } 110 111 if err != nil { 112 l.Warn("failed to ingest record, skipping", "err", err) 113 } 114 115 lastTimeUs := e.TimeUS + 1 116 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 117 l.Error("failed to save cursor", "err", saveErr) 118 } 119 120 return nil 121 } 122} 123 124func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 125 if strings.HasPrefix(ref, "did:") { 126 return db.GetRepoByDid(i.Db, ref) 127 } 128 return db.GetRepoByAtUri(i.Db, ref) 129} 130 131func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 132 var legacy struct { 133 Subject *string `json:"subject"` 134 SubjectDid *string `json:"subjectDid"` 135 } 136 if err := json.Unmarshal(raw, &legacy); err != nil { 137 return false, err 138 } 139 140 switch { 141 case legacy.SubjectDid != nil: 142 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 143 if err != nil { 144 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 145 return false, nil 146 } 147 star.SubjectType = models.StarSubjectRepo 148 star.Subject = repo.RepoDid 149 return true, nil 150 151 case legacy.Subject != nil: 152 uri, err := syntax.ParseATURI(*legacy.Subject) 153 if err != nil { 154 return false, fmt.Errorf("invalid old-format star subject: %w", err) 155 } 156 switch uri.Collection().String() { 157 case tangled.RepoNSID: 158 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 159 if err != nil { 160 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 161 return false, nil 162 } 163 star.SubjectType = models.StarSubjectRepo 164 star.Subject = repo.RepoDid 165 return true, nil 166 default: 167 star.SubjectType = models.StarSubjectString 168 star.Subject = *legacy.Subject 169 return true, nil 170 } 171 172 default: 173 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 174 } 175} 176 177func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 178 var err error 179 did := e.Did 180 181 l := i.Logger.With("handler", "ingestStar") 182 l = l.With("nsid", e.Commit.Collection) 183 184 switch e.Commit.Operation { 185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 186 raw := json.RawMessage(e.Commit.Record) 187 record := tangled.FeedStar{} 188 unmarshalErr := json.Unmarshal(raw, &record) 189 190 star := &models.Star{ 191 Did: did, 192 Rkey: e.Commit.RKey, 193 } 194 195 switch { 196 case unmarshalErr != nil: 197 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 198 if resolveErr != nil { 199 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 200 return unmarshalErr 201 } 202 if !resolved { 203 return nil 204 } 205 206 case record.Subject == nil: 207 return fmt.Errorf("star record has nil subject") 208 209 case record.Subject.FeedStar_Repo != nil: 210 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 211 if repoErr != nil { 212 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 213 return nil 214 } 215 star.SubjectType = models.StarSubjectRepo 216 star.Subject = repo.RepoDid 217 218 case record.Subject.FeedStar_String != nil: 219 star.SubjectType = models.StarSubjectString 220 star.Subject = record.Subject.FeedStar_String.Uri 221 222 default: 223 return fmt.Errorf("star record has empty subject union") 224 } 225 226 err = db.AddStar(i.Db, star) 227 case jmodels.CommitOperationDelete: 228 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 229 } 230 231 if err != nil { 232 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 233 } 234 235 return nil 236} 237 238func (i *Ingester) ingestFollow(e *jmodels.Event) error { 239 var err error 240 did := e.Did 241 242 l := i.Logger.With("handler", "ingestFollow") 243 l = l.With("nsid", e.Commit.Collection) 244 245 switch e.Commit.Operation { 246 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 247 raw := json.RawMessage(e.Commit.Record) 248 record := tangled.GraphFollow{} 249 err = json.Unmarshal(raw, &record) 250 if err != nil { 251 l.Error("invalid record", "err", err) 252 return err 253 } 254 255 err = db.AddFollow(i.Db, &models.Follow{ 256 UserDid: did, 257 SubjectDid: record.Subject, 258 Rkey: e.Commit.RKey, 259 }) 260 case jmodels.CommitOperationDelete: 261 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 262 } 263 264 if err != nil { 265 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 266 } 267 268 return nil 269} 270 271func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 272 var err error 273 did := e.Did 274 275 l := i.Logger.With("handler", "ingestVouch") 276 l = l.With("nsid", e.Commit.Collection) 277 l.Info("ingesting vouch") 278 279 switch e.Commit.Operation { 280 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 281 raw := json.RawMessage(e.Commit.Record) 282 record := tangled.GraphVouch{} 283 err = json.Unmarshal(raw, &record) 284 if err != nil { 285 l.Error("invalid record", "err", err) 286 return err 287 } 288 289 // rkey is the subject_did being vouched for/denounced 290 subjectDID := e.Commit.RKey 291 292 _, err = syntax.ParseDID(subjectDID) 293 if err != nil { 294 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 295 return fmt.Errorf("invalid subject_did: %w", err) 296 } 297 298 if did == subjectDID { 299 l.Warn("attempted self-vouch", "did", did) 300 return fmt.Errorf("cannot vouch for self") 301 } 302 303 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 304 if err != nil { 305 return err 306 } 307 308 if subjectId.Handle.IsInvalidHandle() { 309 return err 310 } 311 312 kind, err := models.ParseVouchKind(record.Kind) 313 if err != nil { 314 l.Error("invalid kind", "kind", kind) 315 return fmt.Errorf("invalid kind: %s", kind) 316 } 317 318 recordCid, err := cid.Parse(e.Commit.CID) 319 if err != nil { 320 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 321 return fmt.Errorf("invalid cid: %w", err) 322 } 323 324 var evidences []syntax.ATURI 325 for _, raw := range record.Evidences { 326 uri, parseErr := syntax.ParseATURI(raw) 327 if parseErr != nil { 328 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 329 continue 330 } 331 evidences = append(evidences, uri) 332 } 333 334 tx, txErr := i.Db.Begin() 335 if txErr != nil { 336 return fmt.Errorf("failed to start transaction: %w", txErr) 337 } 338 339 addErr := db.AddVouch(tx, &models.Vouch{ 340 Did: syntax.DID(did), 341 SubjectDid: subjectId.DID, 342 Cid: recordCid, 343 Kind: kind, 344 Reason: record.Reason, 345 Evidences: evidences, 346 }) 347 if addErr != nil { 348 tx.Rollback() 349 err = addErr 350 } else { 351 err = tx.Commit() 352 } 353 354 case jmodels.CommitOperationDelete: 355 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 356 } 357 358 if err != nil { 359 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 360 } 361 362 return nil 363} 364 365func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 366 did := e.Did 367 var err error 368 369 l := i.Logger.With("handler", "ingestPublicKey") 370 l = l.With("nsid", e.Commit.Collection) 371 372 switch e.Commit.Operation { 373 case jmodels.CommitOperationCreate: 374 l.Debug("processing add of pubkey") 375 raw := json.RawMessage(e.Commit.Record) 376 record := tangled.PublicKey{} 377 err = json.Unmarshal(raw, &record) 378 if err != nil { 379 l.Error("invalid record", "err", err) 380 return err 381 } 382 383 name := record.Name 384 key := record.Key 385 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 386 case jmodels.CommitOperationUpdate: 387 l.Debug("processing update of pubkey") 388 raw := json.RawMessage(e.Commit.Record) 389 record := tangled.PublicKey{} 390 err = json.Unmarshal(raw, &record) 391 if err != nil { 392 l.Error("invalid record", "err", err) 393 return err 394 } 395 396 name := record.Name 397 key := record.Key 398 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 399 case jmodels.CommitOperationDelete: 400 l.Debug("processing delete of pubkey") 401 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 402 } 403 404 if err != nil { 405 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 406 } 407 408 return nil 409} 410 411func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 412 did := e.Did 413 var err error 414 415 l := i.Logger.With("handler", "ingestArtifact") 416 l = l.With("nsid", e.Commit.Collection) 417 418 switch e.Commit.Operation { 419 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 420 raw := json.RawMessage(e.Commit.Record) 421 record := tangled.RepoArtifact{} 422 err = json.Unmarshal(raw, &record) 423 if err != nil { 424 l.Error("invalid record", "err", err) 425 return err 426 } 427 428 var repo *models.Repo 429 if record.RepoDid != nil && *record.RepoDid != "" { 430 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 431 if err != nil && !errors.Is(err, sql.ErrNoRows) { 432 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 433 } 434 } 435 if repo == nil && record.Repo != nil { 436 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 437 if parseErr != nil { 438 return parseErr 439 } 440 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 441 if err != nil { 442 return err 443 } 444 } 445 if repo == nil { 446 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 447 } 448 449 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 450 if err != nil || !ok { 451 return err 452 } 453 454 repoDid := repo.RepoDid 455 if repoDid == "" && record.RepoDid != nil { 456 repoDid = *record.RepoDid 457 } 458 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 459 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 460 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 461 } 462 } 463 464 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 465 if err != nil { 466 createdAt = time.Now() 467 } 468 469 artifact := models.Artifact{ 470 Did: did, 471 Rkey: e.Commit.RKey, 472 RepoDid: syntax.DID(repo.RepoDid), 473 Tag: plumbing.Hash(record.Tag), 474 CreatedAt: createdAt, 475 BlobCid: cid.Cid(record.Artifact.Ref), 476 Name: record.Name, 477 Size: uint64(record.Artifact.Size), 478 MimeType: record.Artifact.MimeType, 479 } 480 481 err = db.AddArtifact(i.Db, artifact) 482 case jmodels.CommitOperationDelete: 483 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 484 } 485 486 if err != nil { 487 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 488 } 489 490 return nil 491} 492 493func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 494 did := e.Did 495 var err error 496 497 l := i.Logger.With("handler", "ingestProfile") 498 l = l.With("nsid", e.Commit.Collection) 499 500 if e.Commit.RKey != "self" { 501 return fmt.Errorf("ingestProfile only ingests `self` record") 502 } 503 504 switch e.Commit.Operation { 505 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 506 raw := json.RawMessage(e.Commit.Record) 507 record := tangled.ActorProfile{} 508 err = json.Unmarshal(raw, &record) 509 if err != nil { 510 l.Error("invalid record", "err", err) 511 return err 512 } 513 514 avatar := "" 515 if record.Avatar != nil { 516 avatar = record.Avatar.Ref.String() 517 } 518 519 description := "" 520 if record.Description != nil { 521 description = *record.Description 522 } 523 524 includeBluesky := record.Bluesky 525 526 pronouns := "" 527 if record.Pronouns != nil { 528 pronouns = *record.Pronouns 529 } 530 531 location := "" 532 if record.Location != nil { 533 location = *record.Location 534 } 535 536 var links [5]string 537 for i, l := range record.Links { 538 if i < 5 { 539 links[i] = l 540 } 541 } 542 543 var stats [2]models.VanityStat 544 for i, s := range record.Stats { 545 if i < 2 { 546 stats[i].Kind = models.ParseVanityStatKind(s) 547 } 548 } 549 550 var pinned [6]string 551 for i, r := range record.PinnedRepositories { 552 if i < 6 { 553 pinned[i] = r 554 } 555 } 556 557 var preferredHandle syntax.Handle 558 if record.PreferredHandle != nil { 559 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 560 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 561 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 562 preferredHandle = h 563 } 564 } 565 } 566 567 profile := models.Profile{ 568 Did: did, 569 Avatar: avatar, 570 Description: description, 571 IncludeBluesky: includeBluesky, 572 Location: location, 573 Links: links, 574 Stats: stats, 575 PinnedRepos: pinned, 576 Pronouns: pronouns, 577 PreferredHandle: preferredHandle, 578 } 579 580 tx, err := i.Db.Begin() 581 if err != nil { 582 return fmt.Errorf("failed to start transaction: %w", err) 583 } 584 585 err = db.ValidateProfile(tx, &profile) 586 if err != nil { 587 return fmt.Errorf("invalid profile record") 588 } 589 590 err = db.UpsertProfile(tx, &profile) 591 if err == nil && i.Cache != nil { 592 pipe := i.Cache.Pipeline() 593 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 594 if preferredHandle != "" { 595 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 596 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 597 } else { 598 pipe.Del(ctx, didKey) 599 } 600 if _, execErr := pipe.Exec(ctx); execErr != nil { 601 l.Warn("failed to update preferred handle cache", "err", execErr) 602 } 603 } 604 case jmodels.CommitOperationDelete: 605 tx, beginErr := i.Db.Begin() 606 if beginErr != nil { 607 return fmt.Errorf("failed to start transaction: %w", beginErr) 608 } 609 610 priorHandle, phErr := db.GetPreferredHandle(tx, did) 611 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 612 l.Warn("failed to read prior preferred handle", "err", phErr) 613 } 614 615 err = db.DeleteProfile(tx, did) 616 if err == nil && i.Cache != nil { 617 pipe := i.Cache.Pipeline() 618 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 619 if priorHandle != "" { 620 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 621 } 622 if _, execErr := pipe.Exec(ctx); execErr != nil { 623 l.Warn("failed to evict preferred handle cache", "err", execErr) 624 } 625 } 626 } 627 628 if err != nil { 629 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 630 } 631 632 return nil 633} 634 635func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 636 did := e.Did 637 var err error 638 639 l := i.Logger.With("handler", "ingestSpindleMember") 640 l = l.With("nsid", e.Commit.Collection) 641 642 switch e.Commit.Operation { 643 case jmodels.CommitOperationCreate: 644 raw := json.RawMessage(e.Commit.Record) 645 record := tangled.SpindleMember{} 646 err = json.Unmarshal(raw, &record) 647 if err != nil { 648 l.Error("invalid record", "err", err) 649 return err 650 } 651 652 // only spindle owner can invite to spindles 653 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 654 if err != nil || !ok { 655 return fmt.Errorf("failed to enforce permissions: %w", err) 656 } 657 658 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 659 if err != nil { 660 return err 661 } 662 663 if memberId.Handle.IsInvalidHandle() { 664 return err 665 } 666 667 err = db.AddSpindleMember(i.Db, models.SpindleMember{ 668 Did: syntax.DID(did), 669 Rkey: e.Commit.RKey, 670 Instance: record.Instance, 671 Subject: memberId.DID, 672 }) 673 if !ok { 674 return fmt.Errorf("failed to add to db: %w", err) 675 } 676 677 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 678 if err != nil { 679 return fmt.Errorf("failed to update ACLs: %w", err) 680 } 681 682 l.Info("added spindle member") 683 case jmodels.CommitOperationDelete: 684 rkey := e.Commit.RKey 685 686 // get record from db first 687 members, err := db.GetSpindleMembers( 688 i.Db, 689 orm.FilterEq("did", did), 690 orm.FilterEq("rkey", rkey), 691 ) 692 if err != nil || len(members) != 1 { 693 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 694 } 695 member := members[0] 696 697 tx, err := i.Db.Begin() 698 if err != nil { 699 return fmt.Errorf("failed to start txn: %w", err) 700 } 701 702 // remove record by rkey && update enforcer 703 if err = db.RemoveSpindleMember( 704 tx, 705 orm.FilterEq("did", did), 706 orm.FilterEq("rkey", rkey), 707 ); err != nil { 708 return fmt.Errorf("failed to remove from db: %w", err) 709 } 710 711 // update enforcer 712 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 713 if err != nil { 714 return fmt.Errorf("failed to update ACLs: %w", err) 715 } 716 717 if err = tx.Commit(); err != nil { 718 return fmt.Errorf("failed to commit txn: %w", err) 719 } 720 721 if err = i.Enforcer.E.SavePolicy(); err != nil { 722 return fmt.Errorf("failed to save ACLs: %w", err) 723 } 724 725 l.Info("removed spindle member") 726 } 727 728 return nil 729} 730 731func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 732 did := e.Did 733 var err error 734 735 l := i.Logger.With("handler", "ingestSpindle") 736 l = l.With("nsid", e.Commit.Collection) 737 738 switch e.Commit.Operation { 739 case jmodels.CommitOperationCreate: 740 raw := json.RawMessage(e.Commit.Record) 741 record := tangled.Spindle{} 742 err = json.Unmarshal(raw, &record) 743 if err != nil { 744 l.Error("invalid record", "err", err) 745 return err 746 } 747 748 instance := e.Commit.RKey 749 750 err := db.AddSpindle(i.Db, models.Spindle{ 751 Owner: syntax.DID(did), 752 Instance: instance, 753 }) 754 if err != nil { 755 l.Error("failed to add spindle to db", "err", err, "instance", instance) 756 return err 757 } 758 759 err = retry.Do( 760 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 761 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 762 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 763 ) 764 if err != nil { 765 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 766 return err 767 } 768 769 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 770 if err != nil { 771 return fmt.Errorf("failed to mark verified: %w", err) 772 } 773 774 return nil 775 776 case jmodels.CommitOperationDelete: 777 instance := e.Commit.RKey 778 779 // get record from db first 780 spindles, err := db.GetSpindles( 781 ctx, 782 i.Db, 783 orm.FilterEq("owner", did), 784 orm.FilterEq("instance", instance), 785 ) 786 if err != nil || len(spindles) != 1 { 787 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 788 } 789 spindle := spindles[0] 790 791 tx, err := i.Db.Begin() 792 if err != nil { 793 return err 794 } 795 defer func() { 796 tx.Rollback() 797 i.Enforcer.E.LoadPolicy() 798 }() 799 800 // remove spindle members first 801 err = db.RemoveSpindleMember( 802 tx, 803 orm.FilterEq("owner", did), 804 orm.FilterEq("instance", instance), 805 ) 806 if err != nil { 807 return err 808 } 809 810 err = db.DeleteSpindle( 811 tx, 812 orm.FilterEq("owner", did), 813 orm.FilterEq("instance", instance), 814 ) 815 if err != nil { 816 return err 817 } 818 819 if spindle.Verified != nil { 820 err = i.Enforcer.RemoveSpindle(instance) 821 if err != nil { 822 return err 823 } 824 } 825 826 err = tx.Commit() 827 if err != nil { 828 return err 829 } 830 831 err = i.Enforcer.E.SavePolicy() 832 if err != nil { 833 return err 834 } 835 } 836 837 return nil 838} 839 840func (i *Ingester) ingestString(e *jmodels.Event) error { 841 did := e.Did 842 rkey := e.Commit.RKey 843 844 var err error 845 846 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 847 l.Info("ingesting record") 848 849 switch e.Commit.Operation { 850 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 851 raw := json.RawMessage(e.Commit.Record) 852 record := tangled.String{} 853 err = json.Unmarshal(raw, &record) 854 if err != nil { 855 l.Error("invalid record", "err", err) 856 return err 857 } 858 859 string := models.StringFromRecord(did, rkey, record) 860 861 if err = i.Validator.ValidateString(&string); err != nil { 862 l.Error("invalid record", "err", err) 863 return err 864 } 865 866 if err = db.AddString(i.Db, string); err != nil { 867 l.Error("failed to add string", "err", err) 868 return err 869 } 870 871 return nil 872 873 case jmodels.CommitOperationDelete: 874 if err := db.DeleteString( 875 i.Db, 876 orm.FilterEq("did", did), 877 orm.FilterEq("rkey", rkey), 878 ); err != nil { 879 l.Error("failed to delete", "err", err) 880 return fmt.Errorf("failed to delete string record: %w", err) 881 } 882 883 return nil 884 } 885 886 return nil 887} 888 889func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 890 did := e.Did 891 var err error 892 893 l := i.Logger.With("handler", "ingestKnotMember") 894 l = l.With("nsid", e.Commit.Collection) 895 896 switch e.Commit.Operation { 897 case jmodels.CommitOperationCreate: 898 raw := json.RawMessage(e.Commit.Record) 899 record := tangled.KnotMember{} 900 err = json.Unmarshal(raw, &record) 901 if err != nil { 902 l.Error("invalid record", "err", err) 903 return err 904 } 905 906 // only knot owner can invite to knots 907 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 908 if err != nil || !ok { 909 return fmt.Errorf("failed to enforce permissions: %w", err) 910 } 911 912 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 913 if err != nil { 914 return err 915 } 916 917 if memberId.Handle.IsInvalidHandle() { 918 return err 919 } 920 921 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 922 if err != nil { 923 return fmt.Errorf("failed to update ACLs: %w", err) 924 } 925 926 l.Info("added knot member") 927 case jmodels.CommitOperationDelete: 928 // we don't store knot members in a table (like we do for spindle) 929 // and we can't remove this just yet. possibly fixed if we switch 930 // to either: 931 // 1. a knot_members table like with spindle and store the rkey 932 // 2. use the knot host as the rkey 933 // 934 // TODO: implement member deletion 935 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 936 } 937 938 return nil 939} 940 941func (i *Ingester) ingestKnot(e *jmodels.Event) error { 942 did := e.Did 943 var err error 944 945 l := i.Logger.With("handler", "ingestKnot") 946 l = l.With("nsid", e.Commit.Collection) 947 948 switch e.Commit.Operation { 949 case jmodels.CommitOperationCreate: 950 raw := json.RawMessage(e.Commit.Record) 951 record := tangled.Knot{} 952 err = json.Unmarshal(raw, &record) 953 if err != nil { 954 l.Error("invalid record", "err", err) 955 return err 956 } 957 958 domain := e.Commit.RKey 959 960 err := db.AddKnot(i.Db, domain, did) 961 if err != nil { 962 l.Error("failed to add knot to db", "err", err, "domain", domain) 963 return err 964 } 965 966 err = retry.Do( 967 func() error { 968 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 969 }, 970 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 971 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 972 ) 973 if err != nil { 974 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 975 return err 976 } 977 978 err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 979 if err != nil { 980 return fmt.Errorf("failed to mark verified: %w", err) 981 } 982 983 return nil 984 985 case jmodels.CommitOperationDelete: 986 domain := e.Commit.RKey 987 988 // get record from db first 989 registrations, err := db.GetRegistrations( 990 i.Db, 991 orm.FilterEq("domain", domain), 992 orm.FilterEq("did", did), 993 ) 994 if err != nil { 995 return fmt.Errorf("failed to get registration: %w", err) 996 } 997 if len(registrations) != 1 { 998 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 999 } 1000 registration := registrations[0] 1001 1002 tx, err := i.Db.Begin() 1003 if err != nil { 1004 return err 1005 } 1006 defer func() { 1007 tx.Rollback() 1008 i.Enforcer.E.LoadPolicy() 1009 }() 1010 1011 err = db.DeleteKnot( 1012 tx, 1013 orm.FilterEq("did", did), 1014 orm.FilterEq("domain", domain), 1015 ) 1016 if err != nil { 1017 return err 1018 } 1019 1020 if registration.Registered != nil { 1021 err = i.Enforcer.RemoveKnot(domain) 1022 if err != nil { 1023 return err 1024 } 1025 } 1026 1027 err = tx.Commit() 1028 if err != nil { 1029 return err 1030 } 1031 1032 err = i.Enforcer.E.SavePolicy() 1033 if err != nil { 1034 return err 1035 } 1036 } 1037 1038 return nil 1039} 1040func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 1041 did := e.Did 1042 rkey := e.Commit.RKey 1043 1044 var err error 1045 1046 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1047 l.Info("ingesting record") 1048 1049 switch e.Commit.Operation { 1050 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1051 raw := json.RawMessage(e.Commit.Record) 1052 record := tangled.RepoIssue{} 1053 err = json.Unmarshal(raw, &record) 1054 if err != nil { 1055 l.Error("invalid record", "err", err) 1056 return err 1057 } 1058 1059 issue := models.IssueFromRecord(did, rkey, record) 1060 1061 if issue.RepoDid == "" { 1062 return fmt.Errorf("issue record has no repo field") 1063 } 1064 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1065 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1066 } 1067 1068 if err := i.Validator.ValidateIssue(&issue); err != nil { 1069 return fmt.Errorf("failed to validate issue: %w", err) 1070 } 1071 1072 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1073 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1074 if repoErr == nil && repo.RepoDid != "" { 1075 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1076 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1077 } 1078 } 1079 } 1080 1081 tx, err := i.Db.BeginTx(ctx, nil) 1082 if err != nil { 1083 l.Error("failed to begin transaction", "err", err) 1084 return err 1085 } 1086 defer tx.Rollback() 1087 1088 err = db.PutIssue(tx, &issue) 1089 if err != nil { 1090 l.Error("failed to create issue", "err", err) 1091 return err 1092 } 1093 1094 err = tx.Commit() 1095 if err != nil { 1096 l.Error("failed to commit txn", "err", err) 1097 return err 1098 } 1099 1100 return nil 1101 1102 case jmodels.CommitOperationDelete: 1103 tx, err := i.Db.BeginTx(ctx, nil) 1104 if err != nil { 1105 l.Error("failed to begin transaction", "err", err) 1106 return err 1107 } 1108 defer tx.Rollback() 1109 1110 if err := db.DeleteIssues( 1111 tx, 1112 did, 1113 rkey, 1114 ); err != nil { 1115 l.Error("failed to delete", "err", err) 1116 return fmt.Errorf("failed to delete issue record: %w", err) 1117 } 1118 if err := tx.Commit(); err != nil { 1119 l.Error("failed to commit txn", "err", err) 1120 return err 1121 } 1122 1123 return nil 1124 } 1125 1126 return nil 1127} 1128 1129func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 1130 did := e.Did 1131 rkey := e.Commit.RKey 1132 1133 var err error 1134 1135 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1136 l.Info("ingesting record") 1137 1138 switch e.Commit.Operation { 1139 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1140 raw := json.RawMessage(e.Commit.Record) 1141 record := tangled.RepoPull{} 1142 err = json.Unmarshal(raw, &record) 1143 if err != nil { 1144 l.Error("invalid record", "err", err) 1145 return err 1146 } 1147 1148 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1149 if err != nil { 1150 l.Error("failed to resolve did") 1151 return err 1152 } 1153 1154 // go through and fetch all blobs in parallel 1155 readers := make([]*io.ReadCloser, len(record.Rounds)) 1156 var mu sync.Mutex 1157 1158 g, gctx := errgroup.WithContext(ctx) 1159 1160 for idx, b := range record.Rounds { 1161 g.Go(func() error { 1162 // for some reason, a blob is empty 1163 if b.PatchBlob == nil { 1164 return fmt.Errorf("missing patchBlob in round %d", idx) 1165 } 1166 1167 ownerPds := ownerId.PDSEndpoint() 1168 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1169 q := url.Query() 1170 q.Set("cid", b.PatchBlob.Ref.String()) 1171 q.Set("did", did) 1172 url.RawQuery = q.Encode() 1173 1174 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1175 if err != nil { 1176 l.Error("failed to create request") 1177 return err 1178 } 1179 req.Header.Set("Content-Type", "application/json") 1180 1181 resp, err := http.DefaultClient.Do(req) 1182 if err != nil { 1183 l.Error("failed to make request") 1184 return err 1185 } 1186 1187 mu.Lock() 1188 readers[idx] = &resp.Body 1189 mu.Unlock() 1190 1191 return nil 1192 }) 1193 } 1194 1195 if err := g.Wait(); err != nil { 1196 for _, r := range readers { 1197 if r != nil && *r != nil { 1198 (*r).Close() 1199 } 1200 } 1201 return err 1202 } 1203 1204 defer func() { 1205 for _, r := range readers { 1206 if r != nil && *r != nil { 1207 (*r).Close() 1208 } 1209 } 1210 }() 1211 1212 pull, err := models.PullFromRecord(did, rkey, record, readers) 1213 if err != nil { 1214 return fmt.Errorf("failed to parse pull from record: %w", err) 1215 } 1216 if err := i.Validator.ValidatePull(pull); err != nil { 1217 return fmt.Errorf("failed to validate pull: %w", err) 1218 } 1219 1220 tx, err := i.Db.BeginTx(ctx, nil) 1221 if err != nil { 1222 l.Error("failed to begin transaction", "err", err) 1223 return err 1224 } 1225 defer tx.Rollback() 1226 1227 err = db.PutPull(tx, pull) 1228 if err != nil { 1229 l.Error("failed to create pull", "err", err) 1230 return err 1231 } 1232 1233 err = tx.Commit() 1234 if err != nil { 1235 l.Error("failed to commit txn", "err", err) 1236 return err 1237 } 1238 1239 return nil 1240 1241 case jmodels.CommitOperationDelete: 1242 tx, err := i.Db.BeginTx(ctx, nil) 1243 if err != nil { 1244 l.Error("failed to begin transaction", "err", err) 1245 return err 1246 } 1247 defer tx.Rollback() 1248 1249 if err := db.AbandonPulls( 1250 tx, 1251 orm.FilterEq("owner_did", did), 1252 orm.FilterEq("rkey", rkey), 1253 ); err != nil { 1254 l.Error("failed to abandon", "err", err) 1255 return fmt.Errorf("failed to abandon pull record: %w", err) 1256 } 1257 if err := tx.Commit(); err != nil { 1258 l.Error("failed to commit txn", "err", err) 1259 return err 1260 } 1261 1262 return nil 1263 } 1264 1265 return nil 1266} 1267 1268func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1269 did := e.Did 1270 rkey := e.Commit.RKey 1271 1272 var err error 1273 1274 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1275 l.Info("ingesting record") 1276 1277 switch e.Commit.Operation { 1278 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1279 raw := json.RawMessage(e.Commit.Record) 1280 record := tangled.RepoIssueComment{} 1281 err = json.Unmarshal(raw, &record) 1282 if err != nil { 1283 return fmt.Errorf("invalid record: %w", err) 1284 } 1285 1286 comment, err := models.IssueCommentFromRecord(did, rkey, record) 1287 if err != nil { 1288 return fmt.Errorf("failed to parse comment from record: %w", err) 1289 } 1290 1291 if err := i.Validator.ValidateIssueComment(comment); err != nil { 1292 return fmt.Errorf("failed to validate comment: %w", err) 1293 } 1294 1295 tx, err := i.Db.Begin() 1296 if err != nil { 1297 return fmt.Errorf("failed to start transaction: %w", err) 1298 } 1299 defer tx.Rollback() 1300 1301 _, err = db.AddIssueComment(tx, *comment) 1302 if err != nil { 1303 return fmt.Errorf("failed to create issue comment: %w", err) 1304 } 1305 1306 return tx.Commit() 1307 1308 case jmodels.CommitOperationDelete: 1309 if err := db.DeleteIssueComments( 1310 i.Db, 1311 orm.FilterEq("did", did), 1312 orm.FilterEq("rkey", rkey), 1313 ); err != nil { 1314 return fmt.Errorf("failed to delete issue comment record: %w", err) 1315 } 1316 1317 return nil 1318 } 1319 1320 return nil 1321} 1322 1323func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1324 did := e.Did 1325 rkey := e.Commit.RKey 1326 1327 var err error 1328 1329 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1330 l.Info("ingesting record") 1331 1332 switch e.Commit.Operation { 1333 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1334 raw := json.RawMessage(e.Commit.Record) 1335 record := tangled.LabelDefinition{} 1336 err = json.Unmarshal(raw, &record) 1337 if err != nil { 1338 return fmt.Errorf("invalid record: %w", err) 1339 } 1340 1341 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1342 if err != nil { 1343 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1344 } 1345 1346 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1347 return fmt.Errorf("failed to validate labeldef: %w", err) 1348 } 1349 1350 _, err = db.AddLabelDefinition(i.Db, def) 1351 if err != nil { 1352 return fmt.Errorf("failed to create labeldef: %w", err) 1353 } 1354 1355 return nil 1356 1357 case jmodels.CommitOperationDelete: 1358 if err := db.DeleteLabelDefinition( 1359 i.Db, 1360 orm.FilterEq("did", did), 1361 orm.FilterEq("rkey", rkey), 1362 ); err != nil { 1363 return fmt.Errorf("failed to delete labeldef record: %w", err) 1364 } 1365 1366 return nil 1367 } 1368 1369 return nil 1370} 1371 1372func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1373 did := e.Did 1374 rkey := e.Commit.RKey 1375 1376 var err error 1377 1378 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1379 l.Info("ingesting record") 1380 1381 switch e.Commit.Operation { 1382 case jmodels.CommitOperationCreate: 1383 raw := json.RawMessage(e.Commit.Record) 1384 record := tangled.LabelOp{} 1385 err = json.Unmarshal(raw, &record) 1386 if err != nil { 1387 return fmt.Errorf("invalid record: %w", err) 1388 } 1389 1390 subject := syntax.ATURI(record.Subject) 1391 collection := subject.Collection() 1392 1393 var repo *models.Repo 1394 switch collection { 1395 case tangled.RepoIssueNSID: 1396 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1397 if err != nil || len(i) != 1 { 1398 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1399 } 1400 repo = i[0].Repo 1401 default: 1402 return fmt.Errorf("unsupported label subject: %s", collection) 1403 } 1404 1405 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1406 if err != nil { 1407 return fmt.Errorf("failed to build label application ctx: %w", err) 1408 } 1409 1410 ops := models.LabelOpsFromRecord(did, rkey, record) 1411 1412 for _, o := range ops { 1413 def, ok := actx.Defs[o.OperandKey] 1414 if !ok { 1415 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1416 } 1417 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1418 return fmt.Errorf("failed to validate labelop: %w", err) 1419 } 1420 } 1421 1422 tx, err := i.Db.Begin() 1423 if err != nil { 1424 return err 1425 } 1426 defer tx.Rollback() 1427 1428 for _, o := range ops { 1429 _, err = db.AddLabelOp(tx, &o) 1430 if err != nil { 1431 return fmt.Errorf("failed to add labelop: %w", err) 1432 } 1433 } 1434 1435 if err = tx.Commit(); err != nil { 1436 return err 1437 } 1438 } 1439 1440 return nil 1441}