Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/mentions"
31 "tangled.org/core/appview/models"
32 "tangled.org/core/appview/notify"
33 "tangled.org/core/appview/repoverify"
34 "tangled.org/core/appview/serververify"
35 "tangled.org/core/appview/validator"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/orm"
38 "tangled.org/core/rbac"
39)
40
41type Ingester struct {
42 Ctx context.Context
43 Db *db.DB
44 Enforcer *rbac.Enforcer
45 IdResolver *idresolver.Resolver
46 Cache *cache.Cache
47 Config *config.Config
48 Logger *slog.Logger
49 Validator *validator.Validator
50 MentionsResolver *mentions.Resolver
51 Notifier notify.Notifier
52 Verifier repoverify.Verifier
53}
54
55type processFunc func(ctx context.Context, e *jmodels.Event) error
56
57func (i *Ingester) Ingest() processFunc {
58 return func(ctx context.Context, e *jmodels.Event) error {
59 var err error
60
61 l := i.Logger.With("kind", e.Kind)
62 switch e.Kind {
63 case jmodels.EventKindAccount:
64 // TODO: sync account state to db
65 if e.Account.Active {
66 break
67 }
68 // TODO: revoke sessions by DID
69 if *e.Account.Status == "deactivated" {
70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
71 }
72 case jmodels.EventKindIdentity:
73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
74 case jmodels.EventKindCommit:
75 switch e.Commit.Collection {
76 case tangled.GraphFollowNSID:
77 err = i.ingestFollow(e)
78 case tangled.GraphVouchNSID:
79 err = i.ingestVouch(ctx, e)
80 case tangled.FeedStarNSID:
81 err = i.ingestStar(ctx, e)
82 case tangled.PublicKeyNSID:
83 err = i.ingestPublicKey(e)
84 case tangled.RepoArtifactNSID:
85 err = i.ingestArtifact(ctx, e)
86 case tangled.ActorProfileNSID:
87 err = i.ingestProfile(ctx, e)
88 case tangled.SpindleMemberNSID:
89 err = i.ingestSpindleMember(ctx, e)
90 case tangled.SpindleNSID:
91 err = i.ingestSpindle(ctx, e)
92 case tangled.KnotMemberNSID:
93 err = i.ingestKnotMember(ctx, e)
94 case tangled.KnotNSID:
95 err = i.ingestKnot(ctx, e)
96 case tangled.StringNSID:
97 err = i.ingestString(e)
98 case tangled.RepoIssueNSID:
99 err = i.ingestIssue(ctx, e)
100 case tangled.RepoPullNSID:
101 err = i.ingestPull(ctx, e)
102 case tangled.FeedCommentNSID:
103 err = i.ingestComment(e)
104 case tangled.RepoIssueCommentNSID:
105 err = i.ingestIssueComment(e)
106 case tangled.RepoPullCommentNSID:
107 err = i.ingestPullComment(e)
108 case tangled.LabelDefinitionNSID:
109 err = i.ingestLabelDefinition(e)
110 case tangled.LabelOpNSID:
111 err = i.ingestLabelOp(e)
112 case tangled.RepoNSID:
113 err = i.ingestRepo(ctx, e)
114 }
115 l = i.Logger.With("nsid", e.Commit.Collection)
116 }
117
118 if err != nil {
119 l.Warn("failed to ingest record, skipping", "err", err)
120 }
121
122 lastTimeUs := e.TimeUS + 1
123 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
124 l.Error("failed to save cursor", "err", saveErr)
125 }
126
127 return nil
128 }
129}
130
131func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
132 if strings.HasPrefix(ref, "did:") {
133 return db.GetRepoByDid(i.Db, ref)
134 }
135 return db.GetRepoByAtUri(i.Db, ref)
136}
137
138func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
139 var legacy struct {
140 Subject *string `json:"subject"`
141 SubjectDid *string `json:"subjectDid"`
142 }
143 if err := json.Unmarshal(raw, &legacy); err != nil {
144 return false, err
145 }
146
147 switch {
148 case legacy.SubjectDid != nil:
149 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
150 if err != nil {
151 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
152 return false, nil
153 }
154 star.SubjectType = models.StarSubjectRepo
155 star.Subject = repo.RepoDid
156 return true, nil
157
158 case legacy.Subject != nil:
159 uri, err := syntax.ParseATURI(*legacy.Subject)
160 if err != nil {
161 return false, fmt.Errorf("invalid old-format star subject: %w", err)
162 }
163 switch uri.Collection().String() {
164 case tangled.RepoNSID:
165 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
166 if err != nil {
167 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
168 return false, nil
169 }
170 star.SubjectType = models.StarSubjectRepo
171 star.Subject = repo.RepoDid
172 return true, nil
173 default:
174 star.SubjectType = models.StarSubjectString
175 star.Subject = *legacy.Subject
176 return true, nil
177 }
178
179 default:
180 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
181 }
182}
183
184func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
185 var err error
186 did := e.Did
187
188 l := i.Logger.With("handler", "ingestStar")
189 l = l.With("nsid", e.Commit.Collection)
190
191 switch e.Commit.Operation {
192 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
193 raw := json.RawMessage(e.Commit.Record)
194 record := tangled.FeedStar{}
195 unmarshalErr := json.Unmarshal(raw, &record)
196
197 star := &models.Star{
198 Did: did,
199 Rkey: e.Commit.RKey,
200 }
201
202 switch {
203 case unmarshalErr != nil:
204 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
205 if resolveErr != nil {
206 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
207 return unmarshalErr
208 }
209 if !resolved {
210 return nil
211 }
212
213 case record.Subject == nil:
214 return fmt.Errorf("star record has nil subject")
215
216 case record.Subject.FeedStar_Repo != nil:
217 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
218 if repoErr != nil {
219 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
220 return nil
221 }
222 star.SubjectType = models.StarSubjectRepo
223 star.Subject = repo.RepoDid
224
225 case record.Subject.FeedStar_String != nil:
226 star.SubjectType = models.StarSubjectString
227 star.Subject = record.Subject.FeedStar_String.Uri
228
229 default:
230 return fmt.Errorf("star record has empty subject union")
231 }
232
233 err = db.AddStar(i.Db, star)
234 case jmodels.CommitOperationDelete:
235 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
236 }
237
238 if err != nil {
239 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
240 }
241
242 return nil
243}
244
245func (i *Ingester) ingestFollow(e *jmodels.Event) error {
246 var err error
247 did := e.Did
248
249 l := i.Logger.With("handler", "ingestFollow")
250 l = l.With("nsid", e.Commit.Collection)
251
252 switch e.Commit.Operation {
253 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
254 raw := json.RawMessage(e.Commit.Record)
255 record := tangled.GraphFollow{}
256 err = json.Unmarshal(raw, &record)
257 if err != nil {
258 l.Error("invalid record", "err", err)
259 return err
260 }
261
262 err = db.AddFollow(i.Db, &models.Follow{
263 UserDid: did,
264 SubjectDid: record.Subject,
265 Rkey: e.Commit.RKey,
266 })
267 case jmodels.CommitOperationDelete:
268 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
269 }
270
271 if err != nil {
272 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
273 }
274
275 return nil
276}
277
278func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
279 var err error
280 did := e.Did
281
282 l := i.Logger.With("handler", "ingestVouch")
283 l = l.With("nsid", e.Commit.Collection)
284 l.Info("ingesting vouch")
285
286 switch e.Commit.Operation {
287 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
288 raw := json.RawMessage(e.Commit.Record)
289 record := tangled.GraphVouch{}
290 err = json.Unmarshal(raw, &record)
291 if err != nil {
292 l.Error("invalid record", "err", err)
293 return err
294 }
295
296 // rkey is the subject_did being vouched for/denounced
297 subjectDID := e.Commit.RKey
298
299 _, err = syntax.ParseDID(subjectDID)
300 if err != nil {
301 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
302 return fmt.Errorf("invalid subject_did: %w", err)
303 }
304
305 if did == subjectDID {
306 l.Warn("attempted self-vouch", "did", did)
307 return fmt.Errorf("cannot vouch for self")
308 }
309
310 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
311 if err != nil {
312 return err
313 }
314
315 if subjectId.Handle.IsInvalidHandle() {
316 return err
317 }
318
319 kind, err := models.ParseVouchKind(record.Kind)
320 if err != nil {
321 l.Error("invalid kind", "kind", kind)
322 return fmt.Errorf("invalid kind: %s", kind)
323 }
324
325 recordCid, err := cid.Parse(e.Commit.CID)
326 if err != nil {
327 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
328 return fmt.Errorf("invalid cid: %w", err)
329 }
330
331 var evidences []syntax.ATURI
332 for _, raw := range record.Evidences {
333 uri, parseErr := syntax.ParseATURI(raw)
334 if parseErr != nil {
335 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
336 continue
337 }
338 evidences = append(evidences, uri)
339 }
340
341 tx, txErr := i.Db.Begin()
342 if txErr != nil {
343 return fmt.Errorf("failed to start transaction: %w", txErr)
344 }
345
346 addErr := db.AddVouch(tx, &models.Vouch{
347 Did: syntax.DID(did),
348 SubjectDid: subjectId.DID,
349 Cid: recordCid,
350 Kind: kind,
351 Reason: record.Reason,
352 Evidences: evidences,
353 })
354 if addErr != nil {
355 tx.Rollback()
356 err = addErr
357 } else {
358 err = tx.Commit()
359 }
360
361 case jmodels.CommitOperationDelete:
362 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
363 }
364
365 if err != nil {
366 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
367 }
368
369 return nil
370}
371
372func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
373 did := e.Did
374 var err error
375
376 l := i.Logger.With("handler", "ingestPublicKey")
377 l = l.With("nsid", e.Commit.Collection)
378
379 switch e.Commit.Operation {
380 case jmodels.CommitOperationCreate:
381 l.Debug("processing add of pubkey")
382 raw := json.RawMessage(e.Commit.Record)
383 record := tangled.PublicKey{}
384 err = json.Unmarshal(raw, &record)
385 if err != nil {
386 l.Error("invalid record", "err", err)
387 return err
388 }
389
390 name := record.Name
391 key := record.Key
392 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
393 case jmodels.CommitOperationUpdate:
394 l.Debug("processing update of pubkey")
395 raw := json.RawMessage(e.Commit.Record)
396 record := tangled.PublicKey{}
397 err = json.Unmarshal(raw, &record)
398 if err != nil {
399 l.Error("invalid record", "err", err)
400 return err
401 }
402
403 name := record.Name
404 key := record.Key
405 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
406 case jmodels.CommitOperationDelete:
407 l.Debug("processing delete of pubkey")
408 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
409 }
410
411 if err != nil {
412 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
413 }
414
415 return nil
416}
417
418func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
419 did := e.Did
420 var err error
421
422 l := i.Logger.With("handler", "ingestArtifact")
423 l = l.With("nsid", e.Commit.Collection)
424
425 switch e.Commit.Operation {
426 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
427 raw := json.RawMessage(e.Commit.Record)
428 record := tangled.RepoArtifact{}
429 err = json.Unmarshal(raw, &record)
430 if err != nil {
431 l.Error("invalid record", "err", err)
432 return err
433 }
434
435 var repo *models.Repo
436 if record.RepoDid != nil && *record.RepoDid != "" {
437 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
438 if err != nil && !errors.Is(err, sql.ErrNoRows) {
439 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
440 }
441 }
442 if repo == nil && record.Repo != nil {
443 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
444 if parseErr != nil {
445 return parseErr
446 }
447 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
448 if err != nil {
449 return err
450 }
451 }
452 if repo == nil {
453 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
454 }
455
456 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
457 if err != nil || !ok {
458 return err
459 }
460
461 repoDid := repo.RepoDid
462 if repoDid == "" && record.RepoDid != nil {
463 repoDid = *record.RepoDid
464 }
465 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
466 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
467 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
468 }
469 }
470
471 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
472 if err != nil {
473 createdAt = time.Now()
474 }
475
476 artifact := models.Artifact{
477 Did: did,
478 Rkey: e.Commit.RKey,
479 RepoDid: syntax.DID(repo.RepoDid),
480 Tag: plumbing.Hash(record.Tag),
481 CreatedAt: createdAt,
482 BlobCid: cid.Cid(record.Artifact.Ref),
483 Name: record.Name,
484 Size: uint64(record.Artifact.Size),
485 MimeType: record.Artifact.MimeType,
486 }
487
488 err = db.AddArtifact(i.Db, artifact)
489 case jmodels.CommitOperationDelete:
490 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
491 }
492
493 if err != nil {
494 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
495 }
496
497 return nil
498}
499
500func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
501 did := e.Did
502 var err error
503
504 l := i.Logger.With("handler", "ingestProfile")
505 l = l.With("nsid", e.Commit.Collection)
506
507 if e.Commit.RKey != "self" {
508 return fmt.Errorf("ingestProfile only ingests `self` record")
509 }
510
511 switch e.Commit.Operation {
512 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
513 raw := json.RawMessage(e.Commit.Record)
514 record := tangled.ActorProfile{}
515 err = json.Unmarshal(raw, &record)
516 if err != nil {
517 l.Error("invalid record", "err", err)
518 return err
519 }
520
521 avatar := ""
522 if record.Avatar != nil {
523 avatar = record.Avatar.Ref.String()
524 }
525
526 description := ""
527 if record.Description != nil {
528 description = *record.Description
529 }
530
531 includeBluesky := record.Bluesky
532
533 pronouns := ""
534 if record.Pronouns != nil {
535 pronouns = *record.Pronouns
536 }
537
538 location := ""
539 if record.Location != nil {
540 location = *record.Location
541 }
542
543 var links [5]string
544 for i, l := range record.Links {
545 if i < 5 {
546 links[i] = l
547 }
548 }
549
550 var stats [2]models.VanityStat
551 for i, s := range record.Stats {
552 if i < 2 {
553 stats[i].Kind = models.ParseVanityStatKind(s)
554 }
555 }
556
557 var pinned [6]string
558 for i, r := range record.PinnedRepositories {
559 if i < 6 {
560 pinned[i] = r
561 }
562 }
563
564 var preferredHandle syntax.Handle
565 if record.PreferredHandle != nil {
566 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
567 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
568 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
569 preferredHandle = h
570 }
571 }
572 }
573
574 profile := models.Profile{
575 Did: did,
576 Avatar: avatar,
577 Description: description,
578 IncludeBluesky: includeBluesky,
579 Location: location,
580 Links: links,
581 Stats: stats,
582 PinnedRepos: pinned,
583 Pronouns: pronouns,
584 PreferredHandle: preferredHandle,
585 }
586
587 tx, err := i.Db.Begin()
588 if err != nil {
589 return fmt.Errorf("failed to start transaction: %w", err)
590 }
591
592 err = db.ValidateProfile(tx, &profile)
593 if err != nil {
594 return fmt.Errorf("invalid profile record")
595 }
596
597 err = db.UpsertProfile(tx, &profile)
598 if err == nil && i.Cache != nil {
599 pipe := i.Cache.Pipeline()
600 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
601 if preferredHandle != "" {
602 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
603 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
604 } else {
605 pipe.Del(ctx, didKey)
606 }
607 if _, execErr := pipe.Exec(ctx); execErr != nil {
608 l.Warn("failed to update preferred handle cache", "err", execErr)
609 }
610 }
611 case jmodels.CommitOperationDelete:
612 tx, beginErr := i.Db.Begin()
613 if beginErr != nil {
614 return fmt.Errorf("failed to start transaction: %w", beginErr)
615 }
616
617 priorHandle, phErr := db.GetPreferredHandle(tx, did)
618 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
619 l.Warn("failed to read prior preferred handle", "err", phErr)
620 }
621
622 err = db.DeleteProfile(tx, did)
623 if err == nil && i.Cache != nil {
624 pipe := i.Cache.Pipeline()
625 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
626 if priorHandle != "" {
627 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
628 }
629 if _, execErr := pipe.Exec(ctx); execErr != nil {
630 l.Warn("failed to evict preferred handle cache", "err", execErr)
631 }
632 }
633 }
634
635 if err != nil {
636 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
637 }
638
639 return nil
640}
641
642func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
643 did := e.Did
644 var err error
645
646 l := i.Logger.With("handler", "ingestSpindleMember")
647 l = l.With("nsid", e.Commit.Collection)
648
649 switch e.Commit.Operation {
650 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
651 raw := json.RawMessage(e.Commit.Record)
652 record := tangled.SpindleMember{}
653 err = json.Unmarshal(raw, &record)
654 if err != nil {
655 l.Error("invalid record", "err", err)
656 return err
657 }
658
659 // only spindle owner can invite to spindles
660 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
661 if err != nil {
662 return fmt.Errorf("failed to check invite permission: %w", err)
663 }
664 if !ok {
665 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
666 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
667 }
668 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
669 if err != nil {
670 return fmt.Errorf("failed to re-check invite permission: %w", err)
671 }
672 if !ok {
673 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
674 }
675 }
676
677 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
678 if err != nil {
679 return err
680 }
681
682 if memberId.Handle.IsInvalidHandle() {
683 return fmt.Errorf("invalid handle for member %s", record.Subject)
684 }
685
686 existing, err := db.GetSpindleMembers(i.Db,
687 orm.FilterEq("did", did),
688 orm.FilterEq("rkey", e.Commit.RKey),
689 )
690 if err != nil {
691 return fmt.Errorf("failed to look up existing member: %w", err)
692 }
693 if len(existing) > 1 {
694 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
695 }
696
697 tx, err := i.Db.Begin()
698 if err != nil {
699 return fmt.Errorf("failed to start txn: %w", err)
700 }
701 committed := false
702 defer func() {
703 if committed {
704 return
705 }
706 tx.Rollback()
707 i.Enforcer.E.LoadPolicy()
708 }()
709
710 if len(existing) == 1 {
711 prev := existing[0]
712 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
713 if err = db.RemoveSpindleMember(tx,
714 orm.FilterEq("did", did),
715 orm.FilterEq("rkey", e.Commit.RKey),
716 ); err != nil {
717 return fmt.Errorf("failed to remove stale row: %w", err)
718 }
719 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
720 return fmt.Errorf("failed to remove stale ACL: %w", err)
721 }
722 }
723 }
724
725 if err = db.AddSpindleMember(tx, models.SpindleMember{
726 Did: syntax.DID(did),
727 Rkey: e.Commit.RKey,
728 Instance: record.Instance,
729 Subject: memberId.DID,
730 }); err != nil {
731 return fmt.Errorf("failed to add to db: %w", err)
732 }
733
734 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
735 return fmt.Errorf("failed to update ACLs: %w", err)
736 }
737
738 if err = tx.Commit(); err != nil {
739 return fmt.Errorf("failed to commit txn: %w", err)
740 }
741
742 if err = i.Enforcer.E.SavePolicy(); err != nil {
743 return fmt.Errorf("failed to save ACLs: %w", err)
744 }
745 committed = true
746
747 l.Info("upserted spindle member")
748 case jmodels.CommitOperationDelete:
749 rkey := e.Commit.RKey
750
751 // get record from db first
752 members, err := db.GetSpindleMembers(
753 i.Db,
754 orm.FilterEq("did", did),
755 orm.FilterEq("rkey", rkey),
756 )
757 if err != nil || len(members) != 1 {
758 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
759 }
760 member := members[0]
761
762 tx, err := i.Db.Begin()
763 if err != nil {
764 return fmt.Errorf("failed to start txn: %w", err)
765 }
766 committed := false
767 defer func() {
768 if committed {
769 return
770 }
771 tx.Rollback()
772 i.Enforcer.E.LoadPolicy()
773 }()
774
775 // remove record by rkey && update enforcer
776 if err = db.RemoveSpindleMember(
777 tx,
778 orm.FilterEq("did", did),
779 orm.FilterEq("rkey", rkey),
780 ); err != nil {
781 return fmt.Errorf("failed to remove from db: %w", err)
782 }
783
784 // update enforcer
785 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
786 if err != nil {
787 return fmt.Errorf("failed to update ACLs: %w", err)
788 }
789
790 if err = tx.Commit(); err != nil {
791 return fmt.Errorf("failed to commit txn: %w", err)
792 }
793
794 if err = i.Enforcer.E.SavePolicy(); err != nil {
795 return fmt.Errorf("failed to save ACLs: %w", err)
796 }
797 committed = true
798
799 l.Info("removed spindle member")
800 }
801
802 return nil
803}
804
805func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
806 did := e.Did
807 var err error
808
809 l := i.Logger.With("handler", "ingestSpindle")
810 l = l.With("nsid", e.Commit.Collection)
811
812 switch e.Commit.Operation {
813 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
814 raw := json.RawMessage(e.Commit.Record)
815 record := tangled.Spindle{}
816 err = json.Unmarshal(raw, &record)
817 if err != nil {
818 l.Error("invalid record", "err", err)
819 return err
820 }
821
822 instance := e.Commit.RKey
823
824 err := db.AddSpindle(i.Db, models.Spindle{
825 Owner: syntax.DID(did),
826 Instance: instance,
827 })
828 if err != nil {
829 l.Error("failed to add spindle to db", "err", err, "instance", instance)
830 return err
831 }
832
833 if err := i.verifySpindle(ctx, instance, did); err != nil {
834 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
835 }
836
837 return nil
838
839 case jmodels.CommitOperationDelete:
840 instance := e.Commit.RKey
841
842 // get record from db first
843 spindles, err := db.GetSpindles(
844 ctx,
845 i.Db,
846 orm.FilterEq("owner", did),
847 orm.FilterEq("instance", instance),
848 )
849 if err != nil || len(spindles) != 1 {
850 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
851 }
852 spindle := spindles[0]
853
854 tx, err := i.Db.Begin()
855 if err != nil {
856 return err
857 }
858 defer func() {
859 tx.Rollback()
860 i.Enforcer.E.LoadPolicy()
861 }()
862
863 // remove spindle members first
864 err = db.RemoveSpindleMember(
865 tx,
866 orm.FilterEq("owner", did),
867 orm.FilterEq("instance", instance),
868 )
869 if err != nil {
870 return err
871 }
872
873 err = db.DeleteSpindle(
874 tx,
875 orm.FilterEq("owner", did),
876 orm.FilterEq("instance", instance),
877 )
878 if err != nil {
879 return err
880 }
881
882 if spindle.Verified != nil {
883 err = i.Enforcer.RemoveSpindle(instance)
884 if err != nil {
885 return err
886 }
887 }
888
889 err = tx.Commit()
890 if err != nil {
891 return err
892 }
893
894 err = i.Enforcer.E.SavePolicy()
895 if err != nil {
896 return err
897 }
898 }
899
900 return nil
901}
902
903func (i *Ingester) ingestString(e *jmodels.Event) error {
904 did := e.Did
905 rkey := e.Commit.RKey
906
907 var err error
908
909 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
910 l.Info("ingesting record")
911
912 switch e.Commit.Operation {
913 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
914 raw := json.RawMessage(e.Commit.Record)
915 record := tangled.String{}
916 err = json.Unmarshal(raw, &record)
917 if err != nil {
918 l.Error("invalid record", "err", err)
919 return err
920 }
921
922 string := models.StringFromRecord(did, rkey, record)
923
924 if err = i.Validator.ValidateString(&string); err != nil {
925 l.Error("invalid record", "err", err)
926 return err
927 }
928
929 if err = db.AddString(i.Db, string); err != nil {
930 l.Error("failed to add string", "err", err)
931 return err
932 }
933
934 return nil
935
936 case jmodels.CommitOperationDelete:
937 if err := db.DeleteString(
938 i.Db,
939 orm.FilterEq("did", did),
940 orm.FilterEq("rkey", rkey),
941 ); err != nil {
942 l.Error("failed to delete", "err", err)
943 return fmt.Errorf("failed to delete string record: %w", err)
944 }
945
946 return nil
947 }
948
949 return nil
950}
951
952func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error {
953 did := e.Did
954 var err error
955
956 l := i.Logger.With("handler", "ingestKnotMember")
957 l = l.With("nsid", e.Commit.Collection)
958
959 switch e.Commit.Operation {
960 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
961 raw := json.RawMessage(e.Commit.Record)
962 record := tangled.KnotMember{}
963 err = json.Unmarshal(raw, &record)
964 if err != nil {
965 l.Error("invalid record", "err", err)
966 return err
967 }
968
969 // only knot owner can invite to knots
970 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
971 if err != nil {
972 return fmt.Errorf("failed to check invite permission: %w", err)
973 }
974 if !ok {
975 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
976 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
977 }
978 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
979 if err != nil {
980 return fmt.Errorf("failed to re-check invite permission: %w", err)
981 }
982 if !ok {
983 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
984 }
985 }
986
987 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
988 if err != nil {
989 return err
990 }
991
992 if memberId.Handle.IsInvalidHandle() {
993 return fmt.Errorf("invalid handle for member %s", record.Subject)
994 }
995
996 existing, err := db.GetKnotMembers(i.Db,
997 orm.FilterEq("did", did),
998 orm.FilterEq("rkey", e.Commit.RKey),
999 )
1000 if err != nil {
1001 return fmt.Errorf("failed to look up existing member: %w", err)
1002 }
1003 if len(existing) > 1 {
1004 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1005 }
1006
1007 tx, err := i.Db.Begin()
1008 if err != nil {
1009 return fmt.Errorf("failed to start txn: %w", err)
1010 }
1011 committed := false
1012 defer func() {
1013 if committed {
1014 return
1015 }
1016 tx.Rollback()
1017 i.Enforcer.E.LoadPolicy()
1018 }()
1019
1020 if len(existing) == 1 {
1021 prev := existing[0]
1022 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1023 if err = db.RemoveKnotMember(tx,
1024 orm.FilterEq("did", did),
1025 orm.FilterEq("rkey", e.Commit.RKey),
1026 ); err != nil {
1027 return fmt.Errorf("failed to remove stale row: %w", err)
1028 }
1029 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1030 return fmt.Errorf("failed to remove stale ACL: %w", err)
1031 }
1032 }
1033 }
1034
1035 if err = db.AddKnotMember(tx, models.KnotMember{
1036 Did: syntax.DID(did),
1037 Rkey: e.Commit.RKey,
1038 Domain: record.Domain,
1039 Subject: memberId.DID,
1040 }); err != nil {
1041 return fmt.Errorf("failed to add to db: %w", err)
1042 }
1043
1044 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1045 return fmt.Errorf("failed to update ACLs: %w", err)
1046 }
1047
1048 if err = tx.Commit(); err != nil {
1049 return fmt.Errorf("failed to commit txn: %w", err)
1050 }
1051
1052 if err = i.Enforcer.E.SavePolicy(); err != nil {
1053 return fmt.Errorf("failed to save ACLs: %w", err)
1054 }
1055 committed = true
1056
1057 l.Info("upserted knot member")
1058 case jmodels.CommitOperationDelete:
1059 rkey := e.Commit.RKey
1060
1061 members, err := db.GetKnotMembers(
1062 i.Db,
1063 orm.FilterEq("did", did),
1064 orm.FilterEq("rkey", rkey),
1065 )
1066 if err != nil {
1067 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1068 }
1069 if len(members) == 0 {
1070 l.Info("knot member already removed", "rkey", rkey)
1071 return nil
1072 }
1073 if len(members) > 1 {
1074 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1075 }
1076 member := members[0]
1077
1078 tx, err := i.Db.Begin()
1079 if err != nil {
1080 return fmt.Errorf("failed to start txn: %w", err)
1081 }
1082 committed := false
1083 defer func() {
1084 if committed {
1085 return
1086 }
1087 tx.Rollback()
1088 i.Enforcer.E.LoadPolicy()
1089 }()
1090
1091 if err = db.RemoveKnotMember(
1092 tx,
1093 orm.FilterEq("did", did),
1094 orm.FilterEq("rkey", rkey),
1095 ); err != nil {
1096 return fmt.Errorf("failed to remove from db: %w", err)
1097 }
1098
1099 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1100 return fmt.Errorf("failed to update ACLs: %w", err)
1101 }
1102
1103 if err = tx.Commit(); err != nil {
1104 return fmt.Errorf("failed to commit txn: %w", err)
1105 }
1106
1107 if err = i.Enforcer.E.SavePolicy(); err != nil {
1108 return fmt.Errorf("failed to save ACLs: %w", err)
1109 }
1110 committed = true
1111
1112 l.Info("removed knot member")
1113 }
1114
1115 return nil
1116}
1117
1118func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error {
1119 did := e.Did
1120 var err error
1121
1122 l := i.Logger.With("handler", "ingestKnot")
1123 l = l.With("nsid", e.Commit.Collection)
1124
1125 switch e.Commit.Operation {
1126 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1127 raw := json.RawMessage(e.Commit.Record)
1128 record := tangled.Knot{}
1129 err = json.Unmarshal(raw, &record)
1130 if err != nil {
1131 l.Error("invalid record", "err", err)
1132 return err
1133 }
1134
1135 domain := e.Commit.RKey
1136
1137 err := db.AddKnot(i.Db, domain, did)
1138 if err != nil {
1139 l.Error("failed to add knot to db", "err", err, "domain", domain)
1140 return err
1141 }
1142
1143 if err := i.verifyKnot(ctx, domain, did); err != nil {
1144 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1145 }
1146
1147 return nil
1148
1149 case jmodels.CommitOperationDelete:
1150 domain := e.Commit.RKey
1151
1152 // get record from db first
1153 registrations, err := db.GetRegistrations(
1154 i.Db,
1155 orm.FilterEq("domain", domain),
1156 orm.FilterEq("did", did),
1157 )
1158 if err != nil {
1159 return fmt.Errorf("failed to get registration: %w", err)
1160 }
1161 if len(registrations) != 1 {
1162 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1163 }
1164 registration := registrations[0]
1165
1166 tx, err := i.Db.Begin()
1167 if err != nil {
1168 return err
1169 }
1170 defer func() {
1171 tx.Rollback()
1172 i.Enforcer.E.LoadPolicy()
1173 }()
1174
1175 err = db.RemoveKnotMember(
1176 tx,
1177 orm.FilterEq("did", did),
1178 orm.FilterEq("domain", domain),
1179 )
1180 if err != nil {
1181 return err
1182 }
1183
1184 err = db.DeleteKnot(
1185 tx,
1186 orm.FilterEq("did", did),
1187 orm.FilterEq("domain", domain),
1188 )
1189 if err != nil {
1190 return err
1191 }
1192
1193 err = db.RemoveReposByKnot(tx, domain)
1194 if err != nil {
1195 return err
1196 }
1197
1198 if registration.Registered != nil {
1199 err = i.Enforcer.RemoveKnot(domain)
1200 if err != nil {
1201 return err
1202 }
1203 }
1204
1205 err = tx.Commit()
1206 if err != nil {
1207 return err
1208 }
1209
1210 err = i.Enforcer.E.SavePolicy()
1211 if err != nil {
1212 return err
1213 }
1214 }
1215
1216 return nil
1217}
1218
// Retry settings passed to retry.Do by verifyKnot and verifySpindle.
const (
	// verifyAttempts is handed to retry.Attempts.
	verifyAttempts = 4
	// verifyMinDelay is handed to retry.Delay.
	verifyMinDelay = time.Second
	// verifyMaxDelay is handed to retry.MaxDelay.
	verifyMaxDelay = 5 * time.Second
)
1224
1225func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1226 regs, err := db.GetRegistrations(i.Db,
1227 orm.FilterEq("domain", domain),
1228 orm.FilterEq("did", did),
1229 )
1230 if err != nil {
1231 return fmt.Errorf("look up registration: %w", err)
1232 }
1233 if len(regs) != 1 {
1234 return fmt.Errorf("no registration for %s by %s", domain, did)
1235 }
1236 if regs[0].Registered != nil {
1237 return nil
1238 }
1239
1240 err = retry.Do(
1241 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1242 retry.Context(ctx),
1243 retry.Attempts(verifyAttempts),
1244 retry.Delay(verifyMinDelay),
1245 retry.MaxDelay(verifyMaxDelay),
1246 retry.DelayType(retry.BackOffDelay),
1247 retry.LastErrorOnly(true),
1248 )
1249 if err != nil {
1250 return fmt.Errorf("verify: %w", err)
1251 }
1252 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1253}
1254
1255func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1256 spindles, err := db.GetSpindles(ctx, i.Db,
1257 orm.FilterEq("instance", instance),
1258 orm.FilterEq("owner", did),
1259 )
1260 if err != nil {
1261 return fmt.Errorf("look up spindle: %w", err)
1262 }
1263 if len(spindles) != 1 {
1264 return fmt.Errorf("no spindle for %s by %s", instance, did)
1265 }
1266 if spindles[0].Verified != nil {
1267 return nil
1268 }
1269
1270 err = retry.Do(
1271 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1272 retry.Context(ctx),
1273 retry.Attempts(verifyAttempts),
1274 retry.Delay(verifyMinDelay),
1275 retry.MaxDelay(verifyMaxDelay),
1276 retry.DelayType(retry.BackOffDelay),
1277 retry.LastErrorOnly(true),
1278 )
1279 if err != nil {
1280 return fmt.Errorf("verify: %w", err)
1281 }
1282 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1283 return err
1284}
1285
// sweepConcurrency is the goroutine limit SweepPendingVerifications sets on
// its errgroup.
const sweepConcurrency = 4
1287
1288func (i *Ingester) SweepPendingVerifications() {
1289 l := i.Logger.With("handler", "SweepPendingVerifications")
1290
1291 var g errgroup.Group
1292 g.SetLimit(sweepConcurrency)
1293
1294 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1295 if err != nil {
1296 l.Error("failed to list unverified knots", "err", err)
1297 } else {
1298 for _, reg := range regs {
1299 g.Go(func() error {
1300 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1301 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1302 }
1303 return nil
1304 })
1305 }
1306 }
1307
1308 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1309 if err != nil {
1310 l.Error("failed to list unverified spindles", "err", err)
1311 g.Wait()
1312 return
1313 }
1314 for _, s := range spindles {
1315 g.Go(func() error {
1316 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1317 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1318 }
1319 return nil
1320 })
1321 }
1322 g.Wait()
1323}
1324
1325func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
1326 did := e.Did
1327 rkey := e.Commit.RKey
1328
1329 var err error
1330
1331 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1332 l.Info("ingesting record")
1333
1334 switch e.Commit.Operation {
1335 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1336 raw := json.RawMessage(e.Commit.Record)
1337 record := tangled.RepoIssue{}
1338 err = json.Unmarshal(raw, &record)
1339 if err != nil {
1340 l.Error("invalid record", "err", err)
1341 return err
1342 }
1343
1344 issue := models.IssueFromRecord(did, rkey, record)
1345
1346 if issue.RepoDid == "" {
1347 return fmt.Errorf("issue record has no repo field")
1348 }
1349 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1350 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1351 }
1352
1353 if err := i.Validator.ValidateIssue(&issue); err != nil {
1354 return fmt.Errorf("failed to validate issue: %w", err)
1355 }
1356
1357 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1358 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1359 if repoErr == nil && repo.RepoDid != "" {
1360 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1361 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1362 }
1363 }
1364 }
1365
1366 tx, err := i.Db.BeginTx(ctx, nil)
1367 if err != nil {
1368 l.Error("failed to begin transaction", "err", err)
1369 return err
1370 }
1371 defer tx.Rollback()
1372
1373 err = db.PutIssue(tx, &issue)
1374 if err != nil {
1375 l.Error("failed to create issue", "err", err)
1376 return err
1377 }
1378
1379 err = tx.Commit()
1380 if err != nil {
1381 l.Error("failed to commit txn", "err", err)
1382 return err
1383 }
1384
1385 return nil
1386
1387 case jmodels.CommitOperationDelete:
1388 tx, err := i.Db.BeginTx(ctx, nil)
1389 if err != nil {
1390 l.Error("failed to begin transaction", "err", err)
1391 return err
1392 }
1393 defer tx.Rollback()
1394
1395 if err := db.DeleteIssues(
1396 tx,
1397 did,
1398 rkey,
1399 ); err != nil {
1400 l.Error("failed to delete", "err", err)
1401 return fmt.Errorf("failed to delete issue record: %w", err)
1402 }
1403 if err := tx.Commit(); err != nil {
1404 l.Error("failed to commit txn", "err", err)
1405 return err
1406 }
1407
1408 return nil
1409 }
1410
1411 return nil
1412}
1413
1414func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1415 did := e.Did
1416 rkey := e.Commit.RKey
1417
1418 var err error
1419
1420 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1421 l.Info("ingesting record")
1422
1423 switch e.Commit.Operation {
1424 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1425 raw := json.RawMessage(e.Commit.Record)
1426 record := tangled.RepoPull{}
1427 err = json.Unmarshal(raw, &record)
1428 if err != nil {
1429 l.Error("invalid record", "err", err)
1430 return err
1431 }
1432
1433 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1434 if err != nil {
1435 l.Error("failed to resolve did")
1436 return err
1437 }
1438
1439 // go through and fetch all blobs in parallel
1440 readers := make([]*io.ReadCloser, len(record.Rounds))
1441 var mu sync.Mutex
1442
1443 g, gctx := errgroup.WithContext(ctx)
1444
1445 for idx, b := range record.Rounds {
1446 g.Go(func() error {
1447 // for some reason, a blob is empty
1448 if b.PatchBlob == nil {
1449 return fmt.Errorf("missing patchBlob in round %d", idx)
1450 }
1451
1452 ownerPds := ownerId.PDSEndpoint()
1453 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1454 q := url.Query()
1455 q.Set("cid", b.PatchBlob.Ref.String())
1456 q.Set("did", did)
1457 url.RawQuery = q.Encode()
1458
1459 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1460 if err != nil {
1461 l.Error("failed to create request")
1462 return err
1463 }
1464 req.Header.Set("Content-Type", "application/json")
1465
1466 resp, err := http.DefaultClient.Do(req)
1467 if err != nil {
1468 l.Error("failed to make request")
1469 return err
1470 }
1471
1472 mu.Lock()
1473 readers[idx] = &resp.Body
1474 mu.Unlock()
1475
1476 return nil
1477 })
1478 }
1479
1480 if err := g.Wait(); err != nil {
1481 for _, r := range readers {
1482 if r != nil && *r != nil {
1483 (*r).Close()
1484 }
1485 }
1486 return err
1487 }
1488
1489 defer func() {
1490 for _, r := range readers {
1491 if r != nil && *r != nil {
1492 (*r).Close()
1493 }
1494 }
1495 }()
1496
1497 pull, err := models.PullFromRecord(did, rkey, record, readers)
1498 if err != nil {
1499 return fmt.Errorf("failed to parse pull from record: %w", err)
1500 }
1501 if err := i.Validator.ValidatePull(pull); err != nil {
1502 return fmt.Errorf("failed to validate pull: %w", err)
1503 }
1504
1505 tx, err := i.Db.BeginTx(ctx, nil)
1506 if err != nil {
1507 l.Error("failed to begin transaction", "err", err)
1508 return err
1509 }
1510 defer tx.Rollback()
1511
1512 err = db.PutPull(tx, pull)
1513 if err != nil {
1514 l.Error("failed to create pull", "err", err)
1515 return err
1516 }
1517
1518 err = tx.Commit()
1519 if err != nil {
1520 l.Error("failed to commit txn", "err", err)
1521 return err
1522 }
1523
1524 return nil
1525
1526 case jmodels.CommitOperationDelete:
1527 tx, err := i.Db.BeginTx(ctx, nil)
1528 if err != nil {
1529 l.Error("failed to begin transaction", "err", err)
1530 return err
1531 }
1532 defer tx.Rollback()
1533
1534 if err := db.AbandonPulls(
1535 tx,
1536 orm.FilterEq("owner_did", did),
1537 orm.FilterEq("rkey", rkey),
1538 ); err != nil {
1539 l.Error("failed to abandon", "err", err)
1540 return fmt.Errorf("failed to abandon pull record: %w", err)
1541 }
1542 if err := tx.Commit(); err != nil {
1543 l.Error("failed to commit txn", "err", err)
1544 return err
1545 }
1546
1547 return nil
1548 }
1549
1550 return nil
1551}
1552
1553// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1554func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1555 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey)
1556 l.Info("ingesting record")
1557
1558 switch e.Commit.Operation {
1559 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1560 // no-op. sh.tangled.repo.issue.comment is deprecated
1561
1562 case jmodels.CommitOperationDelete:
1563 if err := db.PurgeComments(
1564 i.Db,
1565 orm.FilterEq("did", e.Did),
1566 orm.FilterEq("collection", e.Commit.Collection),
1567 orm.FilterEq("rkey", e.Commit.RKey),
1568 ); err != nil {
1569 return fmt.Errorf("failed to delete comment record: %w", err)
1570 }
1571 }
1572
1573 return nil
1574}
1575
1576// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1577func (i *Ingester) ingestPullComment(e *jmodels.Event) error {
1578 l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey)
1579 l.Info("ingesting record")
1580
1581 switch e.Commit.Operation {
1582 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1583 // no-op. sh.tangled.repo.pull.comment is deprecated
1584
1585 case jmodels.CommitOperationDelete:
1586 if err := db.PurgeComments(
1587 i.Db,
1588 orm.FilterEq("did", e.Did),
1589 orm.FilterEq("collection", e.Commit.Collection),
1590 orm.FilterEq("rkey", e.Commit.RKey),
1591 ); err != nil {
1592 return fmt.Errorf("failed to delete comment record: %w", err)
1593 }
1594 }
1595
1596 return nil
1597}
1598
1599func (i *Ingester) ingestComment(e *jmodels.Event) error {
1600 did := e.Did
1601 rkey := e.Commit.RKey
1602 cid := e.Commit.CID
1603
1604 var err error
1605
1606 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1607 l.Info("ingesting record")
1608
1609 ctx := context.Background()
1610
1611 switch e.Commit.Operation {
1612 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1613 raw := json.RawMessage(e.Commit.Record)
1614 record := tangled.FeedComment{}
1615 err = json.Unmarshal(raw, &record)
1616 if err != nil {
1617 return fmt.Errorf("invalid record: %w", err)
1618 }
1619
1620 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1621 if err != nil {
1622 return fmt.Errorf("failed to parse comment from record: %w", err)
1623 }
1624
1625 if err := comment.Validate(); err != nil {
1626 return fmt.Errorf("failed to validate comment: %w", err)
1627 }
1628
1629 var references []syntax.ATURI
1630 if comment.Body.Original != nil {
1631 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1632 }
1633
1634 tx, err := i.Db.Begin()
1635 if err != nil {
1636 return fmt.Errorf("failed to start transaction: %w", err)
1637 }
1638 defer tx.Rollback()
1639
1640 _, err = db.PutComment(tx, comment, references)
1641 if err != nil {
1642 return fmt.Errorf("failed to create comment: %w", err)
1643 }
1644
1645 if err := tx.Commit(); err != nil {
1646 return err
1647 }
1648
1649 case jmodels.CommitOperationDelete:
1650 if err := db.DeleteComments(
1651 i.Db,
1652 orm.FilterEq("did", did),
1653 orm.FilterEq("collection", e.Commit.Collection),
1654 orm.FilterEq("rkey", rkey),
1655 ); err != nil {
1656 return fmt.Errorf("failed to delete comment record: %w", err)
1657 }
1658
1659 return nil
1660 }
1661
1662 return nil
1663}
1664
1665func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1666 did := e.Did
1667 rkey := e.Commit.RKey
1668
1669 var err error
1670
1671 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1672 l.Info("ingesting record")
1673
1674 switch e.Commit.Operation {
1675 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1676 raw := json.RawMessage(e.Commit.Record)
1677 record := tangled.LabelDefinition{}
1678 err = json.Unmarshal(raw, &record)
1679 if err != nil {
1680 return fmt.Errorf("invalid record: %w", err)
1681 }
1682
1683 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1684 if err != nil {
1685 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1686 }
1687
1688 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1689 return fmt.Errorf("failed to validate labeldef: %w", err)
1690 }
1691
1692 _, err = db.AddLabelDefinition(i.Db, def)
1693 if err != nil {
1694 return fmt.Errorf("failed to create labeldef: %w", err)
1695 }
1696
1697 return nil
1698
1699 case jmodels.CommitOperationDelete:
1700 if err := db.DeleteLabelDefinition(
1701 i.Db,
1702 orm.FilterEq("did", did),
1703 orm.FilterEq("rkey", rkey),
1704 ); err != nil {
1705 return fmt.Errorf("failed to delete labeldef record: %w", err)
1706 }
1707
1708 return nil
1709 }
1710
1711 return nil
1712}
1713
1714func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1715 did := e.Did
1716 rkey := e.Commit.RKey
1717
1718 var err error
1719
1720 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1721 l.Info("ingesting record")
1722
1723 switch e.Commit.Operation {
1724 case jmodels.CommitOperationCreate:
1725 raw := json.RawMessage(e.Commit.Record)
1726 record := tangled.LabelOp{}
1727 err = json.Unmarshal(raw, &record)
1728 if err != nil {
1729 return fmt.Errorf("invalid record: %w", err)
1730 }
1731
1732 subject := syntax.ATURI(record.Subject)
1733 collection := subject.Collection()
1734
1735 var repo *models.Repo
1736 switch collection {
1737 case tangled.RepoIssueNSID:
1738 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1739 if err != nil || len(i) != 1 {
1740 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1741 }
1742 repo = i[0].Repo
1743 case tangled.RepoPullNSID:
1744 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
1745 if err != nil || len(p) != 1 {
1746 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p))
1747 }
1748 repo = p[0].Repo
1749 default:
1750 return fmt.Errorf("unsupported label subject: %s", collection)
1751 }
1752
1753 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1754 if err != nil {
1755 return fmt.Errorf("failed to build label application ctx: %w", err)
1756 }
1757
1758 ops := models.LabelOpsFromRecord(did, rkey, record)
1759
1760 for _, o := range ops {
1761 def, ok := actx.Defs[o.OperandKey]
1762 if !ok {
1763 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1764 }
1765 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1766 return fmt.Errorf("failed to validate labelop: %w", err)
1767 }
1768 }
1769
1770 tx, err := i.Db.Begin()
1771 if err != nil {
1772 return err
1773 }
1774 defer tx.Rollback()
1775
1776 for _, o := range ops {
1777 _, err = db.AddLabelOp(tx, &o)
1778 if err != nil {
1779 return fmt.Errorf("failed to add labelop: %w", err)
1780 }
1781 }
1782
1783 if err = tx.Commit(); err != nil {
1784 return err
1785 }
1786 }
1787
1788 return nil
1789}