Monorepo for Tangled tangled.org
11

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "sync" 16 17 "time" 18 19 "github.com/avast/retry-go/v4" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 jmodels "github.com/bluesky-social/jetstream/pkg/models" 22 "github.com/go-git/go-git/v5/plumbing" 23 "github.com/ipfs/go-cid" 24 "golang.org/x/sync/errgroup" 25 "tangled.org/core/api/tangled" 26 "tangled.org/core/appview/cache" 27 "tangled.org/core/appview/config" 28 "tangled.org/core/appview/db" 29 "tangled.org/core/appview/models" 30 "tangled.org/core/appview/serververify" 31 "tangled.org/core/appview/validator" 32 "tangled.org/core/idresolver" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35) 36 37type Ingester struct { 38 Db db.DbWrapper 39 Enforcer *rbac.Enforcer 40 IdResolver *idresolver.Resolver 41 Cache *cache.Cache 42 Config *config.Config 43 Logger *slog.Logger 44 Validator *validator.Validator 45} 46 47type processFunc func(ctx context.Context, e *jmodels.Event) error 48 49func (i *Ingester) Ingest() processFunc { 50 return func(ctx context.Context, e *jmodels.Event) error { 51 var err error 52 53 l := i.Logger.With("kind", e.Kind) 54 switch e.Kind { 55 case jmodels.EventKindAccount: 56 // TODO: sync account state to db 57 if e.Account.Active { 58 break 59 } 60 // TODO: revoke sessions by DID 61 if *e.Account.Status == "deactivated" { 62 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 63 } 64 case jmodels.EventKindIdentity: 65 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 66 case jmodels.EventKindCommit: 67 switch e.Commit.Collection { 68 case tangled.GraphFollowNSID: 69 err = i.ingestFollow(e) 70 case tangled.GraphVouchNSID: 71 err = i.ingestVouch(ctx, e) 72 case tangled.FeedStarNSID: 73 err = i.ingestStar(ctx, e) 74 case tangled.PublicKeyNSID: 75 err = i.ingestPublicKey(e) 76 case tangled.RepoArtifactNSID: 77 err = i.ingestArtifact(ctx, e) 78 case 
tangled.ActorProfileNSID: 79 err = i.ingestProfile(ctx, e) 80 case tangled.SpindleMemberNSID: 81 err = i.ingestSpindleMember(ctx, e) 82 case tangled.SpindleNSID: 83 err = i.ingestSpindle(ctx, e) 84 case tangled.KnotMemberNSID: 85 err = i.ingestKnotMember(e) 86 case tangled.KnotNSID: 87 err = i.ingestKnot(e) 88 case tangled.StringNSID: 89 err = i.ingestString(e) 90 case tangled.RepoIssueNSID: 91 err = i.ingestIssue(ctx, e) 92 case tangled.RepoPullNSID: 93 err = i.ingestPull(ctx, e) 94 case tangled.RepoIssueCommentNSID: 95 err = i.ingestIssueComment(e) 96 case tangled.LabelDefinitionNSID: 97 err = i.ingestLabelDefinition(e) 98 case tangled.LabelOpNSID: 99 err = i.ingestLabelOp(e) 100 } 101 l = i.Logger.With("nsid", e.Commit.Collection) 102 } 103 104 if err != nil { 105 l.Warn("failed to ingest record, skipping", "err", err) 106 } 107 108 lastTimeUs := e.TimeUS + 1 109 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 110 l.Error("failed to save cursor", "err", saveErr) 111 } 112 113 return nil 114 } 115} 116 117func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 118 var err error 119 did := e.Did 120 121 l := i.Logger.With("handler", "ingestStar") 122 l = l.With("nsid", e.Commit.Collection) 123 124 switch e.Commit.Operation { 125 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 126 var subjectUri syntax.ATURI 127 128 raw := json.RawMessage(e.Commit.Record) 129 record := tangled.FeedStar{} 130 err := json.Unmarshal(raw, &record) 131 if err != nil { 132 l.Error("invalid record", "err", err) 133 return err 134 } 135 136 star := &models.Star{ 137 Did: did, 138 Rkey: e.Commit.RKey, 139 } 140 141 switch { 142 case record.SubjectDid != nil: 143 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 144 if repoErr == nil { 145 subjectUri = repo.RepoAt() 146 star.RepoAt = subjectUri 147 } 148 case record.Subject != nil: 149 subjectUri, err = syntax.ParseATURI(*record.Subject) 150 if err != nil { 
151 l.Error("invalid record", "err", err) 152 return err 153 } 154 star.RepoAt = subjectUri 155 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 156 if repoErr == nil && repo.RepoDid != "" { 157 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.FeedStarNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 158 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 159 } 160 } 161 default: 162 l.Error("star record has neither subject nor subjectDid") 163 return fmt.Errorf("star record has neither subject nor subjectDid") 164 } 165 err = db.AddStar(i.Db, star) 166 case jmodels.CommitOperationDelete: 167 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 168 } 169 170 if err != nil { 171 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 172 } 173 174 return nil 175} 176 177func (i *Ingester) ingestFollow(e *jmodels.Event) error { 178 var err error 179 did := e.Did 180 181 l := i.Logger.With("handler", "ingestFollow") 182 l = l.With("nsid", e.Commit.Collection) 183 184 switch e.Commit.Operation { 185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 186 raw := json.RawMessage(e.Commit.Record) 187 record := tangled.GraphFollow{} 188 err = json.Unmarshal(raw, &record) 189 if err != nil { 190 l.Error("invalid record", "err", err) 191 return err 192 } 193 194 err = db.AddFollow(i.Db, &models.Follow{ 195 UserDid: did, 196 SubjectDid: record.Subject, 197 Rkey: e.Commit.RKey, 198 }) 199 case jmodels.CommitOperationDelete: 200 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 201 } 202 203 if err != nil { 204 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 205 } 206 207 return nil 208} 209 210func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 211 var err error 212 did := e.Did 213 214 l := i.Logger.With("handler", "ingestVouch") 215 l = l.With("nsid", 
e.Commit.Collection) 216 l.Info("ingesting vouch") 217 218 switch e.Commit.Operation { 219 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 220 raw := json.RawMessage(e.Commit.Record) 221 record := tangled.GraphVouch{} 222 err = json.Unmarshal(raw, &record) 223 if err != nil { 224 l.Error("invalid record", "err", err) 225 return err 226 } 227 228 // rkey is the subject_did being vouched for/denounced 229 subjectDID := e.Commit.RKey 230 231 _, err = syntax.ParseDID(subjectDID) 232 if err != nil { 233 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 234 return fmt.Errorf("invalid subject_did: %w", err) 235 } 236 237 if did == subjectDID { 238 l.Warn("attempted self-vouch", "did", did) 239 return fmt.Errorf("cannot vouch for self") 240 } 241 242 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 243 if err != nil { 244 return err 245 } 246 247 if subjectId.Handle.IsInvalidHandle() { 248 return err 249 } 250 251 kind, err := models.ParseVouchKind(record.Kind) 252 if err != nil { 253 l.Error("invalid kind", "kind", kind) 254 return fmt.Errorf("invalid kind: %s", kind) 255 } 256 257 recordCid, err := cid.Parse(e.Commit.CID) 258 if err != nil { 259 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 260 return fmt.Errorf("invalid cid: %w", err) 261 } 262 263 var evidences []syntax.ATURI 264 for _, raw := range record.Evidences { 265 uri, parseErr := syntax.ParseATURI(raw) 266 if parseErr != nil { 267 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 268 continue 269 } 270 evidences = append(evidences, uri) 271 } 272 273 ddb, ok := i.Db.Execer.(*db.DB) 274 if !ok { 275 return fmt.Errorf("failed to ingest vouch record, invalid db cast") 276 } 277 278 tx, txErr := ddb.Begin() 279 if txErr != nil { 280 return fmt.Errorf("failed to start transaction: %w", txErr) 281 } 282 283 addErr := db.AddVouch(tx, &models.Vouch{ 284 Did: syntax.DID(did), 285 SubjectDid: subjectId.DID, 286 Cid: recordCid, 287 Kind: 
kind, 288 Reason: record.Reason, 289 Evidences: evidences, 290 }) 291 if addErr != nil { 292 tx.Rollback() 293 err = addErr 294 } else { 295 err = tx.Commit() 296 } 297 298 case jmodels.CommitOperationDelete: 299 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 300 } 301 302 if err != nil { 303 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 304 } 305 306 return nil 307} 308 309func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 310 did := e.Did 311 var err error 312 313 l := i.Logger.With("handler", "ingestPublicKey") 314 l = l.With("nsid", e.Commit.Collection) 315 316 switch e.Commit.Operation { 317 case jmodels.CommitOperationCreate: 318 l.Debug("processing add of pubkey") 319 raw := json.RawMessage(e.Commit.Record) 320 record := tangled.PublicKey{} 321 err = json.Unmarshal(raw, &record) 322 if err != nil { 323 l.Error("invalid record", "err", err) 324 return err 325 } 326 327 name := record.Name 328 key := record.Key 329 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 330 case jmodels.CommitOperationUpdate: 331 l.Debug("processing update of pubkey") 332 raw := json.RawMessage(e.Commit.Record) 333 record := tangled.PublicKey{} 334 err = json.Unmarshal(raw, &record) 335 if err != nil { 336 l.Error("invalid record", "err", err) 337 return err 338 } 339 340 name := record.Name 341 key := record.Key 342 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 343 case jmodels.CommitOperationDelete: 344 l.Debug("processing delete of pubkey") 345 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 346 } 347 348 if err != nil { 349 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 350 } 351 352 return nil 353} 354 355func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 356 did := e.Did 357 var err error 358 359 l := i.Logger.With("handler", "ingestArtifact") 360 l = l.With("nsid", e.Commit.Collection) 361 362 switch e.Commit.Operation { 363 case 
jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 364 raw := json.RawMessage(e.Commit.Record) 365 record := tangled.RepoArtifact{} 366 err = json.Unmarshal(raw, &record) 367 if err != nil { 368 l.Error("invalid record", "err", err) 369 return err 370 } 371 372 var repo *models.Repo 373 if record.RepoDid != nil && *record.RepoDid != "" { 374 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 375 if err != nil && !errors.Is(err, sql.ErrNoRows) { 376 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 377 } 378 } 379 if repo == nil && record.Repo != nil { 380 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 381 if parseErr != nil { 382 return parseErr 383 } 384 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 385 if err != nil { 386 return err 387 } 388 } 389 if repo == nil { 390 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 391 } 392 393 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 394 if err != nil || !ok { 395 return err 396 } 397 398 repoDid := repo.RepoDid 399 if repoDid == "" && record.RepoDid != nil { 400 repoDid = *record.RepoDid 401 } 402 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 403 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 404 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 405 } 406 } 407 408 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 409 if err != nil { 410 createdAt = time.Now() 411 } 412 413 artifact := models.Artifact{ 414 Did: did, 415 Rkey: e.Commit.RKey, 416 RepoAt: repo.RepoAt(), 417 Tag: plumbing.Hash(record.Tag), 418 CreatedAt: createdAt, 419 BlobCid: cid.Cid(record.Artifact.Ref), 420 Name: record.Name, 421 Size: uint64(record.Artifact.Size), 422 MimeType: record.Artifact.MimeType, 
423 } 424 425 err = db.AddArtifact(i.Db, artifact) 426 case jmodels.CommitOperationDelete: 427 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 428 } 429 430 if err != nil { 431 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 432 } 433 434 return nil 435} 436 437func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 438 did := e.Did 439 var err error 440 441 l := i.Logger.With("handler", "ingestProfile") 442 l = l.With("nsid", e.Commit.Collection) 443 444 if e.Commit.RKey != "self" { 445 return fmt.Errorf("ingestProfile only ingests `self` record") 446 } 447 448 switch e.Commit.Operation { 449 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 450 raw := json.RawMessage(e.Commit.Record) 451 record := tangled.ActorProfile{} 452 err = json.Unmarshal(raw, &record) 453 if err != nil { 454 l.Error("invalid record", "err", err) 455 return err 456 } 457 458 avatar := "" 459 if record.Avatar != nil { 460 avatar = record.Avatar.Ref.String() 461 } 462 463 description := "" 464 if record.Description != nil { 465 description = *record.Description 466 } 467 468 includeBluesky := record.Bluesky 469 470 pronouns := "" 471 if record.Pronouns != nil { 472 pronouns = *record.Pronouns 473 } 474 475 location := "" 476 if record.Location != nil { 477 location = *record.Location 478 } 479 480 var links [5]string 481 for i, l := range record.Links { 482 if i < 5 { 483 links[i] = l 484 } 485 } 486 487 var stats [2]models.VanityStat 488 for i, s := range record.Stats { 489 if i < 2 { 490 stats[i].Kind = models.ParseVanityStatKind(s) 491 } 492 } 493 494 var pinned [6]string 495 for i, r := range record.PinnedRepositories { 496 if i < 6 { 497 pinned[i] = r 498 } 499 } 500 501 var preferredHandle syntax.Handle 502 if record.PreferredHandle != nil { 503 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 504 ident, identErr := i.IdResolver.ResolveIdent(ctx, 
did) 505 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 506 preferredHandle = h 507 } 508 } 509 } 510 511 profile := models.Profile{ 512 Did: did, 513 Avatar: avatar, 514 Description: description, 515 IncludeBluesky: includeBluesky, 516 Location: location, 517 Links: links, 518 Stats: stats, 519 PinnedRepos: pinned, 520 Pronouns: pronouns, 521 PreferredHandle: preferredHandle, 522 } 523 524 ddb, ok := i.Db.Execer.(*db.DB) 525 if !ok { 526 return fmt.Errorf("failed to index profile record, invalid db cast") 527 } 528 529 tx, err := ddb.Begin() 530 if err != nil { 531 return fmt.Errorf("failed to start transaction") 532 } 533 534 err = db.ValidateProfile(tx, &profile) 535 if err != nil { 536 return fmt.Errorf("invalid profile record") 537 } 538 539 err = db.UpsertProfile(tx, &profile) 540 if err == nil && i.Cache != nil { 541 pipe := i.Cache.Pipeline() 542 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 543 if preferredHandle != "" { 544 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 545 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 546 } else { 547 pipe.Del(ctx, didKey) 548 } 549 if _, execErr := pipe.Exec(ctx); execErr != nil { 550 l.Warn("failed to update preferred handle cache", "err", execErr) 551 } 552 } 553 case jmodels.CommitOperationDelete: 554 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 555 } 556 557 if err != nil { 558 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 559 } 560 561 return nil 562} 563 564func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 565 did := e.Did 566 var err error 567 568 l := i.Logger.With("handler", "ingestSpindleMember") 569 l = l.With("nsid", e.Commit.Collection) 570 571 switch e.Commit.Operation { 572 case jmodels.CommitOperationCreate: 573 raw := json.RawMessage(e.Commit.Record) 574 
record := tangled.SpindleMember{} 575 err = json.Unmarshal(raw, &record) 576 if err != nil { 577 l.Error("invalid record", "err", err) 578 return err 579 } 580 581 // only spindle owner can invite to spindles 582 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 583 if err != nil || !ok { 584 return fmt.Errorf("failed to enforce permissions: %w", err) 585 } 586 587 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 588 if err != nil { 589 return err 590 } 591 592 if memberId.Handle.IsInvalidHandle() { 593 return err 594 } 595 596 ddb, ok := i.Db.Execer.(*db.DB) 597 if !ok { 598 return fmt.Errorf("invalid db cast") 599 } 600 601 err = db.AddSpindleMember(ddb, models.SpindleMember{ 602 Did: syntax.DID(did), 603 Rkey: e.Commit.RKey, 604 Instance: record.Instance, 605 Subject: memberId.DID, 606 }) 607 if !ok { 608 return fmt.Errorf("failed to add to db: %w", err) 609 } 610 611 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 612 if err != nil { 613 return fmt.Errorf("failed to update ACLs: %w", err) 614 } 615 616 l.Info("added spindle member") 617 case jmodels.CommitOperationDelete: 618 rkey := e.Commit.RKey 619 620 ddb, ok := i.Db.Execer.(*db.DB) 621 if !ok { 622 return fmt.Errorf("failed to index profile record, invalid db cast") 623 } 624 625 // get record from db first 626 members, err := db.GetSpindleMembers( 627 ddb, 628 orm.FilterEq("did", did), 629 orm.FilterEq("rkey", rkey), 630 ) 631 if err != nil || len(members) != 1 { 632 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 633 } 634 member := members[0] 635 636 tx, err := ddb.Begin() 637 if err != nil { 638 return fmt.Errorf("failed to start txn: %w", err) 639 } 640 641 // remove record by rkey && update enforcer 642 if err = db.RemoveSpindleMember( 643 tx, 644 orm.FilterEq("did", did), 645 orm.FilterEq("rkey", rkey), 646 ); err != nil { 647 return fmt.Errorf("failed to remove from db: %w", err) 648 } 649 650 // update 
enforcer 651 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 652 if err != nil { 653 return fmt.Errorf("failed to update ACLs: %w", err) 654 } 655 656 if err = tx.Commit(); err != nil { 657 return fmt.Errorf("failed to commit txn: %w", err) 658 } 659 660 if err = i.Enforcer.E.SavePolicy(); err != nil { 661 return fmt.Errorf("failed to save ACLs: %w", err) 662 } 663 664 l.Info("removed spindle member") 665 } 666 667 return nil 668} 669 670func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 671 did := e.Did 672 var err error 673 674 l := i.Logger.With("handler", "ingestSpindle") 675 l = l.With("nsid", e.Commit.Collection) 676 677 switch e.Commit.Operation { 678 case jmodels.CommitOperationCreate: 679 raw := json.RawMessage(e.Commit.Record) 680 record := tangled.Spindle{} 681 err = json.Unmarshal(raw, &record) 682 if err != nil { 683 l.Error("invalid record", "err", err) 684 return err 685 } 686 687 instance := e.Commit.RKey 688 689 ddb, ok := i.Db.Execer.(*db.DB) 690 if !ok { 691 return fmt.Errorf("failed to index profile record, invalid db cast") 692 } 693 694 err := db.AddSpindle(ddb, models.Spindle{ 695 Owner: syntax.DID(did), 696 Instance: instance, 697 }) 698 if err != nil { 699 l.Error("failed to add spindle to db", "err", err, "instance", instance) 700 return err 701 } 702 703 err = retry.Do( 704 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 705 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 706 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 707 ) 708 if err != nil { 709 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 710 return err 711 } 712 713 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 714 if err != nil { 715 return fmt.Errorf("failed to mark verified: %w", err) 716 } 717 718 return nil 719 720 case jmodels.CommitOperationDelete: 721 
instance := e.Commit.RKey 722 723 ddb, ok := i.Db.Execer.(*db.DB) 724 if !ok { 725 return fmt.Errorf("failed to index profile record, invalid db cast") 726 } 727 728 // get record from db first 729 spindles, err := db.GetSpindles( 730 ctx, 731 ddb, 732 orm.FilterEq("owner", did), 733 orm.FilterEq("instance", instance), 734 ) 735 if err != nil || len(spindles) != 1 { 736 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 737 } 738 spindle := spindles[0] 739 740 tx, err := ddb.Begin() 741 if err != nil { 742 return err 743 } 744 defer func() { 745 tx.Rollback() 746 i.Enforcer.E.LoadPolicy() 747 }() 748 749 // remove spindle members first 750 err = db.RemoveSpindleMember( 751 tx, 752 orm.FilterEq("owner", did), 753 orm.FilterEq("instance", instance), 754 ) 755 if err != nil { 756 return err 757 } 758 759 err = db.DeleteSpindle( 760 tx, 761 orm.FilterEq("owner", did), 762 orm.FilterEq("instance", instance), 763 ) 764 if err != nil { 765 return err 766 } 767 768 if spindle.Verified != nil { 769 err = i.Enforcer.RemoveSpindle(instance) 770 if err != nil { 771 return err 772 } 773 } 774 775 err = tx.Commit() 776 if err != nil { 777 return err 778 } 779 780 err = i.Enforcer.E.SavePolicy() 781 if err != nil { 782 return err 783 } 784 } 785 786 return nil 787} 788 789func (i *Ingester) ingestString(e *jmodels.Event) error { 790 did := e.Did 791 rkey := e.Commit.RKey 792 793 var err error 794 795 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 796 l.Info("ingesting record") 797 798 ddb, ok := i.Db.Execer.(*db.DB) 799 if !ok { 800 return fmt.Errorf("failed to index string record, invalid db cast") 801 } 802 803 switch e.Commit.Operation { 804 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 805 raw := json.RawMessage(e.Commit.Record) 806 record := tangled.String{} 807 err = json.Unmarshal(raw, &record) 808 if err != nil { 809 l.Error("invalid record", "err", err) 810 
return err 811 } 812 813 string := models.StringFromRecord(did, rkey, record) 814 815 if err = i.Validator.ValidateString(&string); err != nil { 816 l.Error("invalid record", "err", err) 817 return err 818 } 819 820 if err = db.AddString(ddb, string); err != nil { 821 l.Error("failed to add string", "err", err) 822 return err 823 } 824 825 return nil 826 827 case jmodels.CommitOperationDelete: 828 if err := db.DeleteString( 829 ddb, 830 orm.FilterEq("did", did), 831 orm.FilterEq("rkey", rkey), 832 ); err != nil { 833 l.Error("failed to delete", "err", err) 834 return fmt.Errorf("failed to delete string record: %w", err) 835 } 836 837 return nil 838 } 839 840 return nil 841} 842 843func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 844 did := e.Did 845 var err error 846 847 l := i.Logger.With("handler", "ingestKnotMember") 848 l = l.With("nsid", e.Commit.Collection) 849 850 switch e.Commit.Operation { 851 case jmodels.CommitOperationCreate: 852 raw := json.RawMessage(e.Commit.Record) 853 record := tangled.KnotMember{} 854 err = json.Unmarshal(raw, &record) 855 if err != nil { 856 l.Error("invalid record", "err", err) 857 return err 858 } 859 860 // only knot owner can invite to knots 861 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 862 if err != nil || !ok { 863 return fmt.Errorf("failed to enforce permissions: %w", err) 864 } 865 866 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 867 if err != nil { 868 return err 869 } 870 871 if memberId.Handle.IsInvalidHandle() { 872 return err 873 } 874 875 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 876 if err != nil { 877 return fmt.Errorf("failed to update ACLs: %w", err) 878 } 879 880 l.Info("added knot member") 881 case jmodels.CommitOperationDelete: 882 // we don't store knot members in a table (like we do for spindle) 883 // and we can't remove this just yet. possibly fixed if we switch 884 // to either: 885 // 1. 
a knot_members table like with spindle and store the rkey 886 // 2. use the knot host as the rkey 887 // 888 // TODO: implement member deletion 889 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 890 } 891 892 return nil 893} 894 895func (i *Ingester) ingestKnot(e *jmodels.Event) error { 896 did := e.Did 897 var err error 898 899 l := i.Logger.With("handler", "ingestKnot") 900 l = l.With("nsid", e.Commit.Collection) 901 902 switch e.Commit.Operation { 903 case jmodels.CommitOperationCreate: 904 raw := json.RawMessage(e.Commit.Record) 905 record := tangled.Knot{} 906 err = json.Unmarshal(raw, &record) 907 if err != nil { 908 l.Error("invalid record", "err", err) 909 return err 910 } 911 912 domain := e.Commit.RKey 913 914 ddb, ok := i.Db.Execer.(*db.DB) 915 if !ok { 916 return fmt.Errorf("failed to index profile record, invalid db cast") 917 } 918 919 err := db.AddKnot(ddb, domain, did) 920 if err != nil { 921 l.Error("failed to add knot to db", "err", err, "domain", domain) 922 return err 923 } 924 925 err = retry.Do( 926 func() error { 927 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 928 }, 929 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 930 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 931 ) 932 if err != nil { 933 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 934 return err 935 } 936 937 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 938 if err != nil { 939 return fmt.Errorf("failed to mark verified: %w", err) 940 } 941 942 return nil 943 944 case jmodels.CommitOperationDelete: 945 domain := e.Commit.RKey 946 947 ddb, ok := i.Db.Execer.(*db.DB) 948 if !ok { 949 return fmt.Errorf("failed to index knot record, invalid db cast") 950 } 951 952 // get record from db first 953 registrations, err := db.GetRegistrations( 954 ddb, 955 orm.FilterEq("domain", domain), 956 orm.FilterEq("did", did), 957 ) 
958 if err != nil { 959 return fmt.Errorf("failed to get registration: %w", err) 960 } 961 if len(registrations) != 1 { 962 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 963 } 964 registration := registrations[0] 965 966 tx, err := ddb.Begin() 967 if err != nil { 968 return err 969 } 970 defer func() { 971 tx.Rollback() 972 i.Enforcer.E.LoadPolicy() 973 }() 974 975 err = db.DeleteKnot( 976 tx, 977 orm.FilterEq("did", did), 978 orm.FilterEq("domain", domain), 979 ) 980 if err != nil { 981 return err 982 } 983 984 if registration.Registered != nil { 985 err = i.Enforcer.RemoveKnot(domain) 986 if err != nil { 987 return err 988 } 989 } 990 991 err = tx.Commit() 992 if err != nil { 993 return err 994 } 995 996 err = i.Enforcer.E.SavePolicy() 997 if err != nil { 998 return err 999 } 1000 } 1001 1002 return nil 1003} 1004func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 1005 did := e.Did 1006 rkey := e.Commit.RKey 1007 1008 var err error 1009 1010 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1011 l.Info("ingesting record") 1012 1013 ddb, ok := i.Db.Execer.(*db.DB) 1014 if !ok { 1015 return fmt.Errorf("failed to index issue record, invalid db cast") 1016 } 1017 1018 switch e.Commit.Operation { 1019 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1020 raw := json.RawMessage(e.Commit.Record) 1021 record := tangled.RepoIssue{} 1022 err = json.Unmarshal(raw, &record) 1023 if err != nil { 1024 l.Error("invalid record", "err", err) 1025 return err 1026 } 1027 1028 issue := models.IssueFromRecord(did, rkey, record) 1029 1030 if issue.RepoAt == "" { 1031 return fmt.Errorf("issue record has no repo field") 1032 } 1033 1034 if err := i.Validator.ValidateIssue(&issue); err != nil { 1035 return fmt.Errorf("failed to validate issue: %w", err) 1036 } 1037 1038 if record.Repo != nil { 1039 repo, repoErr := db.GetRepoByAtUri(i.Db, 
*record.Repo) 1040 if repoErr == nil && repo.RepoDid != "" { 1041 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1042 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1043 } 1044 } 1045 } 1046 1047 tx, err := ddb.BeginTx(ctx, nil) 1048 if err != nil { 1049 l.Error("failed to begin transaction", "err", err) 1050 return err 1051 } 1052 defer tx.Rollback() 1053 1054 err = db.PutIssue(tx, &issue) 1055 if err != nil { 1056 l.Error("failed to create issue", "err", err) 1057 return err 1058 } 1059 1060 err = tx.Commit() 1061 if err != nil { 1062 l.Error("failed to commit txn", "err", err) 1063 return err 1064 } 1065 1066 return nil 1067 1068 case jmodels.CommitOperationDelete: 1069 tx, err := ddb.BeginTx(ctx, nil) 1070 if err != nil { 1071 l.Error("failed to begin transaction", "err", err) 1072 return err 1073 } 1074 defer tx.Rollback() 1075 1076 if err := db.DeleteIssues( 1077 tx, 1078 did, 1079 rkey, 1080 ); err != nil { 1081 l.Error("failed to delete", "err", err) 1082 return fmt.Errorf("failed to delete issue record: %w", err) 1083 } 1084 if err := tx.Commit(); err != nil { 1085 l.Error("failed to commit txn", "err", err) 1086 return err 1087 } 1088 1089 return nil 1090 } 1091 1092 return nil 1093} 1094 1095func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 1096 did := e.Did 1097 rkey := e.Commit.RKey 1098 1099 var err error 1100 1101 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1102 l.Info("ingesting record") 1103 1104 ddb, ok := i.Db.Execer.(*db.DB) 1105 if !ok { 1106 return fmt.Errorf("failed to index pull record, invalid db cast") 1107 } 1108 1109 switch e.Commit.Operation { 1110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1111 raw := json.RawMessage(e.Commit.Record) 1112 record := 
tangled.RepoPull{} 1113 err = json.Unmarshal(raw, &record) 1114 if err != nil { 1115 l.Error("invalid record", "err", err) 1116 return err 1117 } 1118 1119 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1120 if err != nil { 1121 l.Error("failed to resolve did") 1122 return err 1123 } 1124 1125 // go through and fetch all blobs in parallel 1126 readers := make([]*io.ReadCloser, len(record.Rounds)) 1127 var mu sync.Mutex 1128 1129 g, gctx := errgroup.WithContext(ctx) 1130 1131 for idx, b := range record.Rounds { 1132 g.Go(func() error { 1133 // for some reason, a blob is empty 1134 if b.PatchBlob == nil { 1135 return fmt.Errorf("missing patchBlob in round %d", idx) 1136 } 1137 1138 ownerPds := ownerId.PDSEndpoint() 1139 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1140 q := url.Query() 1141 q.Set("cid", b.PatchBlob.Ref.String()) 1142 q.Set("did", did) 1143 url.RawQuery = q.Encode() 1144 1145 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1146 if err != nil { 1147 l.Error("failed to create request") 1148 return err 1149 } 1150 req.Header.Set("Content-Type", "application/json") 1151 1152 resp, err := http.DefaultClient.Do(req) 1153 if err != nil { 1154 l.Error("failed to make request") 1155 return err 1156 } 1157 1158 mu.Lock() 1159 readers[idx] = &resp.Body 1160 mu.Unlock() 1161 1162 return nil 1163 }) 1164 } 1165 1166 if err := g.Wait(); err != nil { 1167 for _, r := range readers { 1168 if r != nil && *r != nil { 1169 (*r).Close() 1170 } 1171 } 1172 return err 1173 } 1174 1175 defer func() { 1176 for _, r := range readers { 1177 if r != nil && *r != nil { 1178 (*r).Close() 1179 } 1180 } 1181 }() 1182 1183 pull, err := models.PullFromRecord(did, rkey, record, readers) 1184 if err != nil { 1185 return fmt.Errorf("failed to parse pull from record: %w", err) 1186 } 1187 if err := i.Validator.ValidatePull(pull); err != nil { 1188 return fmt.Errorf("failed to validate pull: %w", err) 1189 } 1190 1191 
tx, err := ddb.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		err = db.PutPull(tx, pull)
		if err != nil {
			l.Error("failed to create pull", "err", err)
			return err
		}

		err = tx.Commit()
		if err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		return nil

	case jmodels.CommitOperationDelete:
		tx, err := ddb.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		if err := db.AbandonPulls(
			tx,
			orm.FilterEq("owner_did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			l.Error("failed to abandon", "err", err)
			return fmt.Errorf("failed to abandon pull record: %w", err)
		}
		if err := tx.Commit(); err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		return nil
	}

	return nil
}

// ingestIssueComment indexes a tangled.RepoIssueComment record carried by a
// jetstream commit event.
//
// Create and update operations decode the record, convert it with
// models.IssueCommentFromRecord, validate it, and store it with
// db.AddIssueComment inside a transaction. Delete operations call
// db.DeleteIssueComments filtered by the event's DID and rkey. Any other
// operation returns nil without touching the database.
func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	ddb, ok := i.Db.Execer.(*db.DB)
	if !ok {
		return fmt.Errorf("failed to index issue comment record, invalid db cast")
	}

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		var record tangled.RepoIssueComment
		if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		comment, err := models.IssueCommentFromRecord(did, rkey, record)
		if err != nil {
			return fmt.Errorf("failed to parse comment from record: %w", err)
		}

		if err := i.Validator.ValidateIssueComment(comment); err != nil {
			return fmt.Errorf("failed to validate comment: %w", err)
		}

		tx, err := ddb.Begin()
		if err != nil {
			return fmt.Errorf("failed to start transaction: %w", err)
		}
		// Rollback is deferred unconditionally; Commit below is the success path.
		defer tx.Rollback()

		if _, err := db.AddIssueComment(tx, *comment); err != nil {
			return fmt.Errorf("failed to create issue comment: %w", err)
		}

		return tx.Commit()

	case jmodels.CommitOperationDelete:
		if err := db.DeleteIssueComments(
			ddb,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to delete issue comment record: %w", err)
		}

		return nil
	}

	return nil
}

// ingestLabelDefinition indexes a tangled.LabelDefinition record carried by a
// jetstream commit event.
//
// Create and update operations decode the record, convert it with
// models.LabelDefinitionFromRecord, validate it, and store it with
// db.AddLabelDefinition (called directly on the database handle, without an
// explicit transaction). Delete operations call db.DeleteLabelDefinition
// filtered by the event's DID and rkey. Any other operation returns nil
// without touching the database.
func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	ddb, ok := i.Db.Execer.(*db.DB)
	if !ok {
		return fmt.Errorf("failed to index label definition, invalid db cast")
	}

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		var record tangled.LabelDefinition
		if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		def, err := models.LabelDefinitionFromRecord(did, rkey, record)
		if err != nil {
			return fmt.Errorf("failed to parse labeldef from record: %w", err)
		}

		if err := i.Validator.ValidateLabelDefinition(def); err != nil {
			return fmt.Errorf("failed to validate labeldef: %w", err)
		}

		if _, err := db.AddLabelDefinition(ddb, def); err != nil {
			return fmt.Errorf("failed to create labeldef: %w", err)
		}

		return nil

	case jmodels.CommitOperationDelete:
		if err := db.DeleteLabelDefinition(
			ddb,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to delete labeldef record: %w", err)
		}

		return nil
	}

	return nil
}

// ingestLabelOp indexes a tangled.LabelOp record carried by a jetstream
// commit event.
//
// Only create operations are handled; every other operation returns nil
// without touching the database. For a create, the record's subject must be
// an at-uri in the tangled.RepoIssueNSID collection that matches exactly one
// issue. Each op derived from the record is validated against the label
// definitions listed on that issue's repo, and all ops are then inserted in a
// single transaction.
func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	ddb, ok := i.Db.Execer.(*db.DB)
	if !ok {
		return fmt.Errorf("failed to index label op, invalid db cast")
	}

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate:
		var record tangled.LabelOp
		if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		subject := syntax.ATURI(record.Subject)
		collection := subject.Collection()

		var repo *models.Repo
		switch collection {
		case tangled.RepoIssueNSID:
			issues, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
			if err != nil {
				return fmt.Errorf("failed to find subject: %w", err)
			}
			// Reported separately from the lookup error: passing a nil
			// error to %w would render as "%!w(<nil>)".
			if len(issues) != 1 {
				return fmt.Errorf("failed to find subject: expected exactly 1 issue, found %d", len(issues))
			}
			repo = issues[0].Repo
		default:
			return fmt.Errorf("unsupported label subject: %s", collection)
		}

		// repo.Labels is dereferenced below; fail with an error rather than
		// panicking if the issue came back without its repo attached.
		if repo == nil {
			return fmt.Errorf("failed to find repo for subject: %s", subject)
		}

		actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
		if err != nil {
			return fmt.Errorf("failed to build label application ctx: %w", err)
		}

		ops := models.LabelOpsFromRecord(did, rkey, record)

		// Validate every op before opening the transaction so that an
		// invalid record never starts a write.
		for _, o := range ops {
			def, ok := actx.Defs[o.OperandKey]
			if !ok {
				return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
			}
			if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
				return fmt.Errorf("failed to validate labelop: %w", err)
			}
		}

		tx, err := ddb.Begin()
		if err != nil {
			return fmt.Errorf("failed to start transaction: %w", err)
		}
		// Rollback is deferred unconditionally; Commit below is the success path.
		defer tx.Rollback()

		for _, o := range ops {
			if _, err := db.AddLabelOp(tx, &o); err != nil {
				return fmt.Errorf("failed to add labelop: %w", err)
			}
		}

		return tx.Commit()
	}

	return nil
}