Monorepo for Tangled tangled.org
12

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "sync" 16 17 "time" 18 19 "github.com/avast/retry-go/v4" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 jmodels "github.com/bluesky-social/jetstream/pkg/models" 22 "github.com/go-git/go-git/v5/plumbing" 23 "github.com/ipfs/go-cid" 24 "golang.org/x/sync/errgroup" 25 "tangled.org/core/api/tangled" 26 "tangled.org/core/appview/cache" 27 "tangled.org/core/appview/config" 28 "tangled.org/core/appview/db" 29 "tangled.org/core/appview/models" 30 "tangled.org/core/appview/serververify" 31 "tangled.org/core/appview/validator" 32 "tangled.org/core/idresolver" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35) 36 37type Ingester struct { 38 Db *db.DB 39 Enforcer *rbac.Enforcer 40 IdResolver *idresolver.Resolver 41 Cache *cache.Cache 42 Config *config.Config 43 Logger *slog.Logger 44 Validator *validator.Validator 45} 46 47type processFunc func(ctx context.Context, e *jmodels.Event) error 48 49func (i *Ingester) Ingest() processFunc { 50 return func(ctx context.Context, e *jmodels.Event) error { 51 var err error 52 53 l := i.Logger.With("kind", e.Kind) 54 switch e.Kind { 55 case jmodels.EventKindAccount: 56 // TODO: sync account state to db 57 if e.Account.Active { 58 break 59 } 60 // TODO: revoke sessions by DID 61 if *e.Account.Status == "deactivated" { 62 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 63 } 64 case jmodels.EventKindIdentity: 65 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 66 case jmodels.EventKindCommit: 67 switch e.Commit.Collection { 68 case tangled.GraphFollowNSID: 69 err = i.ingestFollow(e) 70 case tangled.GraphVouchNSID: 71 err = i.ingestVouch(ctx, e) 72 case tangled.FeedStarNSID: 73 err = i.ingestStar(ctx, e) 74 case tangled.PublicKeyNSID: 75 err = i.ingestPublicKey(e) 76 case tangled.RepoArtifactNSID: 77 err = i.ingestArtifact(ctx, e) 78 case tangled.ActorProfileNSID: 79 err = i.ingestProfile(ctx, e) 80 case tangled.SpindleMemberNSID: 81 err = i.ingestSpindleMember(ctx, e) 82 case tangled.SpindleNSID: 83 err = i.ingestSpindle(ctx, e) 84 case tangled.KnotMemberNSID: 85 err = i.ingestKnotMember(e) 86 case tangled.KnotNSID: 87 err = i.ingestKnot(e) 88 case tangled.StringNSID: 89 err = i.ingestString(e) 90 case tangled.RepoIssueNSID: 91 err = i.ingestIssue(ctx, e) 92 case tangled.RepoPullNSID: 93 err = i.ingestPull(ctx, e) 94 case tangled.RepoIssueCommentNSID: 95 err = i.ingestIssueComment(e) 96 case tangled.LabelDefinitionNSID: 97 err = i.ingestLabelDefinition(e) 98 case tangled.LabelOpNSID: 99 err = i.ingestLabelOp(e) 100 } 101 l = i.Logger.With("nsid", e.Commit.Collection) 102 } 103 104 if err != nil { 105 l.Warn("failed to ingest record, skipping", "err", err) 106 } 107 108 lastTimeUs := e.TimeUS + 1 109 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 110 l.Error("failed to save cursor", "err", saveErr) 111 } 112 113 return nil 114 } 115} 116 117func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 118 var err error 119 did := e.Did 120 121 l := i.Logger.With("handler", "ingestStar") 122 l = l.With("nsid", e.Commit.Collection) 123 124 switch e.Commit.Operation { 125 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 126 var subjectUri syntax.ATURI 127 128 raw := json.RawMessage(e.Commit.Record) 129 record := tangled.FeedStar{} 130 err := json.Unmarshal(raw, &record) 131 if err != nil { 132 l.Error("invalid record", "err", err) 133 return err 134 } 135 136 star := &models.Star{ 137 Did: did, 138 Rkey: e.Commit.RKey, 139 } 140 141 switch { 142 case record.SubjectDid != nil: 143 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 144 if repoErr == nil { 145 subjectUri = repo.RepoAt() 146 star.RepoAt = subjectUri 147 } 148 case record.Subject != nil: 149 subjectUri, err = syntax.ParseATURI(*record.Subject) 150 if err != nil { 151 l.Error("invalid record", "err", err) 152 return err 153 } 154 star.RepoAt = subjectUri 155 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 156 if repoErr == nil && repo.RepoDid != "" { 157 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.FeedStarNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 158 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 159 } 160 } 161 default: 162 l.Error("star record has neither subject nor subjectDid") 163 return fmt.Errorf("star record has neither subject nor subjectDid") 164 } 165 err = db.AddStar(i.Db, star) 166 case jmodels.CommitOperationDelete: 167 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 168 } 169 170 if err != nil { 171 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 172 } 173 174 return nil 175} 176 177func (i *Ingester) ingestFollow(e *jmodels.Event) error { 178 var err error 179 did := e.Did 180 181 l := i.Logger.With("handler", "ingestFollow") 182 l = l.With("nsid", e.Commit.Collection) 183 184 switch e.Commit.Operation { 185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 186 raw := json.RawMessage(e.Commit.Record) 187 record := tangled.GraphFollow{} 188 err = json.Unmarshal(raw, &record) 189 if err != nil { 190 l.Error("invalid record", "err", err) 191 return err 192 } 193 194 err = db.AddFollow(i.Db, &models.Follow{ 195 UserDid: did, 196 SubjectDid: record.Subject, 197 Rkey: e.Commit.RKey, 198 }) 199 case jmodels.CommitOperationDelete: 200 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 201 } 202 203 if err != nil { 204 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 205 } 206 207 return nil 208} 209 210func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 211 var err error 212 did := e.Did 213 214 l := i.Logger.With("handler", "ingestVouch") 215 l = l.With("nsid", e.Commit.Collection) 216 l.Info("ingesting vouch") 217 218 switch e.Commit.Operation { 219 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 220 raw := json.RawMessage(e.Commit.Record) 221 record := tangled.GraphVouch{} 222 err = json.Unmarshal(raw, &record) 223 if err != nil { 224 l.Error("invalid record", "err", err) 225 return err 226 } 227 228 // rkey is the subject_did being vouched for/denounced 229 subjectDID := e.Commit.RKey 230 231 _, err = syntax.ParseDID(subjectDID) 232 if err != nil { 233 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 234 return fmt.Errorf("invalid subject_did: %w", err) 235 } 236 237 if did == subjectDID { 238 l.Warn("attempted self-vouch", "did", did) 239 return fmt.Errorf("cannot vouch for self") 240 } 241 242 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 243 if err != nil { 244 return err 245 } 246 247 if subjectId.Handle.IsInvalidHandle() { 248 return err 249 } 250 251 kind, err := models.ParseVouchKind(record.Kind) 252 if err != nil { 253 l.Error("invalid kind", "kind", kind) 254 return fmt.Errorf("invalid kind: %s", kind) 255 } 256 257 recordCid, err := cid.Parse(e.Commit.CID) 258 if err != nil { 259 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 260 return fmt.Errorf("invalid cid: %w", err) 261 } 262 263 var evidences []syntax.ATURI 264 for _, raw := range record.Evidences { 265 uri, parseErr := syntax.ParseATURI(raw) 266 if parseErr != nil { 267 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 268 continue 269 } 270 evidences = append(evidences, uri) 271 } 272 273 tx, txErr := i.Db.Begin() 274 if txErr != nil { 275 return fmt.Errorf("failed to start transaction: %w", txErr) 276 } 277 278 addErr := db.AddVouch(tx, &models.Vouch{ 279 Did: syntax.DID(did), 280 SubjectDid: subjectId.DID, 281 Cid: recordCid, 282 Kind: kind, 283 Reason: record.Reason, 284 Evidences: evidences, 285 }) 286 if addErr != nil { 287 tx.Rollback() 288 err = addErr 289 } else { 290 err = tx.Commit() 291 } 292 293 case jmodels.CommitOperationDelete: 294 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 295 } 296 297 if err != nil { 298 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 299 } 300 301 return nil 302} 303 304func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 305 did := e.Did 306 var err error 307 308 l := i.Logger.With("handler", "ingestPublicKey") 309 l = l.With("nsid", e.Commit.Collection) 310 311 switch e.Commit.Operation { 312 case jmodels.CommitOperationCreate: 313 l.Debug("processing add of pubkey") 314 raw := json.RawMessage(e.Commit.Record) 315 record := tangled.PublicKey{} 316 err = json.Unmarshal(raw, &record) 317 if err != nil { 318 l.Error("invalid record", "err", err) 319 return err 320 } 321 322 name := record.Name 323 key := record.Key 324 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 325 case jmodels.CommitOperationUpdate: 326 l.Debug("processing update of pubkey") 327 raw := json.RawMessage(e.Commit.Record) 328 record := tangled.PublicKey{} 329 err = json.Unmarshal(raw, &record) 330 if err != nil { 331 l.Error("invalid record", "err", err) 332 return err 333 } 334 335 name := record.Name 336 key := record.Key 337 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 338 case jmodels.CommitOperationDelete: 339 l.Debug("processing delete of pubkey") 340 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 341 } 342 343 if err != nil { 344 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 345 } 346 347 return nil 348} 349 350func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 351 did := e.Did 352 var err error 353 354 l := i.Logger.With("handler", "ingestArtifact") 355 l = l.With("nsid", e.Commit.Collection) 356 357 switch e.Commit.Operation { 358 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 359 raw := json.RawMessage(e.Commit.Record) 360 record := tangled.RepoArtifact{} 361 err = json.Unmarshal(raw, &record) 362 if err != nil { 363 l.Error("invalid record", "err", err) 364 return err 365 } 366 367 var repo *models.Repo 368 if record.RepoDid != nil && *record.RepoDid != "" { 369 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 370 if err != nil && !errors.Is(err, sql.ErrNoRows) { 371 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 372 } 373 } 374 if repo == nil && record.Repo != nil { 375 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 376 if parseErr != nil { 377 return parseErr 378 } 379 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 380 if err != nil { 381 return err 382 } 383 } 384 if repo == nil { 385 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 386 } 387 388 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 389 if err != nil || !ok { 390 return err 391 } 392 393 repoDid := repo.RepoDid 394 if repoDid == "" && record.RepoDid != nil { 395 repoDid = *record.RepoDid 396 } 397 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 398 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 399 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 400 } 401 } 402 403 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 404 if err != nil { 405 createdAt = time.Now() 406 } 407 408 artifact := models.Artifact{ 409 Did: did, 410 Rkey: e.Commit.RKey, 411 RepoAt: repo.RepoAt(), 412 Tag: plumbing.Hash(record.Tag), 413 CreatedAt: createdAt, 414 BlobCid: cid.Cid(record.Artifact.Ref), 415 Name: record.Name, 416 Size: uint64(record.Artifact.Size), 417 MimeType: record.Artifact.MimeType, 418 } 419 420 err = db.AddArtifact(i.Db, artifact) 421 case jmodels.CommitOperationDelete: 422 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 423 } 424 425 if err != nil { 426 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 427 } 428 429 return nil 430} 431 432func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 433 did := e.Did 434 var err error 435 436 l := i.Logger.With("handler", "ingestProfile") 437 l = l.With("nsid", e.Commit.Collection) 438 439 if e.Commit.RKey != "self" { 440 return fmt.Errorf("ingestProfile only ingests `self` record") 441 } 442 443 switch e.Commit.Operation { 444 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 445 raw := json.RawMessage(e.Commit.Record) 446 record := tangled.ActorProfile{} 447 err = json.Unmarshal(raw, &record) 448 if err != nil { 449 l.Error("invalid record", "err", err) 450 return err 451 } 452 453 avatar := "" 454 if record.Avatar != nil { 455 avatar = record.Avatar.Ref.String() 456 } 457 458 description := "" 459 if record.Description != nil { 460 description = *record.Description 461 } 462 463 includeBluesky := record.Bluesky 464 465 pronouns := "" 466 if record.Pronouns != nil { 467 pronouns = *record.Pronouns 468 } 469 470 location := "" 471 if record.Location != nil { 472 location = *record.Location 473 } 474 475 var links [5]string 476 for i, l := range record.Links { 477 if i < 5 { 478 links[i] = l 479 } 480 } 481 482 var stats [2]models.VanityStat 483 for i, s := range record.Stats { 484 if i < 2 { 485 stats[i].Kind = models.ParseVanityStatKind(s) 486 } 487 } 488 489 var pinned [6]string 490 for i, r := range record.PinnedRepositories { 491 if i < 6 { 492 pinned[i] = r 493 } 494 } 495 496 var preferredHandle syntax.Handle 497 if record.PreferredHandle != nil { 498 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 499 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 500 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 501 preferredHandle = h 502 } 503 } 504 } 505 506 profile := models.Profile{ 507 Did: did, 508 Avatar: avatar, 509 Description: description, 510 IncludeBluesky: includeBluesky, 511 Location: location, 512 Links: links, 513 Stats: stats, 514 PinnedRepos: pinned, 515 Pronouns: pronouns, 516 PreferredHandle: preferredHandle, 517 } 518 519 tx, err := i.Db.Begin() 520 if err != nil { 521 return fmt.Errorf("failed to start transaction") 522 } 523 524 err = db.ValidateProfile(tx, &profile) 525 if err != nil { 526 return fmt.Errorf("invalid profile record") 527 } 528 529 err = db.UpsertProfile(tx, &profile) 530 if err == nil && i.Cache != nil { 531 pipe := i.Cache.Pipeline() 532 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 533 if preferredHandle != "" { 534 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 535 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 536 } else { 537 pipe.Del(ctx, didKey) 538 } 539 if _, execErr := pipe.Exec(ctx); execErr != nil { 540 l.Warn("failed to update preferred handle cache", "err", execErr) 541 } 542 } 543 case jmodels.CommitOperationDelete: 544 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 545 } 546 547 if err != nil { 548 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 549 } 550 551 return nil 552} 553 554func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 555 did := e.Did 556 var err error 557 558 l := i.Logger.With("handler", "ingestSpindleMember") 559 l = l.With("nsid", e.Commit.Collection) 560 561 switch e.Commit.Operation { 562 case jmodels.CommitOperationCreate: 563 raw := json.RawMessage(e.Commit.Record) 564 record := tangled.SpindleMember{} 565 err = json.Unmarshal(raw, &record) 566 if err != nil { 567 l.Error("invalid record", "err", err) 568 return err 569 } 570 571 // only spindle owner can invite to spindles 572 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 573 if err != nil || !ok { 574 return fmt.Errorf("failed to enforce permissions: %w", err) 575 } 576 577 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 578 if err != nil { 579 return err 580 } 581 582 if memberId.Handle.IsInvalidHandle() { 583 return err 584 } 585 586 err = db.AddSpindleMember(i.Db, models.SpindleMember{ 587 Did: syntax.DID(did), 588 Rkey: e.Commit.RKey, 589 Instance: record.Instance, 590 Subject: memberId.DID, 591 }) 592 if !ok { 593 return fmt.Errorf("failed to add to db: %w", err) 594 } 595 596 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 597 if err != nil { 598 return fmt.Errorf("failed to update ACLs: %w", err) 599 } 600 601 l.Info("added spindle member") 602 case jmodels.CommitOperationDelete: 603 rkey := e.Commit.RKey 604 605 // get record from db first 606 members, err := db.GetSpindleMembers( 607 i.Db, 608 orm.FilterEq("did", did), 609 orm.FilterEq("rkey", rkey), 610 ) 611 if err != nil || len(members) != 1 { 612 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 613 } 614 member := members[0] 615 616 tx, err := i.Db.Begin() 617 if err != nil { 618 return fmt.Errorf("failed to start txn: %w", err) 619 } 620 621 // remove record by rkey && update enforcer 622 if err = db.RemoveSpindleMember( 623 tx, 624 orm.FilterEq("did", did), 625 orm.FilterEq("rkey", rkey), 626 ); err != nil { 627 return fmt.Errorf("failed to remove from db: %w", err) 628 } 629 630 // update enforcer 631 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 632 if err != nil { 633 return fmt.Errorf("failed to update ACLs: %w", err) 634 } 635 636 if err = tx.Commit(); err != nil { 637 return fmt.Errorf("failed to commit txn: %w", err) 638 } 639 640 if err = i.Enforcer.E.SavePolicy(); err != nil { 641 return fmt.Errorf("failed to save ACLs: %w", err) 642 } 643 644 l.Info("removed spindle member") 645 } 646 647 return nil 648} 649 650func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 651 did := e.Did 652 var err error 653 654 l := i.Logger.With("handler", "ingestSpindle") 655 l = l.With("nsid", e.Commit.Collection) 656 657 switch e.Commit.Operation { 658 case jmodels.CommitOperationCreate: 659 raw := json.RawMessage(e.Commit.Record) 660 record := tangled.Spindle{} 661 err = json.Unmarshal(raw, &record) 662 if err != nil { 663 l.Error("invalid record", "err", err) 664 return err 665 } 666 667 instance := e.Commit.RKey 668 669 err := db.AddSpindle(i.Db, models.Spindle{ 670 Owner: syntax.DID(did), 671 Instance: instance, 672 }) 673 if err != nil { 674 l.Error("failed to add spindle to db", "err", err, "instance", instance) 675 return err 676 } 677 678 err = retry.Do( 679 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 680 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 681 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 682 ) 683 if err != nil { 684 l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 685 return err 686 } 687 688 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 689 if err != nil { 690 return fmt.Errorf("failed to mark verified: %w", err) 691 } 692 693 return nil 694 695 case jmodels.CommitOperationDelete: 696 instance := e.Commit.RKey 697 698 // get record from db first 699 spindles, err := db.GetSpindles( 700 ctx, 701 i.Db, 702 orm.FilterEq("owner", did), 703 orm.FilterEq("instance", instance), 704 ) 705 if err != nil || len(spindles) != 1 { 706 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 707 } 708 spindle := spindles[0] 709 710 tx, err := i.Db.Begin() 711 if err != nil { 712 return err 713 } 714 defer func() { 715 tx.Rollback() 716 i.Enforcer.E.LoadPolicy() 717 }() 718 719 // remove spindle members first 720 err = db.RemoveSpindleMember( 721 tx, 722 orm.FilterEq("owner", did), 723 orm.FilterEq("instance", instance), 724 ) 725 if err != nil { 726 return err 727 } 728 729 err = db.DeleteSpindle( 730 tx, 731 orm.FilterEq("owner", did), 732 orm.FilterEq("instance", instance), 733 ) 734 if err != nil { 735 return err 736 } 737 738 if spindle.Verified != nil { 739 err = i.Enforcer.RemoveSpindle(instance) 740 if err != nil { 741 return err 742 } 743 } 744 745 err = tx.Commit() 746 if err != nil { 747 return err 748 } 749 750 err = i.Enforcer.E.SavePolicy() 751 if err != nil { 752 return err 753 } 754 } 755 756 return nil 757} 758 759func (i *Ingester) ingestString(e *jmodels.Event) error { 760 did := e.Did 761 rkey := e.Commit.RKey 762 763 var err error 764 765 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 766 l.Info("ingesting record") 767 768 switch e.Commit.Operation { 769 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 770 raw := json.RawMessage(e.Commit.Record) 771 record := tangled.String{} 772 err = json.Unmarshal(raw, &record) 773 if err != nil { 774 l.Error("invalid record", "err", err) 775 return err 776 } 777 778 string := models.StringFromRecord(did, rkey, record) 779 780 if err = i.Validator.ValidateString(&string); err != nil { 781 l.Error("invalid record", "err", err) 782 return err 783 } 784 785 if err = db.AddString(i.Db, string); err != nil { 786 l.Error("failed to add string", "err", err) 787 return err 788 } 789 790 return nil 791 792 case jmodels.CommitOperationDelete: 793 if err := db.DeleteString( 794 i.Db, 795 orm.FilterEq("did", did), 796 orm.FilterEq("rkey", rkey), 797 ); err != nil { 798 l.Error("failed to delete", "err", err) 799 return fmt.Errorf("failed to delete string record: %w", err) 800 } 801 802 return nil 803 } 804 805 return nil 806} 807 808func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 809 did := e.Did 810 var err error 811 812 l := i.Logger.With("handler", "ingestKnotMember") 813 l = l.With("nsid", e.Commit.Collection) 814 815 switch e.Commit.Operation { 816 case jmodels.CommitOperationCreate: 817 raw := json.RawMessage(e.Commit.Record) 818 record := tangled.KnotMember{} 819 err = json.Unmarshal(raw, &record) 820 if err != nil { 821 l.Error("invalid record", "err", err) 822 return err 823 } 824 825 // only knot owner can invite to knots 826 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 827 if err != nil || !ok { 828 return fmt.Errorf("failed to enforce permissions: %w", err) 829 } 830 831 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 832 if err != nil { 833 return err 834 } 835 836 if memberId.Handle.IsInvalidHandle() { 837 return err 838 } 839 840 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 841 if err != nil { 842 return fmt.Errorf("failed to update ACLs: %w", err) 843 } 844 845 l.Info("added knot member") 846 case jmodels.CommitOperationDelete: 847 // we don't store knot members in a table (like we do for spindle) 848 // and we can't remove this just yet. possibly fixed if we switch 849 // to either: 850 // 1. a knot_members table like with spindle and store the rkey 851 // 2. use the knot host as the rkey 852 // 853 // TODO: implement member deletion 854 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 855 } 856 857 return nil 858} 859 860func (i *Ingester) ingestKnot(e *jmodels.Event) error { 861 did := e.Did 862 var err error 863 864 l := i.Logger.With("handler", "ingestKnot") 865 l = l.With("nsid", e.Commit.Collection) 866 867 switch e.Commit.Operation { 868 case jmodels.CommitOperationCreate: 869 raw := json.RawMessage(e.Commit.Record) 870 record := tangled.Knot{} 871 err = json.Unmarshal(raw, &record) 872 if err != nil { 873 l.Error("invalid record", "err", err) 874 return err 875 } 876 877 domain := e.Commit.RKey 878 879 err := db.AddKnot(i.Db, domain, did) 880 if err != nil { 881 l.Error("failed to add knot to db", "err", err, "domain", domain) 882 return err 883 } 884 885 err = retry.Do( 886 func() error { 887 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 888 }, 889 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 890 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 891 ) 892 if err != nil { 893 l.Error("failed to verify knot after retries", "err", err, "domain", domain) 894 return err 895 } 896 897 err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 898 if err != nil { 899 return fmt.Errorf("failed to mark verified: %w", err) 900 } 901 902 return nil 903 904 case jmodels.CommitOperationDelete: 905 domain := e.Commit.RKey 906 907 // get record from db first 908 registrations, err := db.GetRegistrations( 909 i.Db, 910 orm.FilterEq("domain", domain), 911 orm.FilterEq("did", did), 912 ) 913 if err != nil { 914 return fmt.Errorf("failed to get registration: %w", err) 915 } 916 if len(registrations) != 1 { 917 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations)) 918 } 919 registration := registrations[0] 920 921 tx, err := i.Db.Begin() 922 if err != nil { 923 return err 924 } 925 defer func() { 926 tx.Rollback() 927 i.Enforcer.E.LoadPolicy() 928 }() 929 930 err = db.DeleteKnot( 931 tx, 932 orm.FilterEq("did", did), 933 orm.FilterEq("domain", domain), 934 ) 935 if err != nil { 936 return err 937 } 938 939 if registration.Registered != nil { 940 err = i.Enforcer.RemoveKnot(domain) 941 if err != nil { 942 return err 943 } 944 } 945 946 err = tx.Commit() 947 if err != nil { 948 return err 949 } 950 951 err = i.Enforcer.E.SavePolicy() 952 if err != nil { 953 return err 954 } 955 } 956 957 return nil 958} 959func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 960 did := e.Did 961 rkey := e.Commit.RKey 962 963 var err error 964 965 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 966 l.Info("ingesting record") 967 968 switch e.Commit.Operation { 969 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 970 raw := json.RawMessage(e.Commit.Record) 971 record := tangled.RepoIssue{} 972 err = json.Unmarshal(raw, &record) 973 if err != nil { 974 l.Error("invalid record", "err", err) 975 return err 976 } 977 978 issue := models.IssueFromRecord(did, rkey, record) 979 980 if issue.RepoAt == "" { 981 return fmt.Errorf("issue record has no repo field") 982 } 983 984 if err := i.Validator.ValidateIssue(&issue); err != nil { 985 return fmt.Errorf("failed to validate issue: %w", err) 986 } 987 988 if record.Repo != nil { 989 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 990 if repoErr == nil && repo.RepoDid != "" { 991 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 992 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 993 } 994 } 995 } 996 997 tx, err := i.Db.BeginTx(ctx, nil) 998 if err != nil { 999 l.Error("failed to begin transaction", "err", err) 1000 return err 1001 } 1002 defer tx.Rollback() 1003 1004 err = db.PutIssue(tx, &issue) 1005 if err != nil { 1006 l.Error("failed to create issue", "err", err) 1007 return err 1008 } 1009 1010 err = tx.Commit() 1011 if err != nil { 1012 l.Error("failed to commit txn", "err", err) 1013 return err 1014 } 1015 1016 return nil 1017 1018 case jmodels.CommitOperationDelete: 1019 tx, err := i.Db.BeginTx(ctx, nil) 1020 if err != nil { 1021 l.Error("failed to begin transaction", "err", err) 1022 return err 1023 } 1024 defer tx.Rollback() 1025 1026 if err := db.DeleteIssues( 1027 tx, 1028 did, 1029 rkey, 1030 ); err != nil { 1031 l.Error("failed to delete", "err", err) 1032 return fmt.Errorf("failed to delete issue record: %w", err) 1033 } 1034 if err := tx.Commit(); err != nil { 1035 l.Error("failed to commit txn", "err", err) 1036 return err 1037 } 1038 1039 return nil 1040 } 1041 1042 return nil 1043} 1044 1045func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 1046 did := e.Did 1047 rkey := e.Commit.RKey 1048 1049 var err error 1050 1051 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1052 l.Info("ingesting record") 1053 1054 switch e.Commit.Operation { 1055 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1056 raw := json.RawMessage(e.Commit.Record) 1057 record := tangled.RepoPull{} 1058 err = json.Unmarshal(raw, &record) 1059 if err != nil { 1060 l.Error("invalid record", "err", err) 1061 return err 1062 } 1063 1064 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 1065 if err != nil { 1066 l.Error("failed to resolve did") 1067 return err 1068 } 1069 1070 // go through and fetch all blobs in parallel 1071 readers := make([]*io.ReadCloser, len(record.Rounds)) 1072 var mu sync.Mutex 1073 1074 g, gctx := errgroup.WithContext(ctx) 1075 1076 for idx, b := range record.Rounds { 1077 g.Go(func() error { 1078 // for some reason, a blob is empty 1079 if b.PatchBlob == nil { 1080 return fmt.Errorf("missing patchBlob in round %d", idx) 1081 } 1082 1083 ownerPds := ownerId.PDSEndpoint() 1084 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 1085 q := url.Query() 1086 q.Set("cid", b.PatchBlob.Ref.String()) 1087 q.Set("did", did) 1088 url.RawQuery = q.Encode() 1089 1090 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 1091 if err != nil { 1092 l.Error("failed to create request") 1093 return err 1094 } 1095 req.Header.Set("Content-Type", "application/json") 1096 1097 resp, err := http.DefaultClient.Do(req) 1098 if err != nil { 1099 l.Error("failed to make request") 1100 return err 1101 } 1102 1103 mu.Lock() 1104 readers[idx] = &resp.Body 1105 mu.Unlock() 1106 1107 return nil 1108 }) 1109 } 1110 1111 if err := g.Wait(); err != nil { 1112 for _, r := range readers { 1113 if r != nil && *r != nil { 1114 (*r).Close() 1115 } 1116 } 1117 return err 1118 } 1119 1120 defer func() { 1121 for _, r := range readers { 1122 if r != nil && *r != nil { 1123 (*r).Close() 1124 } 1125 } 1126 }() 1127 1128 pull, err := models.PullFromRecord(did, rkey, record, readers) 1129 if err != nil { 1130 return fmt.Errorf("failed to parse pull from record: %w", err) 1131 } 1132 if err := i.Validator.ValidatePull(pull); err != nil { 1133 return fmt.Errorf("failed to validate pull: %w", err) 1134 } 1135 1136 tx, err := i.Db.BeginTx(ctx, nil) 1137 if err != nil { 1138 l.Error("failed to begin transaction", "err", err) 1139 return err 1140 } 1141 defer tx.Rollback() 1142 1143 err = db.PutPull(tx, pull) 1144 if err != nil { 1145 l.Error("failed to create pull", "err", err) 1146 return err 1147 } 1148 1149 err = tx.Commit() 1150 if err != nil { 1151 l.Error("failed to commit txn", "err", err) 1152 return err 1153 } 1154 1155 return nil 1156 1157 case jmodels.CommitOperationDelete: 1158 tx, err := i.Db.BeginTx(ctx, nil) 1159 if err != nil { 1160 l.Error("failed to begin transaction", "err", err) 1161 return err 1162 } 1163 defer tx.Rollback() 1164 1165 if err := db.AbandonPulls( 1166 tx, 1167 orm.FilterEq("owner_did", did), 1168 orm.FilterEq("rkey", rkey), 1169 ); err != nil { 1170 l.Error("failed to abandon", "err", err) 1171 return fmt.Errorf("failed to abandon pull record: %w", err) 1172 } 1173 if err := tx.Commit(); err != nil { 1174 l.Error("failed to commit txn", "err", err) 1175 return err 1176 } 1177 1178 return nil 1179 } 1180 1181 return nil 1182} 1183 1184func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1185 did := e.Did 1186 rkey := e.Commit.RKey 1187 1188 var err error 1189 1190 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1191 l.Info("ingesting record") 1192 1193 switch e.Commit.Operation { 1194 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1195 raw := json.RawMessage(e.Commit.Record) 1196 record := tangled.RepoIssueComment{} 1197 err = json.Unmarshal(raw, &record) 1198 if err != nil { 1199 return fmt.Errorf("invalid record: %w", err) 1200 } 1201 1202 comment, err := models.IssueCommentFromRecord(did, rkey, record) 1203 if err != nil { 1204 return fmt.Errorf("failed to parse comment from record: %w", err) 1205 } 1206 1207 if err := i.Validator.ValidateIssueComment(comment); err != nil { 1208 return fmt.Errorf("failed to validate comment: %w", err) 1209 } 1210 1211 tx, err := i.Db.Begin() 1212 if err != nil { 1213 return fmt.Errorf("failed to start transaction: %w", err) 1214 } 1215 defer tx.Rollback() 1216 1217 _, err = db.AddIssueComment(tx, *comment) 1218 if err != nil { 1219 return fmt.Errorf("failed to create issue comment: %w", err) 1220 } 1221 1222 return tx.Commit() 1223 1224 case jmodels.CommitOperationDelete: 1225 if err := db.DeleteIssueComments( 1226 i.Db, 1227 orm.FilterEq("did", did), 1228 orm.FilterEq("rkey", rkey), 1229 ); err != nil { 1230 return fmt.Errorf("failed to delete issue comment record: %w", err) 1231 } 1232 1233 return nil 1234 } 1235 1236 return nil 1237} 1238 1239func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1240 did := e.Did 1241 rkey := e.Commit.RKey 1242 1243 var err error 1244 1245 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1246 l.Info("ingesting record") 1247 1248 switch e.Commit.Operation { 1249 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1250 raw := json.RawMessage(e.Commit.Record) 1251 record := tangled.LabelDefinition{} 1252 err = json.Unmarshal(raw, &record) 1253 if err != nil { 1254 return fmt.Errorf("invalid record: %w", err) 1255 } 1256 1257 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1258 if err != nil { 1259 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1260 } 1261 1262 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1263 return fmt.Errorf("failed to validate labeldef: %w", err) 1264 } 1265 1266 _, err = db.AddLabelDefinition(i.Db, def) 1267 if err != nil { 1268 return fmt.Errorf("failed to create labeldef: %w", err) 1269 } 1270 1271 return nil 1272 1273 case jmodels.CommitOperationDelete: 1274 if err := db.DeleteLabelDefinition( 1275 i.Db, 1276 orm.FilterEq("did", did), 1277 orm.FilterEq("rkey", rkey), 1278 ); err != nil { 1279 return fmt.Errorf("failed to delete labeldef record: %w", err) 1280 } 1281 1282 return nil 1283 } 1284 1285 return nil 1286} 1287 1288func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1289 did := e.Did 1290 rkey := e.Commit.RKey 1291 1292 var err error 1293 1294 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1295 l.Info("ingesting record") 1296 1297 switch e.Commit.Operation { 1298 case jmodels.CommitOperationCreate: 1299 raw := json.RawMessage(e.Commit.Record) 1300 record := tangled.LabelOp{} 1301 err = json.Unmarshal(raw, &record) 1302 if err != nil { 1303 return fmt.Errorf("invalid record: %w", err) 1304 } 1305 1306 subject := syntax.ATURI(record.Subject) 1307 collection := subject.Collection() 1308 1309 var repo *models.Repo 1310 switch collection { 1311 case tangled.RepoIssueNSID: 1312 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1313 if err != nil || len(i) != 1 { 1314 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1315 } 1316 repo = i[0].Repo 1317 default: 1318 return fmt.Errorf("unsupported label subject: %s", collection) 1319 } 1320 1321 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1322 if err != nil { 1323 return fmt.Errorf("failed to build label application ctx: %w", err) 1324 } 1325 1326 ops := models.LabelOpsFromRecord(did, rkey, record) 1327 1328 for _, o := range ops { 1329 def, ok := actx.Defs[o.OperandKey] 1330 if !ok { 1331 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1332 } 1333 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1334 return fmt.Errorf("failed to validate labelop: %w", err) 1335 } 1336 } 1337 1338 tx, err := i.Db.Begin() 1339 if err != nil { 1340 return err 1341 } 1342 defer tx.Rollback() 1343 1344 for _, o := range ops { 1345 _, err = db.AddLabelOp(tx, &o) 1346 if err != nil { 1347 return fmt.Errorf("failed to add labelop: %w", err) 1348 } 1349 } 1350 1351 if err = tx.Commit(); err != nil { 1352 return err 1353 } 1354 } 1355 1356 return nil 1357}