Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/mentions"
31 "tangled.org/core/appview/models"
32 "tangled.org/core/appview/notify"
33 "tangled.org/core/appview/repoverify"
34 "tangled.org/core/appview/serververify"
35 "tangled.org/core/appview/validator"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/orm"
38 "tangled.org/core/rbac"
39)
40
// Ingester applies jetstream events to the appview's state. Its Ingest method
// returns the per-event handler; the ingest* methods each handle one record
// collection.
type Ingester struct {
	// NOTE(review): Ctx is not read by any handler visible here; handlers
	// receive a context per event instead.
	Ctx context.Context
	// Db is the database handle used by the handlers and for saving the cursor.
	Db *db.DB
	// Enforcer holds the RBAC policy consulted and updated by the knot and
	// spindle handlers and by ingestArtifact.
	Enforcer *rbac.Enforcer
	// IdResolver resolves identities and invalidates them on account and
	// identity events.
	IdResolver *idresolver.Resolver
	// Cache may be nil; ingestProfile checks for nil before using it.
	Cache *cache.Cache
	// Config supplies settings such as Core.Dev, which verifyKnot reads.
	Config *config.Config
	// Logger is the base logger; each handler derives its own from it.
	Logger *slog.Logger
	// Validator checks records before they are stored (used by ingestString).
	Validator *validator.Validator
	// NOTE(review): MentionsResolver, Notifier and Verifier are not used in the
	// part of the file reviewed here; presumably other handlers consume them.
	MentionsResolver *mentions.Resolver
	Notifier         notify.Notifier
	Verifier         repoverify.Verifier
}
54
// processFunc handles a single jetstream event; it is the type returned by
// Ingester.Ingest.
type processFunc func(ctx context.Context, e *jmodels.Event) error
56
57func (i *Ingester) Ingest() processFunc {
58 return func(ctx context.Context, e *jmodels.Event) error {
59 var err error
60
61 l := i.Logger.With("kind", e.Kind)
62 switch e.Kind {
63 case jmodels.EventKindAccount:
64 // TODO: sync account state to db
65 if e.Account.Active {
66 break
67 }
68 // TODO: revoke sessions by DID
69 if *e.Account.Status == "deactivated" {
70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
71 }
72 case jmodels.EventKindIdentity:
73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
74 case jmodels.EventKindCommit:
75 switch e.Commit.Collection {
76 case tangled.GraphFollowNSID:
77 err = i.ingestFollow(e)
78 case tangled.GraphVouchNSID:
79 err = i.ingestVouch(ctx, e)
80 case tangled.FeedStarNSID:
81 err = i.ingestStar(ctx, e)
82 case tangled.PublicKeyNSID:
83 err = i.ingestPublicKey(e)
84 case tangled.RepoArtifactNSID:
85 err = i.ingestArtifact(ctx, e)
86 case tangled.ActorProfileNSID:
87 err = i.ingestProfile(ctx, e)
88 case tangled.SpindleMemberNSID:
89 err = i.ingestSpindleMember(ctx, e)
90 case tangled.SpindleNSID:
91 err = i.ingestSpindle(ctx, e)
92 case tangled.KnotMemberNSID:
93 err = i.ingestKnotMember(ctx, e)
94 case tangled.KnotNSID:
95 err = i.ingestKnot(ctx, e)
96 case tangled.StringNSID:
97 err = i.ingestString(e)
98 case tangled.RepoIssueNSID:
99 err = i.ingestIssue(ctx, e)
100 case tangled.RepoPullNSID:
101 err = i.ingestPull(ctx, e)
102 case tangled.FeedCommentNSID:
103 err = i.ingestComment(e)
104 case tangled.RepoIssueCommentNSID:
105 err = i.ingestIssueComment(e)
106 case tangled.RepoPullCommentNSID:
107 err = i.ingestPullComment(e)
108 case tangled.LabelDefinitionNSID:
109 err = i.ingestLabelDefinition(e)
110 case tangled.LabelOpNSID:
111 err = i.ingestLabelOp(e)
112 case tangled.RepoNSID:
113 err = i.ingestRepo(ctx, e)
114 }
115 l = i.Logger.With("nsid", e.Commit.Collection)
116 }
117
118 if err != nil {
119 l.Warn("failed to ingest record, skipping", "err", err)
120 }
121
122 lastTimeUs := e.TimeUS + 1
123 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
124 l.Error("failed to save cursor", "err", saveErr)
125 }
126
127 return nil
128 }
129}
130
131func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
132 if strings.HasPrefix(ref, "did:") {
133 return db.GetRepoByDid(i.Db, ref)
134 }
135 return db.GetRepoByAtUri(i.Db, ref)
136}
137
138func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
139 var legacy struct {
140 Subject *string `json:"subject"`
141 SubjectDid *string `json:"subjectDid"`
142 }
143 if err := json.Unmarshal(raw, &legacy); err != nil {
144 return false, err
145 }
146
147 switch {
148 case legacy.SubjectDid != nil:
149 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
150 if err != nil {
151 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
152 return false, nil
153 }
154 star.SubjectType = models.StarSubjectRepo
155 star.Subject = repo.RepoDid
156 return true, nil
157
158 case legacy.Subject != nil:
159 uri, err := syntax.ParseATURI(*legacy.Subject)
160 if err != nil {
161 return false, fmt.Errorf("invalid old-format star subject: %w", err)
162 }
163 switch uri.Collection().String() {
164 case tangled.RepoNSID:
165 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
166 if err != nil {
167 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
168 return false, nil
169 }
170 star.SubjectType = models.StarSubjectRepo
171 star.Subject = repo.RepoDid
172 return true, nil
173 default:
174 star.SubjectType = models.StarSubjectString
175 star.Subject = *legacy.Subject
176 return true, nil
177 }
178
179 default:
180 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
181 }
182}
183
184func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
185 var err error
186 did := e.Did
187
188 l := i.Logger.With("handler", "ingestStar")
189 l = l.With("nsid", e.Commit.Collection)
190
191 switch e.Commit.Operation {
192 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
193 raw := json.RawMessage(e.Commit.Record)
194 record := tangled.FeedStar{}
195 unmarshalErr := json.Unmarshal(raw, &record)
196
197 star := &models.Star{
198 Did: did,
199 Rkey: e.Commit.RKey,
200 }
201
202 switch {
203 case unmarshalErr != nil:
204 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
205 if resolveErr != nil {
206 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
207 return unmarshalErr
208 }
209 if !resolved {
210 return nil
211 }
212
213 case record.Subject == nil:
214 return fmt.Errorf("star record has nil subject")
215
216 case record.Subject.FeedStar_Repo != nil:
217 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
218 if repoErr != nil {
219 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
220 return nil
221 }
222 star.SubjectType = models.StarSubjectRepo
223 star.Subject = repo.RepoDid
224
225 case record.Subject.FeedStar_String != nil:
226 star.SubjectType = models.StarSubjectString
227 star.Subject = record.Subject.FeedStar_String.Uri
228
229 default:
230 return fmt.Errorf("star record has empty subject union")
231 }
232
233 err = db.AddStar(i.Db, star)
234 case jmodels.CommitOperationDelete:
235 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
236 }
237
238 if err != nil {
239 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
240 }
241
242 return nil
243}
244
245func (i *Ingester) ingestFollow(e *jmodels.Event) error {
246 var err error
247 did := e.Did
248
249 l := i.Logger.With("handler", "ingestFollow")
250 l = l.With("nsid", e.Commit.Collection)
251
252 switch e.Commit.Operation {
253 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
254 raw := json.RawMessage(e.Commit.Record)
255 record := tangled.GraphFollow{}
256 err = json.Unmarshal(raw, &record)
257 if err != nil {
258 l.Error("invalid record", "err", err)
259 return err
260 }
261
262 err = db.AddFollow(i.Db, &models.Follow{
263 UserDid: did,
264 SubjectDid: record.Subject,
265 Rkey: e.Commit.RKey,
266 })
267 case jmodels.CommitOperationDelete:
268 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
269 }
270
271 if err != nil {
272 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
273 }
274
275 return nil
276}
277
278func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
279 var err error
280 did := e.Did
281
282 l := i.Logger.With("handler", "ingestVouch")
283 l = l.With("nsid", e.Commit.Collection)
284 l.Info("ingesting vouch")
285
286 switch e.Commit.Operation {
287 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
288 raw := json.RawMessage(e.Commit.Record)
289 record := tangled.GraphVouch{}
290 err = json.Unmarshal(raw, &record)
291 if err != nil {
292 l.Error("invalid record", "err", err)
293 return err
294 }
295
296 // rkey is the subject_did being vouched for/denounced
297 subjectDID := e.Commit.RKey
298
299 _, err = syntax.ParseDID(subjectDID)
300 if err != nil {
301 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
302 return fmt.Errorf("invalid subject_did: %w", err)
303 }
304
305 if did == subjectDID {
306 l.Warn("attempted self-vouch", "did", did)
307 return fmt.Errorf("cannot vouch for self")
308 }
309
310 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
311 if err != nil {
312 return err
313 }
314
315 if subjectId.Handle.IsInvalidHandle() {
316 return err
317 }
318
319 kind, err := models.ParseVouchKind(record.Kind)
320 if err != nil {
321 l.Error("invalid kind", "kind", kind)
322 return fmt.Errorf("invalid kind: %s", kind)
323 }
324
325 recordCid, err := cid.Parse(e.Commit.CID)
326 if err != nil {
327 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
328 return fmt.Errorf("invalid cid: %w", err)
329 }
330
331 var evidences []syntax.ATURI
332 for _, raw := range record.Evidences {
333 uri, parseErr := syntax.ParseATURI(raw)
334 if parseErr != nil {
335 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
336 continue
337 }
338 evidences = append(evidences, uri)
339 }
340
341 tx, txErr := i.Db.Begin()
342 if txErr != nil {
343 return fmt.Errorf("failed to start transaction: %w", txErr)
344 }
345
346 addErr := db.AddVouch(tx, &models.Vouch{
347 Did: syntax.DID(did),
348 SubjectDid: subjectId.DID,
349 Cid: recordCid,
350 Kind: kind,
351 Reason: record.Reason,
352 Evidences: evidences,
353 })
354 if addErr != nil {
355 tx.Rollback()
356 err = addErr
357 } else {
358 err = tx.Commit()
359 }
360
361 case jmodels.CommitOperationDelete:
362 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
363 }
364
365 if err != nil {
366 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
367 }
368
369 return nil
370}
371
372func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
373 did := e.Did
374 var err error
375
376 l := i.Logger.With("handler", "ingestPublicKey")
377 l = l.With("nsid", e.Commit.Collection)
378
379 switch e.Commit.Operation {
380 case jmodels.CommitOperationCreate:
381 l.Debug("processing add of pubkey")
382 raw := json.RawMessage(e.Commit.Record)
383 record := tangled.PublicKey{}
384 err = json.Unmarshal(raw, &record)
385 if err != nil {
386 l.Error("invalid record", "err", err)
387 return err
388 }
389
390 name := record.Name
391 key := record.Key
392 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
393 case jmodels.CommitOperationUpdate:
394 l.Debug("processing update of pubkey")
395 raw := json.RawMessage(e.Commit.Record)
396 record := tangled.PublicKey{}
397 err = json.Unmarshal(raw, &record)
398 if err != nil {
399 l.Error("invalid record", "err", err)
400 return err
401 }
402
403 name := record.Name
404 key := record.Key
405 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
406 case jmodels.CommitOperationDelete:
407 l.Debug("processing delete of pubkey")
408 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
409 }
410
411 if err != nil {
412 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
413 }
414
415 return nil
416}
417
418func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
419 did := e.Did
420 var err error
421
422 l := i.Logger.With("handler", "ingestArtifact")
423 l = l.With("nsid", e.Commit.Collection)
424
425 switch e.Commit.Operation {
426 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
427 raw := json.RawMessage(e.Commit.Record)
428 record := tangled.RepoArtifact{}
429 err = json.Unmarshal(raw, &record)
430 if err != nil {
431 l.Error("invalid record", "err", err)
432 return err
433 }
434
435 var repo *models.Repo
436 if record.RepoDid != nil && *record.RepoDid != "" {
437 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
438 if err != nil && !errors.Is(err, sql.ErrNoRows) {
439 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
440 }
441 }
442 if repo == nil && record.Repo != nil {
443 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
444 if parseErr != nil {
445 return parseErr
446 }
447 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
448 if err != nil {
449 return err
450 }
451 }
452 if repo == nil {
453 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
454 }
455
456 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
457 if err != nil || !ok {
458 return err
459 }
460
461 repoDid := repo.RepoDid
462 if repoDid == "" && record.RepoDid != nil {
463 repoDid = *record.RepoDid
464 }
465 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
466 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
467 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
468 }
469 }
470
471 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
472 if err != nil {
473 createdAt = time.Now()
474 }
475
476 artifact := models.Artifact{
477 Did: did,
478 Rkey: e.Commit.RKey,
479 RepoDid: syntax.DID(repo.RepoDid),
480 Tag: plumbing.Hash(record.Tag),
481 CreatedAt: createdAt,
482 BlobCid: cid.Cid(record.Artifact.Ref),
483 Name: record.Name,
484 Size: uint64(record.Artifact.Size),
485 MimeType: record.Artifact.MimeType,
486 }
487
488 err = db.AddArtifact(i.Db, artifact)
489 case jmodels.CommitOperationDelete:
490 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
491 }
492
493 if err != nil {
494 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
495 }
496
497 return nil
498}
499
500func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
501 did := e.Did
502 var err error
503
504 l := i.Logger.With("handler", "ingestProfile")
505 l = l.With("nsid", e.Commit.Collection)
506
507 if e.Commit.RKey != "self" {
508 return fmt.Errorf("ingestProfile only ingests `self` record")
509 }
510
511 switch e.Commit.Operation {
512 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
513 raw := json.RawMessage(e.Commit.Record)
514 record := tangled.ActorProfile{}
515 err = json.Unmarshal(raw, &record)
516 if err != nil {
517 l.Error("invalid record", "err", err)
518 return err
519 }
520
521 avatar := ""
522 if record.Avatar != nil {
523 avatar = record.Avatar.Ref.String()
524 }
525
526 description := ""
527 if record.Description != nil {
528 description = *record.Description
529 }
530
531 includeBluesky := record.Bluesky
532
533 pronouns := ""
534 if record.Pronouns != nil {
535 pronouns = *record.Pronouns
536 }
537
538 location := ""
539 if record.Location != nil {
540 location = *record.Location
541 }
542
543 var links [5]string
544 for i, l := range record.Links {
545 if i < 5 {
546 links[i] = l
547 }
548 }
549
550 var stats [2]models.VanityStat
551 for i, s := range record.Stats {
552 if i < 2 {
553 stats[i].Kind = models.ParseVanityStatKind(s)
554 }
555 }
556
557 var pinned [6]string
558 for i, r := range record.PinnedRepositories {
559 if i < 6 {
560 pinned[i] = r
561 }
562 }
563
564 var preferredHandle syntax.Handle
565 if record.PreferredHandle != nil {
566 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
567 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
568 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
569 preferredHandle = h
570 }
571 }
572 }
573
574 profile := models.Profile{
575 Did: did,
576 Avatar: avatar,
577 Description: description,
578 IncludeBluesky: includeBluesky,
579 Location: location,
580 Links: links,
581 Stats: stats,
582 PinnedRepos: pinned,
583 Pronouns: pronouns,
584 PreferredHandle: preferredHandle,
585 }
586
587 tx, err := i.Db.Begin()
588 if err != nil {
589 return fmt.Errorf("failed to start transaction: %w", err)
590 }
591
592 err = db.ValidateProfile(tx, &profile)
593 if err != nil {
594 return fmt.Errorf("invalid profile record")
595 }
596
597 err = db.UpsertProfile(tx, &profile)
598 if err == nil && i.Cache != nil {
599 pipe := i.Cache.Pipeline()
600 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
601 if preferredHandle != "" {
602 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
603 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
604 } else {
605 pipe.Del(ctx, didKey)
606 }
607 if _, execErr := pipe.Exec(ctx); execErr != nil {
608 l.Warn("failed to update preferred handle cache", "err", execErr)
609 }
610 }
611 case jmodels.CommitOperationDelete:
612 tx, beginErr := i.Db.Begin()
613 if beginErr != nil {
614 return fmt.Errorf("failed to start transaction: %w", beginErr)
615 }
616
617 priorHandle, phErr := db.GetPreferredHandle(tx, did)
618 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
619 l.Warn("failed to read prior preferred handle", "err", phErr)
620 }
621
622 err = db.DeleteProfile(tx, did)
623 if err == nil && i.Cache != nil {
624 pipe := i.Cache.Pipeline()
625 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
626 if priorHandle != "" {
627 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
628 }
629 if _, execErr := pipe.Exec(ctx); execErr != nil {
630 l.Warn("failed to evict preferred handle cache", "err", execErr)
631 }
632 }
633 }
634
635 if err != nil {
636 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
637 }
638
639 return nil
640}
641
642func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
643 did := e.Did
644 var err error
645
646 l := i.Logger.With("handler", "ingestSpindleMember")
647 l = l.With("nsid", e.Commit.Collection)
648
649 switch e.Commit.Operation {
650 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
651 raw := json.RawMessage(e.Commit.Record)
652 record := tangled.SpindleMember{}
653 err = json.Unmarshal(raw, &record)
654 if err != nil {
655 l.Error("invalid record", "err", err)
656 return err
657 }
658
659 // only spindle owner can invite to spindles
660 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
661 if err != nil {
662 return fmt.Errorf("failed to check invite permission: %w", err)
663 }
664 if !ok {
665 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
666 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
667 }
668 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
669 if err != nil {
670 return fmt.Errorf("failed to re-check invite permission: %w", err)
671 }
672 if !ok {
673 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
674 }
675 }
676
677 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
678 if err != nil {
679 return err
680 }
681
682 if memberId.Handle.IsInvalidHandle() {
683 return fmt.Errorf("invalid handle for member %s", record.Subject)
684 }
685
686 existing, err := db.GetSpindleMembers(i.Db,
687 orm.FilterEq("did", did),
688 orm.FilterEq("rkey", e.Commit.RKey),
689 )
690 if err != nil {
691 return fmt.Errorf("failed to look up existing member: %w", err)
692 }
693 if len(existing) > 1 {
694 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
695 }
696
697 tx, err := i.Db.Begin()
698 if err != nil {
699 return fmt.Errorf("failed to start txn: %w", err)
700 }
701 committed := false
702 defer func() {
703 if committed {
704 return
705 }
706 tx.Rollback()
707 i.Enforcer.E.LoadPolicy()
708 }()
709
710 if len(existing) == 1 {
711 prev := existing[0]
712 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
713 if err = db.RemoveSpindleMember(tx,
714 orm.FilterEq("did", did),
715 orm.FilterEq("rkey", e.Commit.RKey),
716 ); err != nil {
717 return fmt.Errorf("failed to remove stale row: %w", err)
718 }
719 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
720 return fmt.Errorf("failed to remove stale ACL: %w", err)
721 }
722 }
723 }
724
725 if err = db.AddSpindleMember(tx, models.SpindleMember{
726 Did: syntax.DID(did),
727 Rkey: e.Commit.RKey,
728 Instance: record.Instance,
729 Subject: memberId.DID,
730 }); err != nil {
731 return fmt.Errorf("failed to add to db: %w", err)
732 }
733
734 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
735 return fmt.Errorf("failed to update ACLs: %w", err)
736 }
737
738 if err = tx.Commit(); err != nil {
739 return fmt.Errorf("failed to commit txn: %w", err)
740 }
741
742 if err = i.Enforcer.E.SavePolicy(); err != nil {
743 return fmt.Errorf("failed to save ACLs: %w", err)
744 }
745 committed = true
746
747 l.Info("upserted spindle member")
748 case jmodels.CommitOperationDelete:
749 rkey := e.Commit.RKey
750
751 // get record from db first
752 members, err := db.GetSpindleMembers(
753 i.Db,
754 orm.FilterEq("did", did),
755 orm.FilterEq("rkey", rkey),
756 )
757 if err != nil || len(members) != 1 {
758 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
759 }
760 member := members[0]
761
762 tx, err := i.Db.Begin()
763 if err != nil {
764 return fmt.Errorf("failed to start txn: %w", err)
765 }
766 committed := false
767 defer func() {
768 if committed {
769 return
770 }
771 tx.Rollback()
772 i.Enforcer.E.LoadPolicy()
773 }()
774
775 // remove record by rkey && update enforcer
776 if err = db.RemoveSpindleMember(
777 tx,
778 orm.FilterEq("did", did),
779 orm.FilterEq("rkey", rkey),
780 ); err != nil {
781 return fmt.Errorf("failed to remove from db: %w", err)
782 }
783
784 // update enforcer
785 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
786 if err != nil {
787 return fmt.Errorf("failed to update ACLs: %w", err)
788 }
789
790 if err = tx.Commit(); err != nil {
791 return fmt.Errorf("failed to commit txn: %w", err)
792 }
793
794 if err = i.Enforcer.E.SavePolicy(); err != nil {
795 return fmt.Errorf("failed to save ACLs: %w", err)
796 }
797 committed = true
798
799 l.Info("removed spindle member")
800 }
801
802 return nil
803}
804
805func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
806 did := e.Did
807 var err error
808
809 l := i.Logger.With("handler", "ingestSpindle")
810 l = l.With("nsid", e.Commit.Collection)
811
812 switch e.Commit.Operation {
813 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
814 raw := json.RawMessage(e.Commit.Record)
815 record := tangled.Spindle{}
816 err = json.Unmarshal(raw, &record)
817 if err != nil {
818 l.Error("invalid record", "err", err)
819 return err
820 }
821
822 instance := e.Commit.RKey
823
824 err := db.AddSpindle(i.Db, models.Spindle{
825 Owner: syntax.DID(did),
826 Instance: instance,
827 })
828 if err != nil {
829 l.Error("failed to add spindle to db", "err", err, "instance", instance)
830 return err
831 }
832
833 if err := i.verifySpindle(ctx, instance, did); err != nil {
834 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
835 }
836
837 return nil
838
839 case jmodels.CommitOperationDelete:
840 instance := e.Commit.RKey
841
842 // get record from db first
843 spindles, err := db.GetSpindles(
844 ctx,
845 i.Db,
846 orm.FilterEq("owner", did),
847 orm.FilterEq("instance", instance),
848 )
849 if err != nil || len(spindles) != 1 {
850 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
851 }
852 spindle := spindles[0]
853
854 tx, err := i.Db.Begin()
855 if err != nil {
856 return err
857 }
858 defer func() {
859 tx.Rollback()
860 i.Enforcer.E.LoadPolicy()
861 }()
862
863 // remove spindle members first
864 err = db.RemoveSpindleMember(
865 tx,
866 orm.FilterEq("owner", did),
867 orm.FilterEq("instance", instance),
868 )
869 if err != nil {
870 return err
871 }
872
873 err = db.DeleteSpindle(
874 tx,
875 orm.FilterEq("owner", did),
876 orm.FilterEq("instance", instance),
877 )
878 if err != nil {
879 return err
880 }
881
882 if spindle.Verified != nil {
883 err = i.Enforcer.RemoveSpindle(instance)
884 if err != nil {
885 return err
886 }
887 }
888
889 err = tx.Commit()
890 if err != nil {
891 return err
892 }
893
894 err = i.Enforcer.E.SavePolicy()
895 if err != nil {
896 return err
897 }
898 }
899
900 return nil
901}
902
903func (i *Ingester) ingestString(e *jmodels.Event) error {
904 did := e.Did
905 rkey := e.Commit.RKey
906
907 var err error
908
909 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
910 l.Info("ingesting record")
911
912 switch e.Commit.Operation {
913 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
914 raw := json.RawMessage(e.Commit.Record)
915 record := tangled.String{}
916 err = json.Unmarshal(raw, &record)
917 if err != nil {
918 l.Error("invalid record", "err", err)
919 return err
920 }
921
922 string := models.StringFromRecord(did, rkey, record)
923
924 if err = i.Validator.ValidateString(&string); err != nil {
925 l.Error("invalid record", "err", err)
926 return err
927 }
928
929 if err = db.AddString(i.Db, string); err != nil {
930 l.Error("failed to add string", "err", err)
931 return err
932 }
933
934 return nil
935
936 case jmodels.CommitOperationDelete:
937 if err := db.DeleteString(
938 i.Db,
939 orm.FilterEq("did", did),
940 orm.FilterEq("rkey", rkey),
941 ); err != nil {
942 l.Error("failed to delete", "err", err)
943 return fmt.Errorf("failed to delete string record: %w", err)
944 }
945
946 return nil
947 }
948
949 return nil
950}
951
952func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error {
953 did := e.Did
954 var err error
955
956 l := i.Logger.With("handler", "ingestKnotMember")
957 l = l.With("nsid", e.Commit.Collection)
958
959 switch e.Commit.Operation {
960 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
961 raw := json.RawMessage(e.Commit.Record)
962 record := tangled.KnotMember{}
963 err = json.Unmarshal(raw, &record)
964 if err != nil {
965 l.Error("invalid record", "err", err)
966 return err
967 }
968
969 // only knot owner can invite to knots
970 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
971 if err != nil {
972 return fmt.Errorf("failed to check invite permission: %w", err)
973 }
974 if !ok {
975 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
976 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
977 }
978 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
979 if err != nil {
980 return fmt.Errorf("failed to re-check invite permission: %w", err)
981 }
982 if !ok {
983 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
984 }
985 }
986
987 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
988 if err != nil {
989 return err
990 }
991
992 if memberId.Handle.IsInvalidHandle() {
993 return fmt.Errorf("invalid handle for member %s", record.Subject)
994 }
995
996 existing, err := db.GetKnotMembers(i.Db,
997 orm.FilterEq("did", did),
998 orm.FilterEq("rkey", e.Commit.RKey),
999 )
1000 if err != nil {
1001 return fmt.Errorf("failed to look up existing member: %w", err)
1002 }
1003 if len(existing) > 1 {
1004 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1005 }
1006
1007 tx, err := i.Db.Begin()
1008 if err != nil {
1009 return fmt.Errorf("failed to start txn: %w", err)
1010 }
1011 committed := false
1012 defer func() {
1013 if committed {
1014 return
1015 }
1016 tx.Rollback()
1017 i.Enforcer.E.LoadPolicy()
1018 }()
1019
1020 if len(existing) == 1 {
1021 prev := existing[0]
1022 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1023 if err = db.RemoveKnotMember(tx,
1024 orm.FilterEq("did", did),
1025 orm.FilterEq("rkey", e.Commit.RKey),
1026 ); err != nil {
1027 return fmt.Errorf("failed to remove stale row: %w", err)
1028 }
1029 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1030 return fmt.Errorf("failed to remove stale ACL: %w", err)
1031 }
1032 }
1033 }
1034
1035 if err = db.AddKnotMember(tx, models.KnotMember{
1036 Did: syntax.DID(did),
1037 Rkey: e.Commit.RKey,
1038 Domain: record.Domain,
1039 Subject: memberId.DID,
1040 }); err != nil {
1041 return fmt.Errorf("failed to add to db: %w", err)
1042 }
1043
1044 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1045 return fmt.Errorf("failed to update ACLs: %w", err)
1046 }
1047
1048 if err = tx.Commit(); err != nil {
1049 return fmt.Errorf("failed to commit txn: %w", err)
1050 }
1051
1052 if err = i.Enforcer.E.SavePolicy(); err != nil {
1053 return fmt.Errorf("failed to save ACLs: %w", err)
1054 }
1055 committed = true
1056
1057 l.Info("upserted knot member")
1058 case jmodels.CommitOperationDelete:
1059 rkey := e.Commit.RKey
1060
1061 members, err := db.GetKnotMembers(
1062 i.Db,
1063 orm.FilterEq("did", did),
1064 orm.FilterEq("rkey", rkey),
1065 )
1066 if err != nil {
1067 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1068 }
1069 if len(members) == 0 {
1070 l.Info("knot member already removed", "rkey", rkey)
1071 return nil
1072 }
1073 if len(members) > 1 {
1074 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1075 }
1076 member := members[0]
1077
1078 tx, err := i.Db.Begin()
1079 if err != nil {
1080 return fmt.Errorf("failed to start txn: %w", err)
1081 }
1082 committed := false
1083 defer func() {
1084 if committed {
1085 return
1086 }
1087 tx.Rollback()
1088 i.Enforcer.E.LoadPolicy()
1089 }()
1090
1091 if err = db.RemoveKnotMember(
1092 tx,
1093 orm.FilterEq("did", did),
1094 orm.FilterEq("rkey", rkey),
1095 ); err != nil {
1096 return fmt.Errorf("failed to remove from db: %w", err)
1097 }
1098
1099 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1100 return fmt.Errorf("failed to update ACLs: %w", err)
1101 }
1102
1103 if err = tx.Commit(); err != nil {
1104 return fmt.Errorf("failed to commit txn: %w", err)
1105 }
1106
1107 if err = i.Enforcer.E.SavePolicy(); err != nil {
1108 return fmt.Errorf("failed to save ACLs: %w", err)
1109 }
1110 committed = true
1111
1112 l.Info("removed knot member")
1113 }
1114
1115 return nil
1116}
1117
1118func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error {
1119 did := e.Did
1120 var err error
1121
1122 l := i.Logger.With("handler", "ingestKnot")
1123 l = l.With("nsid", e.Commit.Collection)
1124
1125 switch e.Commit.Operation {
1126 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1127 raw := json.RawMessage(e.Commit.Record)
1128 record := tangled.Knot{}
1129 err = json.Unmarshal(raw, &record)
1130 if err != nil {
1131 l.Error("invalid record", "err", err)
1132 return err
1133 }
1134
1135 domain := e.Commit.RKey
1136
1137 err := db.AddKnot(i.Db, domain, did)
1138 if err != nil {
1139 l.Error("failed to add knot to db", "err", err, "domain", domain)
1140 return err
1141 }
1142
1143 if err := i.verifyKnot(ctx, domain, did); err != nil {
1144 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1145 }
1146
1147 return nil
1148
1149 case jmodels.CommitOperationDelete:
1150 domain := e.Commit.RKey
1151
1152 // get record from db first
1153 registrations, err := db.GetRegistrations(
1154 i.Db,
1155 orm.FilterEq("domain", domain),
1156 orm.FilterEq("did", did),
1157 )
1158 if err != nil {
1159 return fmt.Errorf("failed to get registration: %w", err)
1160 }
1161 if len(registrations) != 1 {
1162 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1163 }
1164 registration := registrations[0]
1165
1166 tx, err := i.Db.Begin()
1167 if err != nil {
1168 return err
1169 }
1170 defer func() {
1171 tx.Rollback()
1172 i.Enforcer.E.LoadPolicy()
1173 }()
1174
1175 err = db.RemoveKnotMember(
1176 tx,
1177 orm.FilterEq("did", did),
1178 orm.FilterEq("domain", domain),
1179 )
1180 if err != nil {
1181 return err
1182 }
1183
1184 err = db.DeleteKnot(
1185 tx,
1186 orm.FilterEq("did", did),
1187 orm.FilterEq("domain", domain),
1188 )
1189 if err != nil {
1190 return err
1191 }
1192
1193 if registration.Registered != nil {
1194 err = i.Enforcer.RemoveKnot(domain)
1195 if err != nil {
1196 return err
1197 }
1198 }
1199
1200 err = tx.Commit()
1201 if err != nil {
1202 return err
1203 }
1204
1205 err = i.Enforcer.E.SavePolicy()
1206 if err != nil {
1207 return err
1208 }
1209 }
1210
1211 return nil
1212}
1213
// Retry settings used by verifyKnot and verifySpindle when calling
// serververify.RunVerification.
const (
	// verifyAttempts is the value passed to retry.Attempts.
	verifyAttempts = 4
	// verifyMinDelay is the value passed to retry.Delay.
	verifyMinDelay = 1 * time.Second
	// verifyMaxDelay is the value passed to retry.MaxDelay.
	verifyMaxDelay = 5 * time.Second
)
1219
1220func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1221 regs, err := db.GetRegistrations(i.Db,
1222 orm.FilterEq("domain", domain),
1223 orm.FilterEq("did", did),
1224 )
1225 if err != nil {
1226 return fmt.Errorf("look up registration: %w", err)
1227 }
1228 if len(regs) != 1 {
1229 return fmt.Errorf("no registration for %s by %s", domain, did)
1230 }
1231 if regs[0].Registered != nil {
1232 return nil
1233 }
1234
1235 err = retry.Do(
1236 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1237 retry.Context(ctx),
1238 retry.Attempts(verifyAttempts),
1239 retry.Delay(verifyMinDelay),
1240 retry.MaxDelay(verifyMaxDelay),
1241 retry.DelayType(retry.BackOffDelay),
1242 retry.LastErrorOnly(true),
1243 )
1244 if err != nil {
1245 return fmt.Errorf("verify: %w", err)
1246 }
1247 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1248}
1249
1250func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1251 spindles, err := db.GetSpindles(ctx, i.Db,
1252 orm.FilterEq("instance", instance),
1253 orm.FilterEq("owner", did),
1254 )
1255 if err != nil {
1256 return fmt.Errorf("look up spindle: %w", err)
1257 }
1258 if len(spindles) != 1 {
1259 return fmt.Errorf("no spindle for %s by %s", instance, did)
1260 }
1261 if spindles[0].Verified != nil {
1262 return nil
1263 }
1264
1265 err = retry.Do(
1266 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1267 retry.Context(ctx),
1268 retry.Attempts(verifyAttempts),
1269 retry.Delay(verifyMinDelay),
1270 retry.MaxDelay(verifyMaxDelay),
1271 retry.DelayType(retry.BackOffDelay),
1272 retry.LastErrorOnly(true),
1273 )
1274 if err != nil {
1275 return fmt.Errorf("verify: %w", err)
1276 }
1277 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1278 return err
1279}
1280
// sweepConcurrency is the limit set on the errgroup used by
// SweepPendingVerifications, capping how many verifications run at once.
const sweepConcurrency = 4
1282
1283func (i *Ingester) SweepPendingVerifications() {
1284 l := i.Logger.With("handler", "SweepPendingVerifications")
1285
1286 var g errgroup.Group
1287 g.SetLimit(sweepConcurrency)
1288
1289 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1290 if err != nil {
1291 l.Error("failed to list unverified knots", "err", err)
1292 } else {
1293 for _, reg := range regs {
1294 g.Go(func() error {
1295 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1296 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1297 }
1298 return nil
1299 })
1300 }
1301 }
1302
1303 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1304 if err != nil {
1305 l.Error("failed to list unverified spindles", "err", err)
1306 g.Wait()
1307 return
1308 }
1309 for _, s := range spindles {
1310 g.Go(func() error {
1311 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1312 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1313 }
1314 return nil
1315 })
1316 }
1317 g.Wait()
1318}
1319
1320func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
1321 did := e.Did
1322 rkey := e.Commit.RKey
1323
1324 var err error
1325
1326 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1327 l.Info("ingesting record")
1328
1329 switch e.Commit.Operation {
1330 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1331 raw := json.RawMessage(e.Commit.Record)
1332 record := tangled.RepoIssue{}
1333 err = json.Unmarshal(raw, &record)
1334 if err != nil {
1335 l.Error("invalid record", "err", err)
1336 return err
1337 }
1338
1339 issue := models.IssueFromRecord(did, rkey, record)
1340
1341 if issue.RepoDid == "" {
1342 return fmt.Errorf("issue record has no repo field")
1343 }
1344 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1345 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1346 }
1347
1348 if err := i.Validator.ValidateIssue(&issue); err != nil {
1349 return fmt.Errorf("failed to validate issue: %w", err)
1350 }
1351
1352 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1353 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1354 if repoErr == nil && repo.RepoDid != "" {
1355 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1356 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1357 }
1358 }
1359 }
1360
1361 tx, err := i.Db.BeginTx(ctx, nil)
1362 if err != nil {
1363 l.Error("failed to begin transaction", "err", err)
1364 return err
1365 }
1366 defer tx.Rollback()
1367
1368 err = db.PutIssue(tx, &issue)
1369 if err != nil {
1370 l.Error("failed to create issue", "err", err)
1371 return err
1372 }
1373
1374 err = tx.Commit()
1375 if err != nil {
1376 l.Error("failed to commit txn", "err", err)
1377 return err
1378 }
1379
1380 return nil
1381
1382 case jmodels.CommitOperationDelete:
1383 tx, err := i.Db.BeginTx(ctx, nil)
1384 if err != nil {
1385 l.Error("failed to begin transaction", "err", err)
1386 return err
1387 }
1388 defer tx.Rollback()
1389
1390 if err := db.DeleteIssues(
1391 tx,
1392 did,
1393 rkey,
1394 ); err != nil {
1395 l.Error("failed to delete", "err", err)
1396 return fmt.Errorf("failed to delete issue record: %w", err)
1397 }
1398 if err := tx.Commit(); err != nil {
1399 l.Error("failed to commit txn", "err", err)
1400 return err
1401 }
1402
1403 return nil
1404 }
1405
1406 return nil
1407}
1408
1409func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1410 did := e.Did
1411 rkey := e.Commit.RKey
1412
1413 var err error
1414
1415 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1416 l.Info("ingesting record")
1417
1418 switch e.Commit.Operation {
1419 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1420 raw := json.RawMessage(e.Commit.Record)
1421 record := tangled.RepoPull{}
1422 err = json.Unmarshal(raw, &record)
1423 if err != nil {
1424 l.Error("invalid record", "err", err)
1425 return err
1426 }
1427
1428 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1429 if err != nil {
1430 l.Error("failed to resolve did")
1431 return err
1432 }
1433
1434 // go through and fetch all blobs in parallel
1435 readers := make([]*io.ReadCloser, len(record.Rounds))
1436 var mu sync.Mutex
1437
1438 g, gctx := errgroup.WithContext(ctx)
1439
1440 for idx, b := range record.Rounds {
1441 g.Go(func() error {
1442 // for some reason, a blob is empty
1443 if b.PatchBlob == nil {
1444 return fmt.Errorf("missing patchBlob in round %d", idx)
1445 }
1446
1447 ownerPds := ownerId.PDSEndpoint()
1448 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1449 q := url.Query()
1450 q.Set("cid", b.PatchBlob.Ref.String())
1451 q.Set("did", did)
1452 url.RawQuery = q.Encode()
1453
1454 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1455 if err != nil {
1456 l.Error("failed to create request")
1457 return err
1458 }
1459 req.Header.Set("Content-Type", "application/json")
1460
1461 resp, err := http.DefaultClient.Do(req)
1462 if err != nil {
1463 l.Error("failed to make request")
1464 return err
1465 }
1466
1467 mu.Lock()
1468 readers[idx] = &resp.Body
1469 mu.Unlock()
1470
1471 return nil
1472 })
1473 }
1474
1475 if err := g.Wait(); err != nil {
1476 for _, r := range readers {
1477 if r != nil && *r != nil {
1478 (*r).Close()
1479 }
1480 }
1481 return err
1482 }
1483
1484 defer func() {
1485 for _, r := range readers {
1486 if r != nil && *r != nil {
1487 (*r).Close()
1488 }
1489 }
1490 }()
1491
1492 pull, err := models.PullFromRecord(did, rkey, record, readers)
1493 if err != nil {
1494 return fmt.Errorf("failed to parse pull from record: %w", err)
1495 }
1496 if err := i.Validator.ValidatePull(pull); err != nil {
1497 return fmt.Errorf("failed to validate pull: %w", err)
1498 }
1499
1500 tx, err := i.Db.BeginTx(ctx, nil)
1501 if err != nil {
1502 l.Error("failed to begin transaction", "err", err)
1503 return err
1504 }
1505 defer tx.Rollback()
1506
1507 err = db.PutPull(tx, pull)
1508 if err != nil {
1509 l.Error("failed to create pull", "err", err)
1510 return err
1511 }
1512
1513 err = tx.Commit()
1514 if err != nil {
1515 l.Error("failed to commit txn", "err", err)
1516 return err
1517 }
1518
1519 return nil
1520
1521 case jmodels.CommitOperationDelete:
1522 tx, err := i.Db.BeginTx(ctx, nil)
1523 if err != nil {
1524 l.Error("failed to begin transaction", "err", err)
1525 return err
1526 }
1527 defer tx.Rollback()
1528
1529 if err := db.AbandonPulls(
1530 tx,
1531 orm.FilterEq("owner_did", did),
1532 orm.FilterEq("rkey", rkey),
1533 ); err != nil {
1534 l.Error("failed to abandon", "err", err)
1535 return fmt.Errorf("failed to abandon pull record: %w", err)
1536 }
1537 if err := tx.Commit(); err != nil {
1538 l.Error("failed to commit txn", "err", err)
1539 return err
1540 }
1541
1542 return nil
1543 }
1544
1545 return nil
1546}
1547
1548// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1549func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1550 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey)
1551 l.Info("ingesting record")
1552
1553 switch e.Commit.Operation {
1554 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1555 // no-op. sh.tangled.repo.issue.comment is deprecated
1556
1557 case jmodels.CommitOperationDelete:
1558 if err := db.PurgeComments(
1559 i.Db,
1560 orm.FilterEq("did", e.Did),
1561 orm.FilterEq("collection", e.Commit.Collection),
1562 orm.FilterEq("rkey", e.Commit.RKey),
1563 ); err != nil {
1564 return fmt.Errorf("failed to delete comment record: %w", err)
1565 }
1566 }
1567
1568 return nil
1569}
1570
1571// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1572func (i *Ingester) ingestPullComment(e *jmodels.Event) error {
1573 l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey)
1574 l.Info("ingesting record")
1575
1576 switch e.Commit.Operation {
1577 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1578 // no-op. sh.tangled.repo.pull.comment is deprecated
1579
1580 case jmodels.CommitOperationDelete:
1581 if err := db.PurgeComments(
1582 i.Db,
1583 orm.FilterEq("did", e.Did),
1584 orm.FilterEq("collection", e.Commit.Collection),
1585 orm.FilterEq("rkey", e.Commit.RKey),
1586 ); err != nil {
1587 return fmt.Errorf("failed to delete comment record: %w", err)
1588 }
1589 }
1590
1591 return nil
1592}
1593
1594func (i *Ingester) ingestComment(e *jmodels.Event) error {
1595 did := e.Did
1596 rkey := e.Commit.RKey
1597 cid := e.Commit.CID
1598
1599 var err error
1600
1601 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1602 l.Info("ingesting record")
1603
1604 ctx := context.Background()
1605
1606 switch e.Commit.Operation {
1607 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1608 raw := json.RawMessage(e.Commit.Record)
1609 record := tangled.FeedComment{}
1610 err = json.Unmarshal(raw, &record)
1611 if err != nil {
1612 return fmt.Errorf("invalid record: %w", err)
1613 }
1614
1615 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1616 if err != nil {
1617 return fmt.Errorf("failed to parse comment from record: %w", err)
1618 }
1619
1620 if err := comment.Validate(); err != nil {
1621 return fmt.Errorf("failed to validate comment: %w", err)
1622 }
1623
1624 var references []syntax.ATURI
1625 if comment.Body.Original != nil {
1626 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1627 }
1628
1629 tx, err := i.Db.Begin()
1630 if err != nil {
1631 return fmt.Errorf("failed to start transaction: %w", err)
1632 }
1633 defer tx.Rollback()
1634
1635 _, err = db.PutComment(tx, comment, references)
1636 if err != nil {
1637 return fmt.Errorf("failed to create comment: %w", err)
1638 }
1639
1640 if err := tx.Commit(); err != nil {
1641 return err
1642 }
1643
1644 case jmodels.CommitOperationDelete:
1645 if err := db.DeleteComments(
1646 i.Db,
1647 orm.FilterEq("did", did),
1648 orm.FilterEq("collection", e.Commit.Collection),
1649 orm.FilterEq("rkey", rkey),
1650 ); err != nil {
1651 return fmt.Errorf("failed to delete comment record: %w", err)
1652 }
1653
1654 return nil
1655 }
1656
1657 return nil
1658}
1659
1660func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1661 did := e.Did
1662 rkey := e.Commit.RKey
1663
1664 var err error
1665
1666 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1667 l.Info("ingesting record")
1668
1669 switch e.Commit.Operation {
1670 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1671 raw := json.RawMessage(e.Commit.Record)
1672 record := tangled.LabelDefinition{}
1673 err = json.Unmarshal(raw, &record)
1674 if err != nil {
1675 return fmt.Errorf("invalid record: %w", err)
1676 }
1677
1678 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1679 if err != nil {
1680 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1681 }
1682
1683 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1684 return fmt.Errorf("failed to validate labeldef: %w", err)
1685 }
1686
1687 _, err = db.AddLabelDefinition(i.Db, def)
1688 if err != nil {
1689 return fmt.Errorf("failed to create labeldef: %w", err)
1690 }
1691
1692 return nil
1693
1694 case jmodels.CommitOperationDelete:
1695 if err := db.DeleteLabelDefinition(
1696 i.Db,
1697 orm.FilterEq("did", did),
1698 orm.FilterEq("rkey", rkey),
1699 ); err != nil {
1700 return fmt.Errorf("failed to delete labeldef record: %w", err)
1701 }
1702
1703 return nil
1704 }
1705
1706 return nil
1707}
1708
1709func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1710 did := e.Did
1711 rkey := e.Commit.RKey
1712
1713 var err error
1714
1715 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1716 l.Info("ingesting record")
1717
1718 switch e.Commit.Operation {
1719 case jmodels.CommitOperationCreate:
1720 raw := json.RawMessage(e.Commit.Record)
1721 record := tangled.LabelOp{}
1722 err = json.Unmarshal(raw, &record)
1723 if err != nil {
1724 return fmt.Errorf("invalid record: %w", err)
1725 }
1726
1727 subject := syntax.ATURI(record.Subject)
1728 collection := subject.Collection()
1729
1730 var repo *models.Repo
1731 switch collection {
1732 case tangled.RepoIssueNSID:
1733 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1734 if err != nil || len(i) != 1 {
1735 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1736 }
1737 repo = i[0].Repo
1738 default:
1739 return fmt.Errorf("unsupported label subject: %s", collection)
1740 }
1741
1742 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1743 if err != nil {
1744 return fmt.Errorf("failed to build label application ctx: %w", err)
1745 }
1746
1747 ops := models.LabelOpsFromRecord(did, rkey, record)
1748
1749 for _, o := range ops {
1750 def, ok := actx.Defs[o.OperandKey]
1751 if !ok {
1752 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1753 }
1754 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1755 return fmt.Errorf("failed to validate labelop: %w", err)
1756 }
1757 }
1758
1759 tx, err := i.Db.Begin()
1760 if err != nil {
1761 return err
1762 }
1763 defer tx.Rollback()
1764
1765 for _, o := range ops {
1766 _, err = db.AddLabelOp(tx, &o)
1767 if err != nil {
1768 return fmt.Errorf("failed to add labelop: %w", err)
1769 }
1770 }
1771
1772 if err = tx.Commit(); err != nil {
1773 return err
1774 }
1775 }
1776
1777 return nil
1778}