Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/models" 31 "tangled.org/core/appview/notify" 32 "tangled.org/core/appview/repoverify" 33 "tangled.org/core/appview/serververify" 34 "tangled.org/core/appview/validator" 35 "tangled.org/core/idresolver" 36 "tangled.org/core/orm" 37 "tangled.org/core/rbac" 38) 39 40type Ingester struct { 41 Db *db.DB 42 Enforcer *rbac.Enforcer 43 IdResolver *idresolver.Resolver 44 Cache *cache.Cache 45 Config *config.Config 46 Logger *slog.Logger 47 Validator *validator.Validator 48 Notifier notify.Notifier 49 Verifier repoverify.Verifier 50} 51 52type processFunc func(ctx context.Context, e *jmodels.Event) error 53 54func (i *Ingester) Ingest() processFunc { 55 return func(ctx context.Context, e *jmodels.Event) error { 56 var err error 57 58 l := i.Logger.With("kind", e.Kind) 59 switch e.Kind { 60 case jmodels.EventKindAccount: 61 // TODO: sync account state to db 62 if e.Account.Active { 63 break 64 } 65 // TODO: revoke sessions by DID 66 if *e.Account.Status == "deactivated" { 67 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 68 } 69 case jmodels.EventKindIdentity: 70 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 71 case jmodels.EventKindCommit: 72 switch e.Commit.Collection { 73 case tangled.GraphFollowNSID: 74 err = i.ingestFollow(e) 75 case tangled.GraphVouchNSID: 76 err = i.ingestVouch(ctx, e) 77 case tangled.FeedStarNSID: 78 err = i.ingestStar(ctx, 
e) 79 case tangled.PublicKeyNSID: 80 err = i.ingestPublicKey(e) 81 case tangled.RepoArtifactNSID: 82 err = i.ingestArtifact(ctx, e) 83 case tangled.ActorProfileNSID: 84 err = i.ingestProfile(ctx, e) 85 case tangled.SpindleMemberNSID: 86 err = i.ingestSpindleMember(ctx, e) 87 case tangled.SpindleNSID: 88 err = i.ingestSpindle(ctx, e) 89 case tangled.KnotMemberNSID: 90 err = i.ingestKnotMember(e) 91 case tangled.KnotNSID: 92 err = i.ingestKnot(e) 93 case tangled.StringNSID: 94 err = i.ingestString(e) 95 case tangled.RepoIssueNSID: 96 err = i.ingestIssue(ctx, e) 97 case tangled.RepoPullNSID: 98 err = i.ingestPull(ctx, e) 99 case tangled.RepoIssueCommentNSID: 100 err = i.ingestIssueComment(e) 101 case tangled.LabelDefinitionNSID: 102 err = i.ingestLabelDefinition(e) 103 case tangled.LabelOpNSID: 104 err = i.ingestLabelOp(e) 105 case tangled.RepoNSID: 106 err = i.ingestRepo(ctx, e) 107 } 108 l = i.Logger.With("nsid", e.Commit.Collection) 109 } 110 111 if err != nil { 112 l.Warn("failed to ingest record, skipping", "err", err) 113 } 114 115 lastTimeUs := e.TimeUS + 1 116 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 117 l.Error("failed to save cursor", "err", saveErr) 118 } 119 120 return nil 121 } 122} 123 124func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 125 if strings.HasPrefix(ref, "did:") { 126 return db.GetRepoByDid(i.Db, ref) 127 } 128 return db.GetRepoByAtUri(i.Db, ref) 129} 130 131func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 132 var legacy struct { 133 Subject *string `json:"subject"` 134 SubjectDid *string `json:"subjectDid"` 135 } 136 if err := json.Unmarshal(raw, &legacy); err != nil { 137 return false, err 138 } 139 140 switch { 141 case legacy.SubjectDid != nil: 142 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 143 if err != nil { 144 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 145 return false, nil 146 } 
147 star.SubjectType = models.StarSubjectRepo 148 star.Subject = repo.RepoDid 149 return true, nil 150 151 case legacy.Subject != nil: 152 uri, err := syntax.ParseATURI(*legacy.Subject) 153 if err != nil { 154 return false, fmt.Errorf("invalid old-format star subject: %w", err) 155 } 156 switch uri.Collection().String() { 157 case tangled.RepoNSID: 158 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 159 if err != nil { 160 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 161 return false, nil 162 } 163 star.SubjectType = models.StarSubjectRepo 164 star.Subject = repo.RepoDid 165 return true, nil 166 default: 167 star.SubjectType = models.StarSubjectString 168 star.Subject = *legacy.Subject 169 return true, nil 170 } 171 172 default: 173 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 174 } 175} 176 177func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 178 var err error 179 did := e.Did 180 181 l := i.Logger.With("handler", "ingestStar") 182 l = l.With("nsid", e.Commit.Collection) 183 184 switch e.Commit.Operation { 185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 186 raw := json.RawMessage(e.Commit.Record) 187 record := tangled.FeedStar{} 188 unmarshalErr := json.Unmarshal(raw, &record) 189 190 star := &models.Star{ 191 Did: did, 192 Rkey: e.Commit.RKey, 193 } 194 195 switch { 196 case unmarshalErr != nil: 197 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 198 if resolveErr != nil { 199 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 200 return unmarshalErr 201 } 202 if !resolved { 203 return nil 204 } 205 206 case record.Subject == nil: 207 return fmt.Errorf("star record has nil subject") 208 209 case record.Subject.FeedStar_Repo != nil: 210 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 211 if repoErr != nil { 212 l.Warn("skipping star for unknown repo", "did", 
record.Subject.FeedStar_Repo.Did) 213 return nil 214 } 215 star.SubjectType = models.StarSubjectRepo 216 star.Subject = repo.RepoDid 217 218 case record.Subject.FeedStar_String != nil: 219 star.SubjectType = models.StarSubjectString 220 star.Subject = record.Subject.FeedStar_String.Uri 221 222 default: 223 return fmt.Errorf("star record has empty subject union") 224 } 225 226 err = db.AddStar(i.Db, star) 227 case jmodels.CommitOperationDelete: 228 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 229 } 230 231 if err != nil { 232 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 233 } 234 235 return nil 236} 237 238func (i *Ingester) ingestFollow(e *jmodels.Event) error { 239 var err error 240 did := e.Did 241 242 l := i.Logger.With("handler", "ingestFollow") 243 l = l.With("nsid", e.Commit.Collection) 244 245 switch e.Commit.Operation { 246 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 247 raw := json.RawMessage(e.Commit.Record) 248 record := tangled.GraphFollow{} 249 err = json.Unmarshal(raw, &record) 250 if err != nil { 251 l.Error("invalid record", "err", err) 252 return err 253 } 254 255 err = db.AddFollow(i.Db, &models.Follow{ 256 UserDid: did, 257 SubjectDid: record.Subject, 258 Rkey: e.Commit.RKey, 259 }) 260 case jmodels.CommitOperationDelete: 261 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 262 } 263 264 if err != nil { 265 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 266 } 267 268 return nil 269} 270 271func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 272 var err error 273 did := e.Did 274 275 l := i.Logger.With("handler", "ingestVouch") 276 l = l.With("nsid", e.Commit.Collection) 277 l.Info("ingesting vouch") 278 279 switch e.Commit.Operation { 280 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 281 raw := json.RawMessage(e.Commit.Record) 282 record := tangled.GraphVouch{} 283 err = json.Unmarshal(raw, &record) 284 if 
err != nil { 285 l.Error("invalid record", "err", err) 286 return err 287 } 288 289 // rkey is the subject_did being vouched for/denounced 290 subjectDID := e.Commit.RKey 291 292 _, err = syntax.ParseDID(subjectDID) 293 if err != nil { 294 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 295 return fmt.Errorf("invalid subject_did: %w", err) 296 } 297 298 if did == subjectDID { 299 l.Warn("attempted self-vouch", "did", did) 300 return fmt.Errorf("cannot vouch for self") 301 } 302 303 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 304 if err != nil { 305 return err 306 } 307 308 if subjectId.Handle.IsInvalidHandle() { 309 return err 310 } 311 312 kind, err := models.ParseVouchKind(record.Kind) 313 if err != nil { 314 l.Error("invalid kind", "kind", kind) 315 return fmt.Errorf("invalid kind: %s", kind) 316 } 317 318 recordCid, err := cid.Parse(e.Commit.CID) 319 if err != nil { 320 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 321 return fmt.Errorf("invalid cid: %w", err) 322 } 323 324 var evidences []syntax.ATURI 325 for _, raw := range record.Evidences { 326 uri, parseErr := syntax.ParseATURI(raw) 327 if parseErr != nil { 328 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 329 continue 330 } 331 evidences = append(evidences, uri) 332 } 333 334 tx, txErr := i.Db.Begin() 335 if txErr != nil { 336 return fmt.Errorf("failed to start transaction: %w", txErr) 337 } 338 339 addErr := db.AddVouch(tx, &models.Vouch{ 340 Did: syntax.DID(did), 341 SubjectDid: subjectId.DID, 342 Cid: recordCid, 343 Kind: kind, 344 Reason: record.Reason, 345 Evidences: evidences, 346 }) 347 if addErr != nil { 348 tx.Rollback() 349 err = addErr 350 } else { 351 err = tx.Commit() 352 } 353 354 case jmodels.CommitOperationDelete: 355 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 356 } 357 358 if err != nil { 359 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 360 } 361 362 return nil 363} 364 
365func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 366 did := e.Did 367 var err error 368 369 l := i.Logger.With("handler", "ingestPublicKey") 370 l = l.With("nsid", e.Commit.Collection) 371 372 switch e.Commit.Operation { 373 case jmodels.CommitOperationCreate: 374 l.Debug("processing add of pubkey") 375 raw := json.RawMessage(e.Commit.Record) 376 record := tangled.PublicKey{} 377 err = json.Unmarshal(raw, &record) 378 if err != nil { 379 l.Error("invalid record", "err", err) 380 return err 381 } 382 383 name := record.Name 384 key := record.Key 385 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 386 case jmodels.CommitOperationUpdate: 387 l.Debug("processing update of pubkey") 388 raw := json.RawMessage(e.Commit.Record) 389 record := tangled.PublicKey{} 390 err = json.Unmarshal(raw, &record) 391 if err != nil { 392 l.Error("invalid record", "err", err) 393 return err 394 } 395 396 name := record.Name 397 key := record.Key 398 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 399 case jmodels.CommitOperationDelete: 400 l.Debug("processing delete of pubkey") 401 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 402 } 403 404 if err != nil { 405 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 406 } 407 408 return nil 409} 410 411func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 412 did := e.Did 413 var err error 414 415 l := i.Logger.With("handler", "ingestArtifact") 416 l = l.With("nsid", e.Commit.Collection) 417 418 switch e.Commit.Operation { 419 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 420 raw := json.RawMessage(e.Commit.Record) 421 record := tangled.RepoArtifact{} 422 err = json.Unmarshal(raw, &record) 423 if err != nil { 424 l.Error("invalid record", "err", err) 425 return err 426 } 427 428 var repo *models.Repo 429 if record.RepoDid != nil && *record.RepoDid != "" { 430 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 431 if err 
!= nil && !errors.Is(err, sql.ErrNoRows) { 432 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 433 } 434 } 435 if repo == nil && record.Repo != nil { 436 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 437 if parseErr != nil { 438 return parseErr 439 } 440 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 441 if err != nil { 442 return err 443 } 444 } 445 if repo == nil { 446 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 447 } 448 449 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 450 if err != nil || !ok { 451 return err 452 } 453 454 repoDid := repo.RepoDid 455 if repoDid == "" && record.RepoDid != nil { 456 repoDid = *record.RepoDid 457 } 458 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 459 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 460 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 461 } 462 } 463 464 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 465 if err != nil { 466 createdAt = time.Now() 467 } 468 469 artifact := models.Artifact{ 470 Did: did, 471 Rkey: e.Commit.RKey, 472 RepoDid: syntax.DID(repo.RepoDid), 473 Tag: plumbing.Hash(record.Tag), 474 CreatedAt: createdAt, 475 BlobCid: cid.Cid(record.Artifact.Ref), 476 Name: record.Name, 477 Size: uint64(record.Artifact.Size), 478 MimeType: record.Artifact.MimeType, 479 } 480 481 err = db.AddArtifact(i.Db, artifact) 482 case jmodels.CommitOperationDelete: 483 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 484 } 485 486 if err != nil { 487 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 488 } 489 490 return nil 491} 492 493func (i *Ingester) ingestProfile(ctx context.Context, e 
*jmodels.Event) error { 494 did := e.Did 495 var err error 496 497 l := i.Logger.With("handler", "ingestProfile") 498 l = l.With("nsid", e.Commit.Collection) 499 500 if e.Commit.RKey != "self" { 501 return fmt.Errorf("ingestProfile only ingests `self` record") 502 } 503 504 switch e.Commit.Operation { 505 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 506 raw := json.RawMessage(e.Commit.Record) 507 record := tangled.ActorProfile{} 508 err = json.Unmarshal(raw, &record) 509 if err != nil { 510 l.Error("invalid record", "err", err) 511 return err 512 } 513 514 avatar := "" 515 if record.Avatar != nil { 516 avatar = record.Avatar.Ref.String() 517 } 518 519 description := "" 520 if record.Description != nil { 521 description = *record.Description 522 } 523 524 includeBluesky := record.Bluesky 525 526 pronouns := "" 527 if record.Pronouns != nil { 528 pronouns = *record.Pronouns 529 } 530 531 location := "" 532 if record.Location != nil { 533 location = *record.Location 534 } 535 536 var links [5]string 537 for i, l := range record.Links { 538 if i < 5 { 539 links[i] = l 540 } 541 } 542 543 var stats [2]models.VanityStat 544 for i, s := range record.Stats { 545 if i < 2 { 546 stats[i].Kind = models.ParseVanityStatKind(s) 547 } 548 } 549 550 var pinned [6]string 551 for i, r := range record.PinnedRepositories { 552 if i < 6 { 553 pinned[i] = r 554 } 555 } 556 557 var preferredHandle syntax.Handle 558 if record.PreferredHandle != nil { 559 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 560 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 561 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 562 preferredHandle = h 563 } 564 } 565 } 566 567 profile := models.Profile{ 568 Did: did, 569 Avatar: avatar, 570 Description: description, 571 IncludeBluesky: includeBluesky, 572 Location: location, 573 Links: links, 574 Stats: stats, 575 PinnedRepos: pinned, 576 Pronouns: pronouns, 577 PreferredHandle: 
preferredHandle, 578 } 579 580 tx, err := i.Db.Begin() 581 if err != nil { 582 return fmt.Errorf("failed to start transaction") 583 } 584 585 err = db.ValidateProfile(tx, &profile) 586 if err != nil { 587 return fmt.Errorf("invalid profile record") 588 } 589 590 err = db.UpsertProfile(tx, &profile) 591 if err == nil && i.Cache != nil { 592 pipe := i.Cache.Pipeline() 593 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 594 if preferredHandle != "" { 595 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 596 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 597 } else { 598 pipe.Del(ctx, didKey) 599 } 600 if _, execErr := pipe.Exec(ctx); execErr != nil { 601 l.Warn("failed to update preferred handle cache", "err", execErr) 602 } 603 } 604 case jmodels.CommitOperationDelete: 605 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 606 } 607 608 if err != nil { 609 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 610 } 611 612 return nil 613} 614 615func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 616 did := e.Did 617 var err error 618 619 l := i.Logger.With("handler", "ingestSpindleMember") 620 l = l.With("nsid", e.Commit.Collection) 621 622 switch e.Commit.Operation { 623 case jmodels.CommitOperationCreate: 624 raw := json.RawMessage(e.Commit.Record) 625 record := tangled.SpindleMember{} 626 err = json.Unmarshal(raw, &record) 627 if err != nil { 628 l.Error("invalid record", "err", err) 629 return err 630 } 631 632 // only spindle owner can invite to spindles 633 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 634 if err != nil || !ok { 635 return fmt.Errorf("failed to enforce permissions: %w", err) 636 } 637 638 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 639 if err != nil { 640 return err 641 } 642 643 if 
memberId.Handle.IsInvalidHandle() { 644 return err 645 } 646 647 err = db.AddSpindleMember(i.Db, models.SpindleMember{ 648 Did: syntax.DID(did), 649 Rkey: e.Commit.RKey, 650 Instance: record.Instance, 651 Subject: memberId.DID, 652 }) 653 if !ok { 654 return fmt.Errorf("failed to add to db: %w", err) 655 } 656 657 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 658 if err != nil { 659 return fmt.Errorf("failed to update ACLs: %w", err) 660 } 661 662 l.Info("added spindle member") 663 case jmodels.CommitOperationDelete: 664 rkey := e.Commit.RKey 665 666 // get record from db first 667 members, err := db.GetSpindleMembers( 668 i.Db, 669 orm.FilterEq("did", did), 670 orm.FilterEq("rkey", rkey), 671 ) 672 if err != nil || len(members) != 1 { 673 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 674 } 675 member := members[0] 676 677 tx, err := i.Db.Begin() 678 if err != nil { 679 return fmt.Errorf("failed to start txn: %w", err) 680 } 681 682 // remove record by rkey && update enforcer 683 if err = db.RemoveSpindleMember( 684 tx, 685 orm.FilterEq("did", did), 686 orm.FilterEq("rkey", rkey), 687 ); err != nil { 688 return fmt.Errorf("failed to remove from db: %w", err) 689 } 690 691 // update enforcer 692 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 693 if err != nil { 694 return fmt.Errorf("failed to update ACLs: %w", err) 695 } 696 697 if err = tx.Commit(); err != nil { 698 return fmt.Errorf("failed to commit txn: %w", err) 699 } 700 701 if err = i.Enforcer.E.SavePolicy(); err != nil { 702 return fmt.Errorf("failed to save ACLs: %w", err) 703 } 704 705 l.Info("removed spindle member") 706 } 707 708 return nil 709} 710 711func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 712 did := e.Did 713 var err error 714 715 l := i.Logger.With("handler", "ingestSpindle") 716 l = l.With("nsid", e.Commit.Collection) 717 718 switch e.Commit.Operation { 719 case 
jmodels.CommitOperationCreate: 720 raw := json.RawMessage(e.Commit.Record) 721 record := tangled.Spindle{} 722 err = json.Unmarshal(raw, &record) 723 if err != nil { 724 l.Error("invalid record", "err", err) 725 return err 726 } 727 728 instance := e.Commit.RKey 729 730 err := db.AddSpindle(i.Db, models.Spindle{ 731 Owner: syntax.DID(did), 732 Instance: instance, 733 }) 734 if err != nil { 735 l.Error("failed to add spindle to db", "err", err, "instance", instance) 736 return err 737 } 738 739 err = retry.Do( 740 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 741 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 742 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 743 ) 744 if err != nil { 745 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 746 return err 747 } 748 749 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 750 if err != nil { 751 return fmt.Errorf("failed to mark verified: %w", err) 752 } 753 754 return nil 755 756 case jmodels.CommitOperationDelete: 757 instance := e.Commit.RKey 758 759 // get record from db first 760 spindles, err := db.GetSpindles( 761 ctx, 762 i.Db, 763 orm.FilterEq("owner", did), 764 orm.FilterEq("instance", instance), 765 ) 766 if err != nil || len(spindles) != 1 { 767 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 768 } 769 spindle := spindles[0] 770 771 tx, err := i.Db.Begin() 772 if err != nil { 773 return err 774 } 775 defer func() { 776 tx.Rollback() 777 i.Enforcer.E.LoadPolicy() 778 }() 779 780 // remove spindle members first 781 err = db.RemoveSpindleMember( 782 tx, 783 orm.FilterEq("owner", did), 784 orm.FilterEq("instance", instance), 785 ) 786 if err != nil { 787 return err 788 } 789 790 err = db.DeleteSpindle( 791 tx, 792 orm.FilterEq("owner", did), 793 orm.FilterEq("instance", instance), 794 ) 795 if err != nil { 796 return err 
797 } 798 799 if spindle.Verified != nil { 800 err = i.Enforcer.RemoveSpindle(instance) 801 if err != nil { 802 return err 803 } 804 } 805 806 err = tx.Commit() 807 if err != nil { 808 return err 809 } 810 811 err = i.Enforcer.E.SavePolicy() 812 if err != nil { 813 return err 814 } 815 } 816 817 return nil 818} 819 820func (i *Ingester) ingestString(e *jmodels.Event) error { 821 did := e.Did 822 rkey := e.Commit.RKey 823 824 var err error 825 826 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 827 l.Info("ingesting record") 828 829 switch e.Commit.Operation { 830 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 831 raw := json.RawMessage(e.Commit.Record) 832 record := tangled.String{} 833 err = json.Unmarshal(raw, &record) 834 if err != nil { 835 l.Error("invalid record", "err", err) 836 return err 837 } 838 839 string := models.StringFromRecord(did, rkey, record) 840 841 if err = i.Validator.ValidateString(&string); err != nil { 842 l.Error("invalid record", "err", err) 843 return err 844 } 845 846 if err = db.AddString(i.Db, string); err != nil { 847 l.Error("failed to add string", "err", err) 848 return err 849 } 850 851 return nil 852 853 case jmodels.CommitOperationDelete: 854 if err := db.DeleteString( 855 i.Db, 856 orm.FilterEq("did", did), 857 orm.FilterEq("rkey", rkey), 858 ); err != nil { 859 l.Error("failed to delete", "err", err) 860 return fmt.Errorf("failed to delete string record: %w", err) 861 } 862 863 return nil 864 } 865 866 return nil 867} 868 869func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 870 did := e.Did 871 var err error 872 873 l := i.Logger.With("handler", "ingestKnotMember") 874 l = l.With("nsid", e.Commit.Collection) 875 876 switch e.Commit.Operation { 877 case jmodels.CommitOperationCreate: 878 raw := json.RawMessage(e.Commit.Record) 879 record := tangled.KnotMember{} 880 err = json.Unmarshal(raw, &record) 881 if err != nil { 882 l.Error("invalid 
record", "err", err) 883 return err 884 } 885 886 // only knot owner can invite to knots 887 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 888 if err != nil || !ok { 889 return fmt.Errorf("failed to enforce permissions: %w", err) 890 } 891 892 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 893 if err != nil { 894 return err 895 } 896 897 if memberId.Handle.IsInvalidHandle() { 898 return err 899 } 900 901 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 902 if err != nil { 903 return fmt.Errorf("failed to update ACLs: %w", err) 904 } 905 906 l.Info("added knot member") 907 case jmodels.CommitOperationDelete: 908 // we don't store knot members in a table (like we do for spindle) 909 // and we can't remove this just yet. possibly fixed if we switch 910 // to either: 911 // 1. a knot_members table like with spindle and store the rkey 912 // 2. use the knot host as the rkey 913 // 914 // TODO: implement member deletion 915 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 916 } 917 918 return nil 919} 920 921func (i *Ingester) ingestKnot(e *jmodels.Event) error { 922 did := e.Did 923 var err error 924 925 l := i.Logger.With("handler", "ingestKnot") 926 l = l.With("nsid", e.Commit.Collection) 927 928 switch e.Commit.Operation { 929 case jmodels.CommitOperationCreate: 930 raw := json.RawMessage(e.Commit.Record) 931 record := tangled.Knot{} 932 err = json.Unmarshal(raw, &record) 933 if err != nil { 934 l.Error("invalid record", "err", err) 935 return err 936 } 937 938 domain := e.Commit.RKey 939 940 err := db.AddKnot(i.Db, domain, did) 941 if err != nil { 942 l.Error("failed to add knot to db", "err", err, "domain", domain) 943 return err 944 } 945 946 err = retry.Do( 947 func() error { 948 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 949 }, 950 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 951 
retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 952 ) 953 if err != nil { 954 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 955 return err 956 } 957 958 err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 959 if err != nil { 960 return fmt.Errorf("failed to mark verified: %w", err) 961 } 962 963 return nil 964 965 case jmodels.CommitOperationDelete: 966 domain := e.Commit.RKey 967 968 // get record from db first 969 registrations, err := db.GetRegistrations( 970 i.Db, 971 orm.FilterEq("domain", domain), 972 orm.FilterEq("did", did), 973 ) 974 if err != nil { 975 return fmt.Errorf("failed to get registration: %w", err) 976 } 977 if len(registrations) != 1 { 978 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 979 } 980 registration := registrations[0] 981 982 tx, err := i.Db.Begin() 983 if err != nil { 984 return err 985 } 986 defer func() { 987 tx.Rollback() 988 i.Enforcer.E.LoadPolicy() 989 }() 990 991 err = db.DeleteKnot( 992 tx, 993 orm.FilterEq("did", did), 994 orm.FilterEq("domain", domain), 995 ) 996 if err != nil { 997 return err 998 } 999 1000 if registration.Registered != nil { 1001 err = i.Enforcer.RemoveKnot(domain) 1002 if err != nil { 1003 return err 1004 } 1005 } 1006 1007 err = tx.Commit() 1008 if err != nil { 1009 return err 1010 } 1011 1012 err = i.Enforcer.E.SavePolicy() 1013 if err != nil { 1014 return err 1015 } 1016 } 1017 1018 return nil 1019} 1020func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 1021 did := e.Did 1022 rkey := e.Commit.RKey 1023 1024 var err error 1025 1026 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1027 l.Info("ingesting record") 1028 1029 switch e.Commit.Operation { 1030 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1031 raw := json.RawMessage(e.Commit.Record) 1032 record := tangled.RepoIssue{} 1033 err = 
json.Unmarshal(raw, &record) 1034 if err != nil { 1035 l.Error("invalid record", "err", err) 1036 return err 1037 } 1038 1039 issue := models.IssueFromRecord(did, rkey, record) 1040 1041 if issue.RepoDid == "" { 1042 return fmt.Errorf("issue record has no repo field") 1043 } 1044 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1045 return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 1046 } 1047 1048 if err := i.Validator.ValidateIssue(&issue); err != nil { 1049 return fmt.Errorf("failed to validate issue: %w", err) 1050 } 1051 1052 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") { 1053 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 1054 if repoErr == nil && repo.RepoDid != "" { 1055 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1056 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1057 } 1058 } 1059 } 1060 1061 tx, err := i.Db.BeginTx(ctx, nil) 1062 if err != nil { 1063 l.Error("failed to begin transaction", "err", err) 1064 return err 1065 } 1066 defer tx.Rollback() 1067 1068 err = db.PutIssue(tx, &issue) 1069 if err != nil { 1070 l.Error("failed to create issue", "err", err) 1071 return err 1072 } 1073 1074 err = tx.Commit() 1075 if err != nil { 1076 l.Error("failed to commit txn", "err", err) 1077 return err 1078 } 1079 1080 return nil 1081 1082 case jmodels.CommitOperationDelete: 1083 tx, err := i.Db.BeginTx(ctx, nil) 1084 if err != nil { 1085 l.Error("failed to begin transaction", "err", err) 1086 return err 1087 } 1088 defer tx.Rollback() 1089 1090 if err := db.DeleteIssues( 1091 tx, 1092 did, 1093 rkey, 1094 ); err != nil { 1095 l.Error("failed to delete", "err", err) 1096 return fmt.Errorf("failed to delete issue record: %w", err) 1097 } 1098 if err := tx.Commit(); err != nil { 1099 l.Error("failed to commit txn", "err", 
err) 1100 return err 1101 } 1102 1103 return nil 1104 } 1105 1106 return nil 1107} 1108

// ingestPull handles create, update and delete commits for pull records
// authored by e.Did.
//
// On create/update the record is decoded, the author's identity is resolved,
// and the patch blob of every round is requested from the author's PDS through
// com.atproto.sync.getBlob. The open response bodies are handed to
// models.PullFromRecord; the resulting pull is validated and stored with
// db.PutPull inside a transaction. On delete the matching pull is abandoned via
// db.AbandonPulls. Any other operation is a no-op and returns nil.
func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.RepoPull{}
		if err := json.Unmarshal(raw, &record); err != nil {
			l.Error("invalid record", "err", err)
			return err
		}

		owner, err := i.IdResolver.ResolveIdent(ctx, did)
		if err != nil {
			l.Error("failed to resolve did", "err", err)
			return err
		}

		// One slot per round; a slot stays nil when its fetch did not succeed.
		readers := make([]*io.ReadCloser, len(record.Rounds))

		// Registered before any fetch starts so that every body that was
		// opened is closed on all return paths, including a failed Wait.
		defer func() {
			for _, r := range readers {
				if r != nil && *r != nil {
					(*r).Close()
				}
			}
		}()

		// guards writes into the shared readers slice
		var mu sync.Mutex

		// A plain Group is used on purpose: the context derived by
		// errgroup.WithContext is cancelled as soon as Wait returns, but the
		// response bodies are consumed after Wait, and a request's context
		// also governs reading its response body.
		var g errgroup.Group

		// go through and fetch all blobs in parallel
		for idx, b := range record.Rounds {
			g.Go(func() error {
				// for some reason, a blob is empty
				if b.PatchBlob == nil {
					return fmt.Errorf("missing patchBlob in round %d", idx)
				}

				blobURL, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", owner.PDSEndpoint()))
				if err != nil {
					l.Error("failed to parse blob url", "err", err)
					return fmt.Errorf("invalid blob url for round %d: %w", idx, err)
				}
				q := blobURL.Query()
				q.Set("cid", b.PatchBlob.Ref.String())
				q.Set("did", did)
				blobURL.RawQuery = q.Encode()

				req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobURL.String(), nil)
				if err != nil {
					l.Error("failed to create request", "err", err)
					return err
				}
				req.Header.Set("Content-Type", "application/json")

				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					l.Error("failed to make request", "err", err)
					return err
				}

				// An error response must not be treated as patch content.
				if resp.StatusCode != http.StatusOK {
					resp.Body.Close()
					l.Error("unexpected status fetching blob", "status", resp.Status, "round", idx)
					return fmt.Errorf("failed to fetch patchBlob in round %d: unexpected status %s", idx, resp.Status)
				}

				mu.Lock()
				readers[idx] = &resp.Body
				mu.Unlock()

				return nil
			})
		}

		if err := g.Wait(); err != nil {
			return err
		}

		pull, err := models.PullFromRecord(did, rkey, record, readers)
		if err != nil {
			return fmt.Errorf("failed to parse pull from record: %w", err)
		}
		if err := i.Validator.ValidatePull(pull); err != nil {
			return fmt.Errorf("failed to validate pull: %w", err)
		}

		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		if err := db.PutPull(tx, pull); err != nil {
			l.Error("failed to create pull", "err", err)
			return err
		}

		if err := tx.Commit(); err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		return nil

	case jmodels.CommitOperationDelete:
		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		if err := db.AbandonPulls(
			tx,
			orm.FilterEq("owner_did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			l.Error("failed to abandon", "err", err)
			return fmt.Errorf("failed to abandon pull record: %w", err)
		}
		if err := tx.Commit(); err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		return nil
	}

	return nil
}

// ingestIssueComment handles create, update and delete commits for issue
// comment records authored by e.Did.
//
// On create/update the record is decoded, converted with
// models.IssueCommentFromRecord, validated, and inserted with
// db.AddIssueComment inside a transaction. On delete the comment matching
// (did, rkey) is removed with db.DeleteIssueComments. Any other operation is a
// no-op and returns nil.
func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.RepoIssueComment{}
		if err := json.Unmarshal(raw, &record); err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		comment, err := models.IssueCommentFromRecord(did, rkey, record)
		if err != nil {
			return fmt.Errorf("failed to parse comment from record: %w", err)
		}

		if err := i.Validator.ValidateIssueComment(comment); err != nil {
			return fmt.Errorf("failed to validate comment: %w", err)
		}

		tx, err := i.Db.Begin()
		if err != nil {
			return fmt.Errorf("failed to start transaction: %w", err)
		}
		defer tx.Rollback()

		if _, err := db.AddIssueComment(tx, *comment); err != nil {
			return fmt.Errorf("failed to create issue comment: %w", err)
		}

		return tx.Commit()

	case jmodels.CommitOperationDelete:
		if err := db.DeleteIssueComments(
			i.Db,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to delete issue comment record: %w", err)
		}

		return nil
	}

	return nil
}

// ingestLabelDefinition handles create, update and delete commits for label
// definition records authored by e.Did.
//
// On create/update the record is decoded, converted with
// models.LabelDefinitionFromRecord, validated, and stored with
// db.AddLabelDefinition. On delete the definition matching (did, rkey) is
// removed with db.DeleteLabelDefinition. Any other operation is a no-op and
// returns nil.
func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.LabelDefinition{}
		if err := json.Unmarshal(raw, &record); err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		def, err := models.LabelDefinitionFromRecord(did, rkey, record)
		if err != nil {
			return fmt.Errorf("failed to parse labeldef from record: %w", err)
		}

		if err := i.Validator.ValidateLabelDefinition(def); err != nil {
			return fmt.Errorf("failed to validate labeldef: %w", err)
		}

		if _, err := db.AddLabelDefinition(i.Db, def); err != nil {
			return fmt.Errorf("failed to create labeldef: %w", err)
		}

		return nil

	case jmodels.CommitOperationDelete:
		if err := db.DeleteLabelDefinition(
			i.Db,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to delete labeldef record: %w", err)
		}

		return nil
	}

	return nil
}

// ingestLabelOp handles create commits for label operation records authored by
// e.Did; every other operation is a no-op and returns nil.
//
// The record's subject must be an AT-URI in the tangled.RepoIssueNSID
// collection that resolves to exactly one issue. Each operation derived from
// the record is validated against the label definitions attached to that
// issue's repo, and only when all of them pass are they written with
// db.AddLabelOp in a single transaction.
func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	if e.Commit.Operation != jmodels.CommitOperationCreate {
		return nil
	}

	raw := json.RawMessage(e.Commit.Record)
	record := tangled.LabelOp{}
	if err := json.Unmarshal(raw, &record); err != nil {
		return fmt.Errorf("invalid record: %w", err)
	}

	subject := syntax.ATURI(record.Subject)
	collection := subject.Collection()

	var repo *models.Repo
	switch collection {
	case tangled.RepoIssueNSID:
		issues, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
		if err != nil {
			return fmt.Errorf("failed to find subject: %w", err)
		}
		// A lookup that succeeds but does not yield exactly one issue is
		// reported on its own, so a nil error is never formatted with %w.
		if len(issues) != 1 {
			return fmt.Errorf("failed to find subject: subject count %d", len(issues))
		}
		repo = issues[0].Repo
	default:
		return fmt.Errorf("unsupported label subject: %s", collection)
	}

	// repo.Labels is dereferenced below; fail cleanly instead of panicking
	// when the issue came back without its repo attached.
	if repo == nil {
		return fmt.Errorf("failed to find repo for subject: %s", subject)
	}

	actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
	if err != nil {
		return fmt.Errorf("failed to build label application ctx: %w", err)
	}

	ops := models.LabelOpsFromRecord(did, rkey, record)

	// Validate every op before opening the transaction.
	for _, o := range ops {
		def, ok := actx.Defs[o.OperandKey]
		if !ok {
			return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
		}
		if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
			return fmt.Errorf("failed to validate labelop: %w", err)
		}
	}

	tx, err := i.Db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	for _, o := range ops {
		if _, err := db.AddLabelOp(tx, &o); err != nil {
			return fmt.Errorf("failed to add labelop: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return err
	}

	return nil
}