Monorepo for Tangled tangled.org
9

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "log/slog" 11 "maps" 12 "net/http" 13 "net/url" 14 "slices" 15 "strings" 16 "sync" 17 18 "time" 19 20 "github.com/avast/retry-go/v4" 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 jmodels "github.com/bluesky-social/jetstream/pkg/models" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/ipfs/go-cid" 25 "golang.org/x/sync/errgroup" 26 "tangled.org/core/api/tangled" 27 "tangled.org/core/appview/cache" 28 "tangled.org/core/appview/config" 29 "tangled.org/core/appview/db" 30 "tangled.org/core/appview/models" 31 "tangled.org/core/appview/notify" 32 "tangled.org/core/appview/repoverify" 33 "tangled.org/core/appview/serververify" 34 "tangled.org/core/appview/validator" 35 "tangled.org/core/idresolver" 36 "tangled.org/core/orm" 37 "tangled.org/core/rbac" 38) 39 40type Ingester struct { 41 Ctx context.Context 42 Db *db.DB 43 Enforcer *rbac.Enforcer 44 IdResolver *idresolver.Resolver 45 Cache *cache.Cache 46 Config *config.Config 47 Logger *slog.Logger 48 Validator *validator.Validator 49 Notifier notify.Notifier 50 Verifier repoverify.Verifier 51} 52 53type processFunc func(ctx context.Context, e *jmodels.Event) error 54 55func (i *Ingester) Ingest() processFunc { 56 return func(ctx context.Context, e *jmodels.Event) error { 57 var err error 58 59 l := i.Logger.With("kind", e.Kind) 60 switch e.Kind { 61 case jmodels.EventKindAccount: 62 // TODO: sync account state to db 63 if e.Account.Active { 64 break 65 } 66 // TODO: revoke sessions by DID 67 if *e.Account.Status == "deactivated" { 68 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 69 } 70 case jmodels.EventKindIdentity: 71 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 72 case jmodels.EventKindCommit: 73 switch e.Commit.Collection { 74 case tangled.GraphFollowNSID: 75 err = i.ingestFollow(e) 76 case tangled.GraphVouchNSID: 77 err = i.ingestVouch(ctx, e) 78 case tangled.FeedStarNSID: 79 
err = i.ingestStar(ctx, e) 80 case tangled.PublicKeyNSID: 81 err = i.ingestPublicKey(e) 82 case tangled.RepoArtifactNSID: 83 err = i.ingestArtifact(ctx, e) 84 case tangled.ActorProfileNSID: 85 err = i.ingestProfile(ctx, e) 86 case tangled.SpindleMemberNSID: 87 err = i.ingestSpindleMember(ctx, e) 88 case tangled.SpindleNSID: 89 err = i.ingestSpindle(ctx, e) 90 case tangled.KnotMemberNSID: 91 err = i.ingestKnotMember(ctx, e) 92 case tangled.KnotNSID: 93 err = i.ingestKnot(ctx, e) 94 case tangled.StringNSID: 95 err = i.ingestString(e) 96 case tangled.RepoIssueNSID: 97 err = i.ingestIssue(ctx, e) 98 case tangled.RepoPullNSID: 99 err = i.ingestPull(ctx, e) 100 case tangled.RepoIssueCommentNSID: 101 err = i.ingestIssueComment(e) 102 case tangled.LabelDefinitionNSID: 103 err = i.ingestLabelDefinition(e) 104 case tangled.LabelOpNSID: 105 err = i.ingestLabelOp(e) 106 case tangled.RepoNSID: 107 err = i.ingestRepo(ctx, e) 108 } 109 l = i.Logger.With("nsid", e.Commit.Collection) 110 } 111 112 if err != nil { 113 l.Warn("failed to ingest record, skipping", "err", err) 114 } 115 116 lastTimeUs := e.TimeUS + 1 117 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 118 l.Error("failed to save cursor", "err", saveErr) 119 } 120 121 return nil 122 } 123} 124 125func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 126 if strings.HasPrefix(ref, "did:") { 127 return db.GetRepoByDid(i.Db, ref) 128 } 129 return db.GetRepoByAtUri(i.Db, ref) 130} 131 132func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 133 var legacy struct { 134 Subject *string `json:"subject"` 135 SubjectDid *string `json:"subjectDid"` 136 } 137 if err := json.Unmarshal(raw, &legacy); err != nil { 138 return false, err 139 } 140 141 switch { 142 case legacy.SubjectDid != nil: 143 repo, err := i.resolveRepoRef(*legacy.SubjectDid) 144 if err != nil { 145 l.Warn("skipping old-format star for unknown repo", "subjectDid", 
*legacy.SubjectDid) 146 return false, nil 147 } 148 star.SubjectType = models.StarSubjectRepo 149 star.Subject = repo.RepoDid 150 return true, nil 151 152 case legacy.Subject != nil: 153 uri, err := syntax.ParseATURI(*legacy.Subject) 154 if err != nil { 155 return false, fmt.Errorf("invalid old-format star subject: %w", err) 156 } 157 switch uri.Collection().String() { 158 case tangled.RepoNSID: 159 repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 160 if err != nil { 161 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 162 return false, nil 163 } 164 star.SubjectType = models.StarSubjectRepo 165 star.Subject = repo.RepoDid 166 return true, nil 167 default: 168 star.SubjectType = models.StarSubjectString 169 star.Subject = *legacy.Subject 170 return true, nil 171 } 172 173 default: 174 return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 175 } 176} 177 178func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 179 var err error 180 did := e.Did 181 182 l := i.Logger.With("handler", "ingestStar") 183 l = l.With("nsid", e.Commit.Collection) 184 185 switch e.Commit.Operation { 186 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 187 raw := json.RawMessage(e.Commit.Record) 188 record := tangled.FeedStar{} 189 unmarshalErr := json.Unmarshal(raw, &record) 190 191 star := &models.Star{ 192 Did: did, 193 Rkey: e.Commit.RKey, 194 } 195 196 switch { 197 case unmarshalErr != nil: 198 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 199 if resolveErr != nil { 200 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 201 return unmarshalErr 202 } 203 if !resolved { 204 return nil 205 } 206 207 case record.Subject == nil: 208 return fmt.Errorf("star record has nil subject") 209 210 case record.Subject.FeedStar_Repo != nil: 211 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 212 if repoErr != nil { 213 l.Warn("skipping star for 
unknown repo", "did", record.Subject.FeedStar_Repo.Did) 214 return nil 215 } 216 star.SubjectType = models.StarSubjectRepo 217 star.Subject = repo.RepoDid 218 219 case record.Subject.FeedStar_String != nil: 220 star.SubjectType = models.StarSubjectString 221 star.Subject = record.Subject.FeedStar_String.Uri 222 223 default: 224 return fmt.Errorf("star record has empty subject union") 225 } 226 227 err = db.AddStar(i.Db, star) 228 case jmodels.CommitOperationDelete: 229 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 230 } 231 232 if err != nil { 233 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 234 } 235 236 return nil 237} 238 239func (i *Ingester) ingestFollow(e *jmodels.Event) error { 240 var err error 241 did := e.Did 242 243 l := i.Logger.With("handler", "ingestFollow") 244 l = l.With("nsid", e.Commit.Collection) 245 246 switch e.Commit.Operation { 247 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 248 raw := json.RawMessage(e.Commit.Record) 249 record := tangled.GraphFollow{} 250 err = json.Unmarshal(raw, &record) 251 if err != nil { 252 l.Error("invalid record", "err", err) 253 return err 254 } 255 256 err = db.AddFollow(i.Db, &models.Follow{ 257 UserDid: did, 258 SubjectDid: record.Subject, 259 Rkey: e.Commit.RKey, 260 }) 261 case jmodels.CommitOperationDelete: 262 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 263 } 264 265 if err != nil { 266 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 267 } 268 269 return nil 270} 271 272func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error { 273 var err error 274 did := e.Did 275 276 l := i.Logger.With("handler", "ingestVouch") 277 l = l.With("nsid", e.Commit.Collection) 278 l.Info("ingesting vouch") 279 280 switch e.Commit.Operation { 281 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 282 raw := json.RawMessage(e.Commit.Record) 283 record := tangled.GraphVouch{} 284 err = 
json.Unmarshal(raw, &record) 285 if err != nil { 286 l.Error("invalid record", "err", err) 287 return err 288 } 289 290 // rkey is the subject_did being vouched for/denounced 291 subjectDID := e.Commit.RKey 292 293 _, err = syntax.ParseDID(subjectDID) 294 if err != nil { 295 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID) 296 return fmt.Errorf("invalid subject_did: %w", err) 297 } 298 299 if did == subjectDID { 300 l.Warn("attempted self-vouch", "did", did) 301 return fmt.Errorf("cannot vouch for self") 302 } 303 304 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID) 305 if err != nil { 306 return err 307 } 308 309 if subjectId.Handle.IsInvalidHandle() { 310 return err 311 } 312 313 kind, err := models.ParseVouchKind(record.Kind) 314 if err != nil { 315 l.Error("invalid kind", "kind", kind) 316 return fmt.Errorf("invalid kind: %s", kind) 317 } 318 319 recordCid, err := cid.Parse(e.Commit.CID) 320 if err != nil { 321 l.Error("invalid cid", "err", err, "cid", e.Commit.CID) 322 return fmt.Errorf("invalid cid: %w", err) 323 } 324 325 var evidences []syntax.ATURI 326 for _, raw := range record.Evidences { 327 uri, parseErr := syntax.ParseATURI(raw) 328 if parseErr != nil { 329 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr) 330 continue 331 } 332 evidences = append(evidences, uri) 333 } 334 335 tx, txErr := i.Db.Begin() 336 if txErr != nil { 337 return fmt.Errorf("failed to start transaction: %w", txErr) 338 } 339 340 addErr := db.AddVouch(tx, &models.Vouch{ 341 Did: syntax.DID(did), 342 SubjectDid: subjectId.DID, 343 Cid: recordCid, 344 Kind: kind, 345 Reason: record.Reason, 346 Evidences: evidences, 347 }) 348 if addErr != nil { 349 tx.Rollback() 350 err = addErr 351 } else { 352 err = tx.Commit() 353 } 354 355 case jmodels.CommitOperationDelete: 356 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey) 357 } 358 359 if err != nil { 360 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err) 
361 } 362 363 return nil 364} 365 366func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 367 did := e.Did 368 var err error 369 370 l := i.Logger.With("handler", "ingestPublicKey") 371 l = l.With("nsid", e.Commit.Collection) 372 373 switch e.Commit.Operation { 374 case jmodels.CommitOperationCreate: 375 l.Debug("processing add of pubkey") 376 raw := json.RawMessage(e.Commit.Record) 377 record := tangled.PublicKey{} 378 err = json.Unmarshal(raw, &record) 379 if err != nil { 380 l.Error("invalid record", "err", err) 381 return err 382 } 383 384 name := record.Name 385 key := record.Key 386 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 387 case jmodels.CommitOperationUpdate: 388 l.Debug("processing update of pubkey") 389 raw := json.RawMessage(e.Commit.Record) 390 record := tangled.PublicKey{} 391 err = json.Unmarshal(raw, &record) 392 if err != nil { 393 l.Error("invalid record", "err", err) 394 return err 395 } 396 397 name := record.Name 398 key := record.Key 399 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey) 400 case jmodels.CommitOperationDelete: 401 l.Debug("processing delete of pubkey") 402 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 403 } 404 405 if err != nil { 406 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 407 } 408 409 return nil 410} 411 412func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 413 did := e.Did 414 var err error 415 416 l := i.Logger.With("handler", "ingestArtifact") 417 l = l.With("nsid", e.Commit.Collection) 418 419 switch e.Commit.Operation { 420 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 421 raw := json.RawMessage(e.Commit.Record) 422 record := tangled.RepoArtifact{} 423 err = json.Unmarshal(raw, &record) 424 if err != nil { 425 l.Error("invalid record", "err", err) 426 return err 427 } 428 429 var repo *models.Repo 430 if record.RepoDid != nil && *record.RepoDid != "" { 431 repo, err = 
db.GetRepoByDid(i.Db, *record.RepoDid) 432 if err != nil && !errors.Is(err, sql.ErrNoRows) { 433 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 434 } 435 } 436 if repo == nil && record.Repo != nil { 437 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 438 if parseErr != nil { 439 return parseErr 440 } 441 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 442 if err != nil { 443 return err 444 } 445 } 446 if repo == nil { 447 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 448 } 449 450 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 451 if err != nil || !ok { 452 return err 453 } 454 455 repoDid := repo.RepoDid 456 if repoDid == "" && record.RepoDid != nil { 457 repoDid = *record.RepoDid 458 } 459 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 460 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 461 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 462 } 463 } 464 465 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 466 if err != nil { 467 createdAt = time.Now() 468 } 469 470 artifact := models.Artifact{ 471 Did: did, 472 Rkey: e.Commit.RKey, 473 RepoDid: syntax.DID(repo.RepoDid), 474 Tag: plumbing.Hash(record.Tag), 475 CreatedAt: createdAt, 476 BlobCid: cid.Cid(record.Artifact.Ref), 477 Name: record.Name, 478 Size: uint64(record.Artifact.Size), 479 MimeType: record.Artifact.MimeType, 480 } 481 482 err = db.AddArtifact(i.Db, artifact) 483 case jmodels.CommitOperationDelete: 484 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 485 } 486 487 if err != nil { 488 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 489 } 490 491 return nil 492} 493 494func (i 
*Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error { 495 did := e.Did 496 var err error 497 498 l := i.Logger.With("handler", "ingestProfile") 499 l = l.With("nsid", e.Commit.Collection) 500 501 if e.Commit.RKey != "self" { 502 return fmt.Errorf("ingestProfile only ingests `self` record") 503 } 504 505 switch e.Commit.Operation { 506 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 507 raw := json.RawMessage(e.Commit.Record) 508 record := tangled.ActorProfile{} 509 err = json.Unmarshal(raw, &record) 510 if err != nil { 511 l.Error("invalid record", "err", err) 512 return err 513 } 514 515 avatar := "" 516 if record.Avatar != nil { 517 avatar = record.Avatar.Ref.String() 518 } 519 520 description := "" 521 if record.Description != nil { 522 description = *record.Description 523 } 524 525 includeBluesky := record.Bluesky 526 527 pronouns := "" 528 if record.Pronouns != nil { 529 pronouns = *record.Pronouns 530 } 531 532 location := "" 533 if record.Location != nil { 534 location = *record.Location 535 } 536 537 var links [5]string 538 for i, l := range record.Links { 539 if i < 5 { 540 links[i] = l 541 } 542 } 543 544 var stats [2]models.VanityStat 545 for i, s := range record.Stats { 546 if i < 2 { 547 stats[i].Kind = models.ParseVanityStatKind(s) 548 } 549 } 550 551 var pinned [6]string 552 for i, r := range record.PinnedRepositories { 553 if i < 6 { 554 pinned[i] = r 555 } 556 } 557 558 var preferredHandle syntax.Handle 559 if record.PreferredHandle != nil { 560 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil { 561 ident, identErr := i.IdResolver.ResolveIdent(ctx, did) 562 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) { 563 preferredHandle = h 564 } 565 } 566 } 567 568 profile := models.Profile{ 569 Did: did, 570 Avatar: avatar, 571 Description: description, 572 IncludeBluesky: includeBluesky, 573 Location: location, 574 Links: links, 575 Stats: stats, 576 PinnedRepos: pinned, 
577 Pronouns: pronouns, 578 PreferredHandle: preferredHandle, 579 } 580 581 tx, err := i.Db.Begin() 582 if err != nil { 583 return fmt.Errorf("failed to start transaction: %w", err) 584 } 585 586 err = db.ValidateProfile(tx, &profile) 587 if err != nil { 588 return fmt.Errorf("invalid profile record") 589 } 590 591 err = db.UpsertProfile(tx, &profile) 592 if err == nil && i.Cache != nil { 593 pipe := i.Cache.Pipeline() 594 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did) 595 if preferredHandle != "" { 596 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL) 597 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL) 598 } else { 599 pipe.Del(ctx, didKey) 600 } 601 if _, execErr := pipe.Exec(ctx); execErr != nil { 602 l.Warn("failed to update preferred handle cache", "err", execErr) 603 } 604 } 605 case jmodels.CommitOperationDelete: 606 tx, beginErr := i.Db.Begin() 607 if beginErr != nil { 608 return fmt.Errorf("failed to start transaction: %w", beginErr) 609 } 610 611 priorHandle, phErr := db.GetPreferredHandle(tx, did) 612 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) { 613 l.Warn("failed to read prior preferred handle", "err", phErr) 614 } 615 616 err = db.DeleteProfile(tx, did) 617 if err == nil && i.Cache != nil { 618 pipe := i.Cache.Pipeline() 619 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did)) 620 if priorHandle != "" { 621 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle))) 622 } 623 if _, execErr := pipe.Exec(ctx); execErr != nil { 624 l.Warn("failed to evict preferred handle cache", "err", execErr) 625 } 626 } 627 } 628 629 if err != nil { 630 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 631 } 632 633 return nil 634} 635 636func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 637 did := e.Did 638 var err error 639 640 l := i.Logger.With("handler", 
"ingestSpindleMember") 641 l = l.With("nsid", e.Commit.Collection) 642 643 switch e.Commit.Operation { 644 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 645 raw := json.RawMessage(e.Commit.Record) 646 record := tangled.SpindleMember{} 647 err = json.Unmarshal(raw, &record) 648 if err != nil { 649 l.Error("invalid record", "err", err) 650 return err 651 } 652 653 // only spindle owner can invite to spindles 654 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 655 if err != nil { 656 return fmt.Errorf("failed to check invite permission: %w", err) 657 } 658 if !ok { 659 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 660 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 661 } 662 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 663 if err != nil { 664 return fmt.Errorf("failed to re-check invite permission: %w", err) 665 } 666 if !ok { 667 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 668 } 669 } 670 671 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 672 if err != nil { 673 return err 674 } 675 676 if memberId.Handle.IsInvalidHandle() { 677 return fmt.Errorf("invalid handle for member %s", record.Subject) 678 } 679 680 existing, err := db.GetSpindleMembers(i.Db, 681 orm.FilterEq("did", did), 682 orm.FilterEq("rkey", e.Commit.RKey), 683 ) 684 if err != nil { 685 return fmt.Errorf("failed to look up existing member: %w", err) 686 } 687 if len(existing) > 1 { 688 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 689 } 690 691 tx, err := i.Db.Begin() 692 if err != nil { 693 return fmt.Errorf("failed to start txn: %w", err) 694 } 695 committed := false 696 defer func() { 697 if committed { 698 return 699 } 700 tx.Rollback() 701 i.Enforcer.E.LoadPolicy() 702 }() 703 704 if len(existing) == 1 { 705 prev := existing[0] 706 if prev.Instance != record.Instance || prev.Subject != memberId.DID { 707 
if err = db.RemoveSpindleMember(tx, 708 orm.FilterEq("did", did), 709 orm.FilterEq("rkey", e.Commit.RKey), 710 ); err != nil { 711 return fmt.Errorf("failed to remove stale row: %w", err) 712 } 713 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 714 return fmt.Errorf("failed to remove stale ACL: %w", err) 715 } 716 } 717 } 718 719 if err = db.AddSpindleMember(tx, models.SpindleMember{ 720 Did: syntax.DID(did), 721 Rkey: e.Commit.RKey, 722 Instance: record.Instance, 723 Subject: memberId.DID, 724 }); err != nil { 725 return fmt.Errorf("failed to add to db: %w", err) 726 } 727 728 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 729 return fmt.Errorf("failed to update ACLs: %w", err) 730 } 731 732 if err = tx.Commit(); err != nil { 733 return fmt.Errorf("failed to commit txn: %w", err) 734 } 735 736 if err = i.Enforcer.E.SavePolicy(); err != nil { 737 return fmt.Errorf("failed to save ACLs: %w", err) 738 } 739 committed = true 740 741 l.Info("upserted spindle member") 742 case jmodels.CommitOperationDelete: 743 rkey := e.Commit.RKey 744 745 // get record from db first 746 members, err := db.GetSpindleMembers( 747 i.Db, 748 orm.FilterEq("did", did), 749 orm.FilterEq("rkey", rkey), 750 ) 751 if err != nil || len(members) != 1 { 752 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 753 } 754 member := members[0] 755 756 tx, err := i.Db.Begin() 757 if err != nil { 758 return fmt.Errorf("failed to start txn: %w", err) 759 } 760 committed := false 761 defer func() { 762 if committed { 763 return 764 } 765 tx.Rollback() 766 i.Enforcer.E.LoadPolicy() 767 }() 768 769 // remove record by rkey && update enforcer 770 if err = db.RemoveSpindleMember( 771 tx, 772 orm.FilterEq("did", did), 773 orm.FilterEq("rkey", rkey), 774 ); err != nil { 775 return fmt.Errorf("failed to remove from db: %w", err) 776 } 777 778 // update enforcer 779 err = 
i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 780 if err != nil { 781 return fmt.Errorf("failed to update ACLs: %w", err) 782 } 783 784 if err = tx.Commit(); err != nil { 785 return fmt.Errorf("failed to commit txn: %w", err) 786 } 787 788 if err = i.Enforcer.E.SavePolicy(); err != nil { 789 return fmt.Errorf("failed to save ACLs: %w", err) 790 } 791 committed = true 792 793 l.Info("removed spindle member") 794 } 795 796 return nil 797} 798 799func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 800 did := e.Did 801 var err error 802 803 l := i.Logger.With("handler", "ingestSpindle") 804 l = l.With("nsid", e.Commit.Collection) 805 806 switch e.Commit.Operation { 807 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 808 raw := json.RawMessage(e.Commit.Record) 809 record := tangled.Spindle{} 810 err = json.Unmarshal(raw, &record) 811 if err != nil { 812 l.Error("invalid record", "err", err) 813 return err 814 } 815 816 instance := e.Commit.RKey 817 818 err := db.AddSpindle(i.Db, models.Spindle{ 819 Owner: syntax.DID(did), 820 Instance: instance, 821 }) 822 if err != nil { 823 l.Error("failed to add spindle to db", "err", err, "instance", instance) 824 return err 825 } 826 827 if err := i.verifySpindle(ctx, instance, did); err != nil { 828 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 829 } 830 831 return nil 832 833 case jmodels.CommitOperationDelete: 834 instance := e.Commit.RKey 835 836 // get record from db first 837 spindles, err := db.GetSpindles( 838 ctx, 839 i.Db, 840 orm.FilterEq("owner", did), 841 orm.FilterEq("instance", instance), 842 ) 843 if err != nil || len(spindles) != 1 { 844 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 845 } 846 spindle := spindles[0] 847 848 tx, err := i.Db.Begin() 849 if err != nil { 850 return err 851 } 852 defer func() { 853 tx.Rollback() 854 i.Enforcer.E.LoadPolicy() 855 }() 856 
857 // remove spindle members first 858 err = db.RemoveSpindleMember( 859 tx, 860 orm.FilterEq("owner", did), 861 orm.FilterEq("instance", instance), 862 ) 863 if err != nil { 864 return err 865 } 866 867 err = db.DeleteSpindle( 868 tx, 869 orm.FilterEq("owner", did), 870 orm.FilterEq("instance", instance), 871 ) 872 if err != nil { 873 return err 874 } 875 876 if spindle.Verified != nil { 877 err = i.Enforcer.RemoveSpindle(instance) 878 if err != nil { 879 return err 880 } 881 } 882 883 err = tx.Commit() 884 if err != nil { 885 return err 886 } 887 888 err = i.Enforcer.E.SavePolicy() 889 if err != nil { 890 return err 891 } 892 } 893 894 return nil 895} 896 897func (i *Ingester) ingestString(e *jmodels.Event) error { 898 did := e.Did 899 rkey := e.Commit.RKey 900 901 var err error 902 903 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 904 l.Info("ingesting record") 905 906 switch e.Commit.Operation { 907 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 908 raw := json.RawMessage(e.Commit.Record) 909 record := tangled.String{} 910 err = json.Unmarshal(raw, &record) 911 if err != nil { 912 l.Error("invalid record", "err", err) 913 return err 914 } 915 916 string := models.StringFromRecord(did, rkey, record) 917 918 if err = i.Validator.ValidateString(&string); err != nil { 919 l.Error("invalid record", "err", err) 920 return err 921 } 922 923 if err = db.AddString(i.Db, string); err != nil { 924 l.Error("failed to add string", "err", err) 925 return err 926 } 927 928 return nil 929 930 case jmodels.CommitOperationDelete: 931 if err := db.DeleteString( 932 i.Db, 933 orm.FilterEq("did", did), 934 orm.FilterEq("rkey", rkey), 935 ); err != nil { 936 l.Error("failed to delete", "err", err) 937 return fmt.Errorf("failed to delete string record: %w", err) 938 } 939 940 return nil 941 } 942 943 return nil 944} 945 946func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error { 947 
did := e.Did 948 var err error 949 950 l := i.Logger.With("handler", "ingestKnotMember") 951 l = l.With("nsid", e.Commit.Collection) 952 953 switch e.Commit.Operation { 954 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 955 raw := json.RawMessage(e.Commit.Record) 956 record := tangled.KnotMember{} 957 err = json.Unmarshal(raw, &record) 958 if err != nil { 959 l.Error("invalid record", "err", err) 960 return err 961 } 962 963 // only knot owner can invite to knots 964 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 965 if err != nil { 966 return fmt.Errorf("failed to check invite permission: %w", err) 967 } 968 if !ok { 969 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 970 return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 971 } 972 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 973 if err != nil { 974 return fmt.Errorf("failed to re-check invite permission: %w", err) 975 } 976 if !ok { 977 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 978 } 979 } 980 981 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 982 if err != nil { 983 return err 984 } 985 986 if memberId.Handle.IsInvalidHandle() { 987 return fmt.Errorf("invalid handle for member %s", record.Subject) 988 } 989 990 existing, err := db.GetKnotMembers(i.Db, 991 orm.FilterEq("did", did), 992 orm.FilterEq("rkey", e.Commit.RKey), 993 ) 994 if err != nil { 995 return fmt.Errorf("failed to look up existing member: %w", err) 996 } 997 if len(existing) > 1 { 998 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 999 } 1000 1001 tx, err := i.Db.Begin() 1002 if err != nil { 1003 return fmt.Errorf("failed to start txn: %w", err) 1004 } 1005 committed := false 1006 defer func() { 1007 if committed { 1008 return 1009 } 1010 tx.Rollback() 1011 i.Enforcer.E.LoadPolicy() 1012 }() 1013 1014 if len(existing) == 1 { 1015 prev := existing[0] 1016 if prev.Domain != 
record.Domain || prev.Subject != memberId.DID { 1017 if err = db.RemoveKnotMember(tx, 1018 orm.FilterEq("did", did), 1019 orm.FilterEq("rkey", e.Commit.RKey), 1020 ); err != nil { 1021 return fmt.Errorf("failed to remove stale row: %w", err) 1022 } 1023 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1024 return fmt.Errorf("failed to remove stale ACL: %w", err) 1025 } 1026 } 1027 } 1028 1029 if err = db.AddKnotMember(tx, models.KnotMember{ 1030 Did: syntax.DID(did), 1031 Rkey: e.Commit.RKey, 1032 Domain: record.Domain, 1033 Subject: memberId.DID, 1034 }); err != nil { 1035 return fmt.Errorf("failed to add to db: %w", err) 1036 } 1037 1038 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 1039 return fmt.Errorf("failed to update ACLs: %w", err) 1040 } 1041 1042 if err = tx.Commit(); err != nil { 1043 return fmt.Errorf("failed to commit txn: %w", err) 1044 } 1045 1046 if err = i.Enforcer.E.SavePolicy(); err != nil { 1047 return fmt.Errorf("failed to save ACLs: %w", err) 1048 } 1049 committed = true 1050 1051 l.Info("upserted knot member") 1052 case jmodels.CommitOperationDelete: 1053 rkey := e.Commit.RKey 1054 1055 members, err := db.GetKnotMembers( 1056 i.Db, 1057 orm.FilterEq("did", did), 1058 orm.FilterEq("rkey", rkey), 1059 ) 1060 if err != nil { 1061 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1062 } 1063 if len(members) == 0 { 1064 l.Info("knot member already removed", "rkey", rkey) 1065 return nil 1066 } 1067 if len(members) > 1 { 1068 return fmt.Errorf("multiple knot members with rkey %s", rkey) 1069 } 1070 member := members[0] 1071 1072 tx, err := i.Db.Begin() 1073 if err != nil { 1074 return fmt.Errorf("failed to start txn: %w", err) 1075 } 1076 committed := false 1077 defer func() { 1078 if committed { 1079 return 1080 } 1081 tx.Rollback() 1082 i.Enforcer.E.LoadPolicy() 1083 }() 1084 1085 if err = db.RemoveKnotMember( 1086 tx, 1087 
orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to remove from db: %w", err)
		}

		if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
			return fmt.Errorf("failed to update ACLs: %w", err)
		}

		if err = tx.Commit(); err != nil {
			return fmt.Errorf("failed to commit txn: %w", err)
		}

		if err = i.Enforcer.E.SavePolicy(); err != nil {
			return fmt.Errorf("failed to save ACLs: %w", err)
		}
		committed = true

		l.Info("removed knot member")
	}

	return nil
}

// ingestKnot handles commits on knot records. The record key is used as the
// knot's domain and the event's DID as its owner.
//
// Create/update: the record is decoded (only to check that it is valid JSON
// for tangled.Knot), the knot is stored via db.AddKnot, and a verification
// attempt is made. A failed verification is logged as a warning and does not
// fail the ingest.
//
// Delete: the matching registration must exist exactly once. Knot member rows
// and the knot row matching (did, domain) are removed in one transaction; if
// the registration has a non-nil Registered value the knot is also removed
// from the enforcer before the transaction is committed and the policy saved.
func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error {
	did := e.Did
	var err error

	l := i.Logger.With("handler", "ingestKnot")
	l = l.With("nsid", e.Commit.Collection)

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.Knot{}
		err = json.Unmarshal(raw, &record)
		if err != nil {
			l.Error("invalid record", "err", err)
			return err
		}

		domain := e.Commit.RKey

		err := db.AddKnot(i.Db, domain, did)
		if err != nil {
			l.Error("failed to add knot to db", "err", err, "domain", domain)
			return err
		}

		// Verification is best-effort here; the error is logged, not returned.
		if err := i.verifyKnot(ctx, domain, did); err != nil {
			l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
		}

		return nil

	case jmodels.CommitOperationDelete:
		domain := e.Commit.RKey

		// get record from db first
		registrations, err := db.GetRegistrations(
			i.Db,
			orm.FilterEq("domain", domain),
			orm.FilterEq("did", did),
		)
		if err != nil {
			return fmt.Errorf("failed to get registration: %w", err)
		}
		if len(registrations) != 1 {
			return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
		}
		registration := registrations[0]

		tx, err := i.Db.Begin()
		if err != nil {
			return err
		}
		// Runs on every exit path: roll back the transaction and reload the
		// enforcer policy. Both return values are intentionally ignored.
		defer func() {
			tx.Rollback()
			i.Enforcer.E.LoadPolicy()
		}()

		err = db.RemoveKnotMember(
			tx,
			orm.FilterEq("did", did),
			orm.FilterEq("domain", domain),
		)
		if err != nil {
			return err
		}

		err = db.DeleteKnot(
			tx,
			orm.FilterEq("did", did),
			orm.FilterEq("domain", domain),
		)
		if err != nil {
			return err
		}

		// Only knots with a non-nil Registered value are removed from the
		// enforcer.
		if registration.Registered != nil {
			err = i.Enforcer.RemoveKnot(domain)
			if err != nil {
				return err
			}
		}

		err = tx.Commit()
		if err != nil {
			return err
		}

		err = i.Enforcer.E.SavePolicy()
		if err != nil {
			return err
		}
	}

	return nil
}

// Retry policy shared by verifyKnot and verifySpindle: up to verifyAttempts
// tries with back-off delays starting at verifyMinDelay and capped at
// verifyMaxDelay.
const (
	verifyAttempts = 4
	verifyMinDelay = 1 * time.Second
	verifyMaxDelay = 5 * time.Second
)

// verifyKnot runs server verification for the knot registered as domain by
// did and marks it verified on success.
//
// It returns an error if the registration lookup fails, if there is not
// exactly one matching registration, if verification still fails after the
// configured retries, or if marking the knot verified fails. A registration
// whose Registered value is already non-nil is left untouched and nil is
// returned.
func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
	regs, err := db.GetRegistrations(i.Db,
		orm.FilterEq("domain", domain),
		orm.FilterEq("did", did),
	)
	if err != nil {
		return fmt.Errorf("look up registration: %w", err)
	}
	if len(regs) != 1 {
		return fmt.Errorf("no registration for %s by %s", domain, did)
	}
	if regs[0].Registered != nil {
		return nil
	}

	err = retry.Do(
		func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
		retry.Context(ctx),
		retry.Attempts(verifyAttempts),
		retry.Delay(verifyMinDelay),
		retry.MaxDelay(verifyMaxDelay),
		retry.DelayType(retry.BackOffDelay),
		retry.LastErrorOnly(true),
	)
	if err != nil {
		return fmt.Errorf("verify: %w", err)
	}
	return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
}

// verifySpindle runs server verification for the spindle instance owned by
// did and marks it verified on success.
//
// It returns an error if the spindle lookup fails, if there is not exactly
// one matching spindle, if verification still fails after the configured
// retries, or if marking the spindle verified fails. A spindle whose Verified
// value is already non-nil is left untouched and nil is returned.
func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
	spindles, err := db.GetSpindles(ctx, i.Db,
		orm.FilterEq("instance", instance),
		orm.FilterEq("owner", did),
	)
	if err != nil {
		return fmt.Errorf("look up spindle: %w", err)
	}
	if len(spindles) != 1 {
		return fmt.Errorf("no spindle for %s by %s", instance, did)
	}
	if spindles[0].Verified != nil {
		return nil
	}

	err = retry.Do(
		func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
		retry.Context(ctx),
		retry.Attempts(verifyAttempts),
		retry.Delay(verifyMinDelay),
		retry.MaxDelay(verifyMaxDelay),
		retry.DelayType(retry.BackOffDelay),
		retry.LastErrorOnly(true),
	)
	if err != nil {
		return fmt.Errorf("verify: %w", err)
	}
	_, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
	return err
}

// sweepConcurrency bounds how many verifications SweepPendingVerifications
// runs at the same time.
const sweepConcurrency = 4

// SweepPendingVerifications retries verification for every knot registration
// whose "registered" column is NULL and every spindle whose "verified" column
// is NULL, running at most sweepConcurrency verifications concurrently and
// using i.Ctx for the work.
//
// Failures are logged and never returned; the method blocks until all started
// verifications have finished. A failure to list knots does not prevent the
// spindle sweep.
func (i *Ingester) SweepPendingVerifications() {
	l := i.Logger.With("handler", "SweepPendingVerifications")

	var g errgroup.Group
	g.SetLimit(sweepConcurrency)

	regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
	if err != nil {
		l.Error("failed to list unverified knots", "err", err)
	} else {
		for _, reg := range regs {
			g.Go(func() error {
				if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
					l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
				}
				return nil
			})
		}
	}

	spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
	if err != nil {
		l.Error("failed to list unverified spindles", "err", err)
		// Still wait for any knot verifications that were already started.
		g.Wait()
		return
	}
	for _, s := range spindles {
		g.Go(func() error {
			if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
				l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
			}
			return nil
		})
	}
	g.Wait()
}

// ingestIssue handles commits on issue records.
//
// Create/update: the record is decoded, converted with models.IssueFromRecord,
// required to carry a syntactically valid repo DID, validated, and stored in
// a transaction. When the record's Repo field is non-empty and does not start
// with "did:", and the repo it names is found with a non-empty RepoDid, an
// "add-repo-did" PDS record migration is enqueued; failure to enqueue is only
// logged.
//
// Delete: the issue identified by (did, rkey) is deleted in a transaction.
func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	var err error

	l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.RepoIssue{}
		err = json.Unmarshal(raw, &record)
		if err != nil {
			l.Error("invalid record", "err", err)
			return err
		}

		issue := models.IssueFromRecord(did, rkey, record)

		if issue.RepoDid == "" {
			return fmt.Errorf("issue record has no repo field")
		}
		if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
			return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
		}

		if err := i.Validator.ValidateIssue(&issue); err != nil {
			return fmt.Errorf("failed to validate issue: %w", err)
		}

		// The record references its repo by something other than a DID;
		// queue a rewrite of the PDS record if the repo's DID is known.
		if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
			repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
			if repoErr == nil && repo.RepoDid != "" {
				if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
					l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
				}
			}
		}

		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		err = db.PutIssue(tx, &issue)
		if err != nil {
			l.Error("failed to create issue", "err", err)
			return err
		}

		err = tx.Commit()
		if err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		return nil

	case jmodels.CommitOperationDelete:
		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		if err := db.DeleteIssues(
			tx,
			did,
			rkey,
		); err != nil {
			l.Error("failed to delete", "err", err)
			return fmt.Errorf("failed to delete issue record: %w", err)
		}
		if err := tx.Commit(); err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		return nil
	}

	return nil
}

// ingestPull handles commits on pull records.
//
// Create/update: the record is decoded, the author's identity is resolved,
// and the patch blob of every round is fetched concurrently from the author's
// PDS via com.atproto.sync.getBlob. The open response bodies are handed to
// models.PullFromRecord; the resulting pull is validated and stored in a
// transaction. Any round without a patch blob, any failed request, or any
// non-200 response fails the whole ingest.
//
// Delete: pulls matching (owner_did, rkey) are abandoned in a transaction.
func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	var err error

	l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.RepoPull{}
		err = json.Unmarshal(raw, &record)
		if err != nil {
			l.Error("invalid record", "err", err)
			return err
		}

		ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
		if err != nil {
			l.Error("failed to resolve did", "err", err)
			return err
		}

		// go through and fetch all blobs in parallel
		readers := make([]*io.ReadCloser, len(record.Rounds))
		var mu sync.Mutex

		// Every body that was successfully opened is closed when this
		// function returns, on both the error and the success paths. All
		// returns below happen after g.Wait, so no goroutine is still
		// writing to readers when this runs.
		defer func() {
			for _, r := range readers {
				if r != nil && *r != nil {
					(*r).Close()
				}
			}
		}()

		g, gctx := errgroup.WithContext(ctx)

		for idx, b := range record.Rounds {
			g.Go(func() error {
				// for some reason, a blob is empty
				if b.PatchBlob == nil {
					return fmt.Errorf("missing patchBlob in round %d", idx)
				}

				// A sibling fetch has already failed; skip the request.
				if err := gctx.Err(); err != nil {
					return err
				}

				blobURL, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerId.PDSEndpoint()))
				if err != nil {
					l.Error("failed to build blob url", "err", err)
					return err
				}
				q := blobURL.Query()
				q.Set("cid", b.PatchBlob.Ref.String())
				q.Set("did", did)
				blobURL.RawQuery = q.Encode()

				// The request is deliberately bound to ctx, not gctx: gctx is
				// cancelled as soon as g.Wait returns, and a request context
				// also governs reading the response body, which happens
				// later in models.PullFromRecord.
				req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobURL.String(), nil)
				if err != nil {
					l.Error("failed to create request", "err", err)
					return err
				}
				req.Header.Set("Content-Type", "application/json")

				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					l.Error("failed to make request", "err", err)
					return err
				}

				// Anything but 200 carries an error payload, not a patch.
				if resp.StatusCode != http.StatusOK {
					resp.Body.Close()
					return fmt.Errorf("failed to fetch patchBlob in round %d: unexpected status %q", idx, resp.Status)
				}

				mu.Lock()
				readers[idx] = &resp.Body
				mu.Unlock()

				return nil
			})
		}

		if err := g.Wait(); err != nil {
			return err
		}

		pull, err := models.PullFromRecord(did, rkey, record, readers)
		if err != nil {
			return fmt.Errorf("failed to parse pull from record: %w", err)
		}
		if err := i.Validator.ValidatePull(pull); err != nil {
			return fmt.Errorf("failed to validate pull: %w", err)
		}

		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		err = db.PutPull(tx, pull)
		if err != nil {
			l.Error("failed to create pull", "err", err)
			return err
		}

		err = tx.Commit()
		if err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		return nil

	case jmodels.CommitOperationDelete:
		tx, err := i.Db.BeginTx(ctx, nil)
		if err != nil {
			l.Error("failed to begin transaction", "err", err)
			return err
		}
		defer tx.Rollback()

		if err := db.AbandonPulls(
			tx,
			orm.FilterEq("owner_did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			l.Error("failed to abandon", "err", err)
			return fmt.Errorf("failed to abandon pull record: %w", err)
		}
		if err := tx.Commit(); err != nil {
			l.Error("failed to commit txn", "err", err)
			return err
		}

		return nil
	}

	return nil
}

// ingestIssueComment handles commits on issue comment records.
//
// Create/update: the record is decoded, converted with
// models.IssueCommentFromRecord, validated, and added in a transaction.
//
// Delete: comments matching (did, rkey) are deleted directly on i.Db.
func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	var err error

	l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.RepoIssueComment{}
		err = json.Unmarshal(raw, &record)
		if err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		comment, err := models.IssueCommentFromRecord(did, rkey, record)
		if err != nil {
			return fmt.Errorf("failed to parse comment from record: %w", err)
		}

		if err := i.Validator.ValidateIssueComment(comment); err != nil {
			return fmt.Errorf("failed to validate comment: %w", err)
		}

		tx, err := i.Db.Begin()
		if err != nil {
			return fmt.Errorf("failed to start transaction: %w", err)
		}
		defer tx.Rollback()

		_, err = db.AddIssueComment(tx, *comment)
		if err != nil {
			return fmt.Errorf("failed to create issue comment: %w", err)
		}

		return tx.Commit()

	case jmodels.CommitOperationDelete:
		if err := db.DeleteIssueComments(
			i.Db,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to delete issue comment record: %w", err)
		}

		return nil
	}

	return nil
}

// ingestLabelDefinition handles commits on label definition records.
//
// Create/update: the record is decoded, converted with
// models.LabelDefinitionFromRecord, validated, and added to the database.
//
// Delete: the definition matching (did, rkey) is deleted.
func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	var err error

	l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.LabelDefinition{}
		err = json.Unmarshal(raw, &record)
		if err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		def, err := models.LabelDefinitionFromRecord(did, rkey, record)
		if err != nil {
			return fmt.Errorf("failed to parse labeldef from record: %w", err)
		}

		if err := i.Validator.ValidateLabelDefinition(def); err != nil {
			return fmt.Errorf("failed to validate labeldef: %w", err)
		}

		_, err = db.AddLabelDefinition(i.Db, def)
		if err != nil {
			return fmt.Errorf("failed to create labeldef: %w", err)
		}

		return nil

	case jmodels.CommitOperationDelete:
		if err := db.DeleteLabelDefinition(
			i.Db,
			orm.FilterEq("did", did),
			orm.FilterEq("rkey", rkey),
		); err != nil {
			return fmt.Errorf("failed to delete labeldef record: %w", err)
		}

		return nil
	}

	return nil
}

// ingestLabelOp handles create commits on label operation records; every
// other operation is ignored and nil is returned.
//
// The record's subject must be an AT-URI in the issue collection that matches
// exactly one stored issue with an associated repo. Each operation derived
// from the record must refer to a label definition among that repo's labels
// and pass validation; only then are all operations written in a single
// transaction.
func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
	did := e.Did
	rkey := e.Commit.RKey

	var err error

	l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
	l.Info("ingesting record")

	switch e.Commit.Operation {
	case jmodels.CommitOperationCreate:
		raw := json.RawMessage(e.Commit.Record)
		record := tangled.LabelOp{}
		err = json.Unmarshal(raw, &record)
		if err != nil {
			return fmt.Errorf("invalid record: %w", err)
		}

		subject := syntax.ATURI(record.Subject)
		collection := subject.Collection()

		var repo *models.Repo
		switch collection {
		case tangled.RepoIssueNSID:
			issues, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
			if err != nil {
				return fmt.Errorf("failed to find subject: %w", err)
			}
			if len(issues) != 1 {
				return fmt.Errorf("failed to find subject: subject count %d", len(issues))
			}
			repo = issues[0].Repo
		default:
			return fmt.Errorf("unsupported label subject: %s", collection)
		}
		// repo.Labels is dereferenced below; fail cleanly instead of
		// panicking if the issue came back without its repo.
		if repo == nil {
			return fmt.Errorf("failed to find repo for subject: %s", subject)
		}

		actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
		if err != nil {
			return fmt.Errorf("failed to build label application ctx: %w", err)
		}

		ops := models.LabelOpsFromRecord(did, rkey, record)

		// Validate everything up front so nothing is written when any
		// single operation is invalid.
		for _, o := range ops {
			def, ok := actx.Defs[o.OperandKey]
			if !ok {
				return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
			}
			if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
				return fmt.Errorf("failed to validate labelop: %w", err)
			}
		}

		tx, err := i.Db.Begin()
		if err != nil {
			return err
		}
		defer tx.Rollback()

		for _, o := range ops {
			_, err = db.AddLabelOp(tx, &o)
			if err != nil {
				return fmt.Errorf("failed to add labelop: %w", err)
			}
		}

		if err = tx.Commit(); err != nil {
			return err
		}
	}

	return nil
}