Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/mentions"
31 "tangled.org/core/appview/models"
32 "tangled.org/core/appview/notify"
33 "tangled.org/core/appview/repoverify"
34 "tangled.org/core/appview/serververify"
35 "tangled.org/core/appview/validator"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/orm"
38 "tangled.org/core/rbac"
39)
40
41type Ingester struct {
42 Ctx context.Context
43 Db *db.DB
44 Enforcer *rbac.Enforcer
45 IdResolver *idresolver.Resolver
46 Cache *cache.Cache
47 Config *config.Config
48 Logger *slog.Logger
49 Validator *validator.Validator
50 MentionsResolver *mentions.Resolver
51 Notifier notify.Notifier
52 Verifier repoverify.Verifier
53}
54
55type processFunc func(ctx context.Context, e *jmodels.Event) error
56
57func (i *Ingester) Ingest() processFunc {
58 return func(ctx context.Context, e *jmodels.Event) error {
59 var err error
60
61 l := i.Logger.With("kind", e.Kind)
62 switch e.Kind {
63 case jmodels.EventKindAccount:
64 // TODO: sync account state to db
65 if e.Account.Active {
66 break
67 }
68 // TODO: revoke sessions by DID
69 if *e.Account.Status == "deactivated" {
70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
71 }
72 case jmodels.EventKindIdentity:
73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
74 case jmodels.EventKindCommit:
75 switch e.Commit.Collection {
76 case tangled.GraphFollowNSID:
77 err = i.ingestFollow(e)
78 case tangled.GraphVouchNSID:
79 err = i.ingestVouch(ctx, e)
80 case tangled.FeedStarNSID:
81 err = i.ingestStar(ctx, e)
82 case tangled.FeedReactionNSID:
83 err = i.ingestReaction(e)
84 case tangled.PublicKeyNSID:
85 err = i.ingestPublicKey(e)
86 case tangled.RepoArtifactNSID:
87 err = i.ingestArtifact(ctx, e)
88 case tangled.ActorProfileNSID:
89 err = i.ingestProfile(ctx, e)
90 case tangled.SpindleMemberNSID:
91 err = i.ingestSpindleMember(ctx, e)
92 case tangled.SpindleNSID:
93 err = i.ingestSpindle(ctx, e)
94 case tangled.KnotMemberNSID:
95 err = i.ingestKnotMember(ctx, e)
96 case tangled.KnotNSID:
97 err = i.ingestKnot(ctx, e)
98 case tangled.StringNSID:
99 err = i.ingestString(e)
100 case tangled.RepoIssueNSID:
101 err = i.ingestIssue(ctx, e)
102 case tangled.RepoPullNSID:
103 err = i.ingestPull(ctx, e)
104 case tangled.FeedCommentNSID:
105 err = i.ingestComment(e)
106 case tangled.RepoIssueCommentNSID:
107 err = i.ingestIssueComment(e)
108 case tangled.RepoPullCommentNSID:
109 err = i.ingestPullComment(e)
110 case tangled.LabelDefinitionNSID:
111 err = i.ingestLabelDefinition(e)
112 case tangled.LabelOpNSID:
113 err = i.ingestLabelOp(e)
114 case tangled.RepoNSID:
115 err = i.ingestRepo(ctx, e)
116 }
117 l = i.Logger.With("nsid", e.Commit.Collection)
118 }
119
120 if err != nil {
121 l.Warn("failed to ingest record, skipping", "err", err)
122 }
123
124 lastTimeUs := e.TimeUS + 1
125 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
126 l.Error("failed to save cursor", "err", saveErr)
127 }
128
129 return nil
130 }
131}
132
133func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
134 if strings.HasPrefix(ref, "did:") {
135 return db.GetRepoByDid(i.Db, ref)
136 }
137 return db.GetRepoByAtUri(i.Db, ref)
138}
139
140func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
141 var legacy struct {
142 Subject *string `json:"subject"`
143 SubjectDid *string `json:"subjectDid"`
144 }
145 if err := json.Unmarshal(raw, &legacy); err != nil {
146 return false, err
147 }
148
149 switch {
150 case legacy.SubjectDid != nil:
151 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
152 if err != nil {
153 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
154 return false, nil
155 }
156 star.SubjectType = models.StarSubjectRepo
157 star.Subject = repo.RepoDid
158 return true, nil
159
160 case legacy.Subject != nil:
161 uri, err := syntax.ParseATURI(*legacy.Subject)
162 if err != nil {
163 return false, fmt.Errorf("invalid old-format star subject: %w", err)
164 }
165 switch uri.Collection().String() {
166 case tangled.RepoNSID:
167 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
168 if err != nil {
169 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
170 return false, nil
171 }
172 star.SubjectType = models.StarSubjectRepo
173 star.Subject = repo.RepoDid
174 return true, nil
175 default:
176 star.SubjectType = models.StarSubjectString
177 star.Subject = *legacy.Subject
178 return true, nil
179 }
180
181 default:
182 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
183 }
184}
185
186func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
187 var err error
188 did := e.Did
189
190 l := i.Logger.With("handler", "ingestStar")
191 l = l.With("nsid", e.Commit.Collection)
192
193 switch e.Commit.Operation {
194 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
195 raw := json.RawMessage(e.Commit.Record)
196 record := tangled.FeedStar{}
197 unmarshalErr := json.Unmarshal(raw, &record)
198
199 star := &models.Star{
200 Did: did,
201 Rkey: e.Commit.RKey,
202 }
203
204 switch {
205 case unmarshalErr != nil:
206 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
207 if resolveErr != nil {
208 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
209 return unmarshalErr
210 }
211 if !resolved {
212 return nil
213 }
214
215 case record.Subject == nil:
216 return fmt.Errorf("star record has nil subject")
217
218 case record.Subject.FeedStar_Repo != nil:
219 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
220 if repoErr != nil {
221 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
222 return nil
223 }
224 star.SubjectType = models.StarSubjectRepo
225 star.Subject = repo.RepoDid
226
227 case record.Subject.FeedStar_String != nil:
228 star.SubjectType = models.StarSubjectString
229 star.Subject = record.Subject.FeedStar_String.Uri
230
231 default:
232 return fmt.Errorf("star record has empty subject union")
233 }
234
235 err = db.AddStar(i.Db, star)
236 case jmodels.CommitOperationDelete:
237 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
238 }
239
240 if err != nil {
241 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
242 }
243
244 return nil
245}
246
247func (i *Ingester) ingestFollow(e *jmodels.Event) error {
248 var err error
249 did := e.Did
250
251 l := i.Logger.With("handler", "ingestFollow")
252 l = l.With("nsid", e.Commit.Collection)
253
254 switch e.Commit.Operation {
255 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
256 raw := json.RawMessage(e.Commit.Record)
257 record := tangled.GraphFollow{}
258 err = json.Unmarshal(raw, &record)
259 if err != nil {
260 l.Error("invalid record", "err", err)
261 return err
262 }
263
264 err = db.AddFollow(i.Db, &models.Follow{
265 UserDid: did,
266 SubjectDid: record.Subject,
267 Rkey: e.Commit.RKey,
268 })
269 case jmodels.CommitOperationDelete:
270 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
271 }
272
273 if err != nil {
274 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
275 }
276
277 return nil
278}
279
280func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
281 var err error
282 did := e.Did
283
284 l := i.Logger.With("handler", "ingestVouch")
285 l = l.With("nsid", e.Commit.Collection)
286 l.Info("ingesting vouch")
287
288 switch e.Commit.Operation {
289 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
290 raw := json.RawMessage(e.Commit.Record)
291 record := tangled.GraphVouch{}
292 err = json.Unmarshal(raw, &record)
293 if err != nil {
294 l.Error("invalid record", "err", err)
295 return err
296 }
297
298 // rkey is the subject_did being vouched for/denounced
299 subjectDID := e.Commit.RKey
300
301 _, err = syntax.ParseDID(subjectDID)
302 if err != nil {
303 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
304 return fmt.Errorf("invalid subject_did: %w", err)
305 }
306
307 if did == subjectDID {
308 l.Warn("attempted self-vouch", "did", did)
309 return fmt.Errorf("cannot vouch for self")
310 }
311
312 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
313 if err != nil {
314 return err
315 }
316
317 if subjectId.Handle.IsInvalidHandle() {
318 return err
319 }
320
321 kind, err := models.ParseVouchKind(record.Kind)
322 if err != nil {
323 l.Error("invalid kind", "kind", kind)
324 return fmt.Errorf("invalid kind: %s", kind)
325 }
326
327 recordCid, err := cid.Parse(e.Commit.CID)
328 if err != nil {
329 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
330 return fmt.Errorf("invalid cid: %w", err)
331 }
332
333 var evidences []syntax.ATURI
334 for _, raw := range record.Evidences {
335 uri, parseErr := syntax.ParseATURI(raw)
336 if parseErr != nil {
337 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
338 continue
339 }
340 evidences = append(evidences, uri)
341 }
342
343 tx, txErr := i.Db.Begin()
344 if txErr != nil {
345 return fmt.Errorf("failed to start transaction: %w", txErr)
346 }
347
348 addErr := db.AddVouch(tx, &models.Vouch{
349 Did: syntax.DID(did),
350 SubjectDid: subjectId.DID,
351 Cid: recordCid,
352 Kind: kind,
353 Reason: record.Reason,
354 Evidences: evidences,
355 })
356 if addErr != nil {
357 tx.Rollback()
358 err = addErr
359 } else {
360 err = tx.Commit()
361 }
362
363 case jmodels.CommitOperationDelete:
364 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
365 }
366
367 if err != nil {
368 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
369 }
370
371 return nil
372}
373
374func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
375 did := e.Did
376 var err error
377
378 l := i.Logger.With("handler", "ingestPublicKey")
379 l = l.With("nsid", e.Commit.Collection)
380
381 switch e.Commit.Operation {
382 case jmodels.CommitOperationCreate:
383 l.Debug("processing add of pubkey")
384 raw := json.RawMessage(e.Commit.Record)
385 record := tangled.PublicKey{}
386 err = json.Unmarshal(raw, &record)
387 if err != nil {
388 l.Error("invalid record", "err", err)
389 return err
390 }
391
392 name := record.Name
393 key := record.Key
394 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
395 case jmodels.CommitOperationUpdate:
396 l.Debug("processing update of pubkey")
397 raw := json.RawMessage(e.Commit.Record)
398 record := tangled.PublicKey{}
399 err = json.Unmarshal(raw, &record)
400 if err != nil {
401 l.Error("invalid record", "err", err)
402 return err
403 }
404
405 name := record.Name
406 key := record.Key
407 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
408 case jmodels.CommitOperationDelete:
409 l.Debug("processing delete of pubkey")
410 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
411 }
412
413 if err != nil {
414 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
415 }
416
417 return nil
418}
419
420func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
421 did := e.Did
422 var err error
423
424 l := i.Logger.With("handler", "ingestArtifact")
425 l = l.With("nsid", e.Commit.Collection)
426
427 switch e.Commit.Operation {
428 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
429 raw := json.RawMessage(e.Commit.Record)
430 record := tangled.RepoArtifact{}
431 err = json.Unmarshal(raw, &record)
432 if err != nil {
433 l.Error("invalid record", "err", err)
434 return err
435 }
436
437 var repo *models.Repo
438 if record.RepoDid != nil && *record.RepoDid != "" {
439 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
440 if err != nil && !errors.Is(err, sql.ErrNoRows) {
441 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
442 }
443 }
444 if repo == nil && record.Repo != nil {
445 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
446 if parseErr != nil {
447 return parseErr
448 }
449 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
450 if err != nil {
451 return err
452 }
453 }
454 if repo == nil {
455 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
456 }
457
458 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
459 if err != nil || !ok {
460 return err
461 }
462
463 repoDid := repo.RepoDid
464 if repoDid == "" && record.RepoDid != nil {
465 repoDid = *record.RepoDid
466 }
467 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
468 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
469 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
470 }
471 }
472
473 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
474 if err != nil {
475 createdAt = time.Now()
476 }
477
478 artifact := models.Artifact{
479 Did: did,
480 Rkey: e.Commit.RKey,
481 RepoDid: syntax.DID(repo.RepoDid),
482 Tag: plumbing.Hash(record.Tag),
483 CreatedAt: createdAt,
484 BlobCid: cid.Cid(record.Artifact.Ref),
485 Name: record.Name,
486 Size: uint64(record.Artifact.Size),
487 MimeType: record.Artifact.MimeType,
488 }
489
490 err = db.AddArtifact(i.Db, artifact)
491 case jmodels.CommitOperationDelete:
492 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
493 }
494
495 if err != nil {
496 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
497 }
498
499 return nil
500}
501
502func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
503 did := e.Did
504 var err error
505
506 l := i.Logger.With("handler", "ingestProfile")
507 l = l.With("nsid", e.Commit.Collection)
508
509 if e.Commit.RKey != "self" {
510 return fmt.Errorf("ingestProfile only ingests `self` record")
511 }
512
513 switch e.Commit.Operation {
514 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
515 raw := json.RawMessage(e.Commit.Record)
516 record := tangled.ActorProfile{}
517 err = json.Unmarshal(raw, &record)
518 if err != nil {
519 l.Error("invalid record", "err", err)
520 return err
521 }
522
523 avatar := ""
524 if record.Avatar != nil {
525 avatar = record.Avatar.Ref.String()
526 }
527
528 description := ""
529 if record.Description != nil {
530 description = *record.Description
531 }
532
533 includeBluesky := record.Bluesky
534
535 pronouns := ""
536 if record.Pronouns != nil {
537 pronouns = *record.Pronouns
538 }
539
540 location := ""
541 if record.Location != nil {
542 location = *record.Location
543 }
544
545 var links [5]string
546 for i, l := range record.Links {
547 if i < 5 {
548 links[i] = l
549 }
550 }
551
552 var stats [2]models.VanityStat
553 for i, s := range record.Stats {
554 if i < 2 {
555 stats[i].Kind = models.ParseVanityStatKind(s)
556 }
557 }
558
559 var pinned [6]string
560 for i, r := range record.PinnedRepositories {
561 if i < 6 {
562 pinned[i] = r
563 }
564 }
565
566 var preferredHandle syntax.Handle
567 if record.PreferredHandle != nil {
568 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
569 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
570 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
571 preferredHandle = h
572 }
573 }
574 }
575
576 profile := models.Profile{
577 Did: did,
578 Avatar: avatar,
579 Description: description,
580 IncludeBluesky: includeBluesky,
581 Location: location,
582 Links: links,
583 Stats: stats,
584 PinnedRepos: pinned,
585 Pronouns: pronouns,
586 PreferredHandle: preferredHandle,
587 }
588
589 tx, err := i.Db.Begin()
590 if err != nil {
591 return fmt.Errorf("failed to start transaction: %w", err)
592 }
593
594 err = db.ValidateProfile(tx, &profile)
595 if err != nil {
596 return fmt.Errorf("invalid profile record")
597 }
598
599 err = db.UpsertProfile(tx, &profile)
600 if err == nil && i.Cache != nil {
601 pipe := i.Cache.Pipeline()
602 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
603 if preferredHandle != "" {
604 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
605 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
606 } else {
607 pipe.Del(ctx, didKey)
608 }
609 if _, execErr := pipe.Exec(ctx); execErr != nil {
610 l.Warn("failed to update preferred handle cache", "err", execErr)
611 }
612 }
613 case jmodels.CommitOperationDelete:
614 tx, beginErr := i.Db.Begin()
615 if beginErr != nil {
616 return fmt.Errorf("failed to start transaction: %w", beginErr)
617 }
618
619 priorHandle, phErr := db.GetPreferredHandle(tx, did)
620 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
621 l.Warn("failed to read prior preferred handle", "err", phErr)
622 }
623
624 err = db.DeleteProfile(tx, did)
625 if err == nil && i.Cache != nil {
626 pipe := i.Cache.Pipeline()
627 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
628 if priorHandle != "" {
629 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
630 }
631 if _, execErr := pipe.Exec(ctx); execErr != nil {
632 l.Warn("failed to evict preferred handle cache", "err", execErr)
633 }
634 }
635 }
636
637 if err != nil {
638 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
639 }
640
641 return nil
642}
643
644func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
645 did := e.Did
646 var err error
647
648 l := i.Logger.With("handler", "ingestSpindleMember")
649 l = l.With("nsid", e.Commit.Collection)
650
651 switch e.Commit.Operation {
652 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
653 raw := json.RawMessage(e.Commit.Record)
654 record := tangled.SpindleMember{}
655 err = json.Unmarshal(raw, &record)
656 if err != nil {
657 l.Error("invalid record", "err", err)
658 return err
659 }
660
661 // only spindle owner can invite to spindles
662 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
663 if err != nil {
664 return fmt.Errorf("failed to check invite permission: %w", err)
665 }
666 if !ok {
667 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
668 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
669 }
670 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
671 if err != nil {
672 return fmt.Errorf("failed to re-check invite permission: %w", err)
673 }
674 if !ok {
675 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
676 }
677 }
678
679 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
680 if err != nil {
681 return err
682 }
683
684 if memberId.Handle.IsInvalidHandle() {
685 return fmt.Errorf("invalid handle for member %s", record.Subject)
686 }
687
688 existing, err := db.GetSpindleMembers(i.Db,
689 orm.FilterEq("did", did),
690 orm.FilterEq("rkey", e.Commit.RKey),
691 )
692 if err != nil {
693 return fmt.Errorf("failed to look up existing member: %w", err)
694 }
695 if len(existing) > 1 {
696 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
697 }
698
699 tx, err := i.Db.Begin()
700 if err != nil {
701 return fmt.Errorf("failed to start txn: %w", err)
702 }
703 committed := false
704 defer func() {
705 if committed {
706 return
707 }
708 tx.Rollback()
709 i.Enforcer.E.LoadPolicy()
710 }()
711
712 if len(existing) == 1 {
713 prev := existing[0]
714 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
715 if err = db.RemoveSpindleMember(tx,
716 orm.FilterEq("did", did),
717 orm.FilterEq("rkey", e.Commit.RKey),
718 ); err != nil {
719 return fmt.Errorf("failed to remove stale row: %w", err)
720 }
721 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
722 return fmt.Errorf("failed to remove stale ACL: %w", err)
723 }
724 }
725 }
726
727 if err = db.AddSpindleMember(tx, models.SpindleMember{
728 Did: syntax.DID(did),
729 Rkey: e.Commit.RKey,
730 Instance: record.Instance,
731 Subject: memberId.DID,
732 }); err != nil {
733 return fmt.Errorf("failed to add to db: %w", err)
734 }
735
736 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
737 return fmt.Errorf("failed to update ACLs: %w", err)
738 }
739
740 if err = tx.Commit(); err != nil {
741 return fmt.Errorf("failed to commit txn: %w", err)
742 }
743
744 if err = i.Enforcer.E.SavePolicy(); err != nil {
745 return fmt.Errorf("failed to save ACLs: %w", err)
746 }
747 committed = true
748
749 l.Info("upserted spindle member")
750 case jmodels.CommitOperationDelete:
751 rkey := e.Commit.RKey
752
753 // get record from db first
754 members, err := db.GetSpindleMembers(
755 i.Db,
756 orm.FilterEq("did", did),
757 orm.FilterEq("rkey", rkey),
758 )
759 if err != nil || len(members) != 1 {
760 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
761 }
762 member := members[0]
763
764 tx, err := i.Db.Begin()
765 if err != nil {
766 return fmt.Errorf("failed to start txn: %w", err)
767 }
768 committed := false
769 defer func() {
770 if committed {
771 return
772 }
773 tx.Rollback()
774 i.Enforcer.E.LoadPolicy()
775 }()
776
777 // remove record by rkey && update enforcer
778 if err = db.RemoveSpindleMember(
779 tx,
780 orm.FilterEq("did", did),
781 orm.FilterEq("rkey", rkey),
782 ); err != nil {
783 return fmt.Errorf("failed to remove from db: %w", err)
784 }
785
786 // update enforcer
787 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
788 if err != nil {
789 return fmt.Errorf("failed to update ACLs: %w", err)
790 }
791
792 if err = tx.Commit(); err != nil {
793 return fmt.Errorf("failed to commit txn: %w", err)
794 }
795
796 if err = i.Enforcer.E.SavePolicy(); err != nil {
797 return fmt.Errorf("failed to save ACLs: %w", err)
798 }
799 committed = true
800
801 l.Info("removed spindle member")
802 }
803
804 return nil
805}
806
807func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
808 did := e.Did
809 var err error
810
811 l := i.Logger.With("handler", "ingestSpindle")
812 l = l.With("nsid", e.Commit.Collection)
813
814 switch e.Commit.Operation {
815 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
816 raw := json.RawMessage(e.Commit.Record)
817 record := tangled.Spindle{}
818 err = json.Unmarshal(raw, &record)
819 if err != nil {
820 l.Error("invalid record", "err", err)
821 return err
822 }
823
824 instance := e.Commit.RKey
825
826 err := db.AddSpindle(i.Db, models.Spindle{
827 Owner: syntax.DID(did),
828 Instance: instance,
829 })
830 if err != nil {
831 l.Error("failed to add spindle to db", "err", err, "instance", instance)
832 return err
833 }
834
835 if err := i.verifySpindle(ctx, instance, did); err != nil {
836 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
837 }
838
839 return nil
840
841 case jmodels.CommitOperationDelete:
842 instance := e.Commit.RKey
843
844 // get record from db first
845 spindles, err := db.GetSpindles(
846 ctx,
847 i.Db,
848 orm.FilterEq("owner", did),
849 orm.FilterEq("instance", instance),
850 )
851 if err != nil || len(spindles) != 1 {
852 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
853 }
854 spindle := spindles[0]
855
856 tx, err := i.Db.Begin()
857 if err != nil {
858 return err
859 }
860 defer func() {
861 tx.Rollback()
862 i.Enforcer.E.LoadPolicy()
863 }()
864
865 // remove spindle members first
866 err = db.RemoveSpindleMember(
867 tx,
868 orm.FilterEq("owner", did),
869 orm.FilterEq("instance", instance),
870 )
871 if err != nil {
872 return err
873 }
874
875 err = db.DeleteSpindle(
876 tx,
877 orm.FilterEq("owner", did),
878 orm.FilterEq("instance", instance),
879 )
880 if err != nil {
881 return err
882 }
883
884 if spindle.Verified != nil {
885 err = i.Enforcer.RemoveSpindle(instance)
886 if err != nil {
887 return err
888 }
889 }
890
891 err = tx.Commit()
892 if err != nil {
893 return err
894 }
895
896 err = i.Enforcer.E.SavePolicy()
897 if err != nil {
898 return err
899 }
900 }
901
902 return nil
903}
904
905func (i *Ingester) ingestString(e *jmodels.Event) error {
906 did := e.Did
907 rkey := e.Commit.RKey
908
909 var err error
910
911 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
912 l.Info("ingesting record")
913
914 switch e.Commit.Operation {
915 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
916 raw := json.RawMessage(e.Commit.Record)
917 record := tangled.String{}
918 err = json.Unmarshal(raw, &record)
919 if err != nil {
920 l.Error("invalid record", "err", err)
921 return err
922 }
923
924 string := models.StringFromRecord(did, rkey, record)
925
926 if err = i.Validator.ValidateString(&string); err != nil {
927 l.Error("invalid record", "err", err)
928 return err
929 }
930
931 if err = db.AddString(i.Db, string); err != nil {
932 l.Error("failed to add string", "err", err)
933 return err
934 }
935
936 return nil
937
938 case jmodels.CommitOperationDelete:
939 if err := db.DeleteString(
940 i.Db,
941 orm.FilterEq("did", did),
942 orm.FilterEq("rkey", rkey),
943 ); err != nil {
944 l.Error("failed to delete", "err", err)
945 return fmt.Errorf("failed to delete string record: %w", err)
946 }
947
948 return nil
949 }
950
951 return nil
952}
953
954func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error {
955 did := e.Did
956 var err error
957
958 l := i.Logger.With("handler", "ingestKnotMember")
959 l = l.With("nsid", e.Commit.Collection)
960
961 switch e.Commit.Operation {
962 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
963 raw := json.RawMessage(e.Commit.Record)
964 record := tangled.KnotMember{}
965 err = json.Unmarshal(raw, &record)
966 if err != nil {
967 l.Error("invalid record", "err", err)
968 return err
969 }
970
971 // only knot owner can invite to knots
972 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
973 if err != nil {
974 return fmt.Errorf("failed to check invite permission: %w", err)
975 }
976 if !ok {
977 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
978 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
979 }
980 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
981 if err != nil {
982 return fmt.Errorf("failed to re-check invite permission: %w", err)
983 }
984 if !ok {
985 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
986 }
987 }
988
989 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
990 if err != nil {
991 return err
992 }
993
994 if memberId.Handle.IsInvalidHandle() {
995 return fmt.Errorf("invalid handle for member %s", record.Subject)
996 }
997
998 existing, err := db.GetKnotMembers(i.Db,
999 orm.FilterEq("did", did),
1000 orm.FilterEq("rkey", e.Commit.RKey),
1001 )
1002 if err != nil {
1003 return fmt.Errorf("failed to look up existing member: %w", err)
1004 }
1005 if len(existing) > 1 {
1006 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1007 }
1008
1009 tx, err := i.Db.Begin()
1010 if err != nil {
1011 return fmt.Errorf("failed to start txn: %w", err)
1012 }
1013 committed := false
1014 defer func() {
1015 if committed {
1016 return
1017 }
1018 tx.Rollback()
1019 i.Enforcer.E.LoadPolicy()
1020 }()
1021
1022 if len(existing) == 1 {
1023 prev := existing[0]
1024 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1025 if err = db.RemoveKnotMember(tx,
1026 orm.FilterEq("did", did),
1027 orm.FilterEq("rkey", e.Commit.RKey),
1028 ); err != nil {
1029 return fmt.Errorf("failed to remove stale row: %w", err)
1030 }
1031 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1032 return fmt.Errorf("failed to remove stale ACL: %w", err)
1033 }
1034 }
1035 }
1036
1037 if err = db.AddKnotMember(tx, models.KnotMember{
1038 Did: syntax.DID(did),
1039 Rkey: e.Commit.RKey,
1040 Domain: record.Domain,
1041 Subject: memberId.DID,
1042 }); err != nil {
1043 return fmt.Errorf("failed to add to db: %w", err)
1044 }
1045
1046 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1047 return fmt.Errorf("failed to update ACLs: %w", err)
1048 }
1049
1050 if err = tx.Commit(); err != nil {
1051 return fmt.Errorf("failed to commit txn: %w", err)
1052 }
1053
1054 if err = i.Enforcer.E.SavePolicy(); err != nil {
1055 return fmt.Errorf("failed to save ACLs: %w", err)
1056 }
1057 committed = true
1058
1059 l.Info("upserted knot member")
1060 case jmodels.CommitOperationDelete:
1061 rkey := e.Commit.RKey
1062
1063 members, err := db.GetKnotMembers(
1064 i.Db,
1065 orm.FilterEq("did", did),
1066 orm.FilterEq("rkey", rkey),
1067 )
1068 if err != nil {
1069 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1070 }
1071 if len(members) == 0 {
1072 l.Info("knot member already removed", "rkey", rkey)
1073 return nil
1074 }
1075 if len(members) > 1 {
1076 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1077 }
1078 member := members[0]
1079
1080 tx, err := i.Db.Begin()
1081 if err != nil {
1082 return fmt.Errorf("failed to start txn: %w", err)
1083 }
1084 committed := false
1085 defer func() {
1086 if committed {
1087 return
1088 }
1089 tx.Rollback()
1090 i.Enforcer.E.LoadPolicy()
1091 }()
1092
1093 if err = db.RemoveKnotMember(
1094 tx,
1095 orm.FilterEq("did", did),
1096 orm.FilterEq("rkey", rkey),
1097 ); err != nil {
1098 return fmt.Errorf("failed to remove from db: %w", err)
1099 }
1100
1101 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1102 return fmt.Errorf("failed to update ACLs: %w", err)
1103 }
1104
1105 if err = tx.Commit(); err != nil {
1106 return fmt.Errorf("failed to commit txn: %w", err)
1107 }
1108
1109 if err = i.Enforcer.E.SavePolicy(); err != nil {
1110 return fmt.Errorf("failed to save ACLs: %w", err)
1111 }
1112 committed = true
1113
1114 l.Info("removed knot member")
1115 }
1116
1117 return nil
1118}
1119
1120func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error {
1121 did := e.Did
1122 var err error
1123
1124 l := i.Logger.With("handler", "ingestKnot")
1125 l = l.With("nsid", e.Commit.Collection)
1126
1127 switch e.Commit.Operation {
1128 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1129 raw := json.RawMessage(e.Commit.Record)
1130 record := tangled.Knot{}
1131 err = json.Unmarshal(raw, &record)
1132 if err != nil {
1133 l.Error("invalid record", "err", err)
1134 return err
1135 }
1136
1137 domain := e.Commit.RKey
1138
1139 err := db.AddKnot(i.Db, domain, did)
1140 if err != nil {
1141 l.Error("failed to add knot to db", "err", err, "domain", domain)
1142 return err
1143 }
1144
1145 if err := i.verifyKnot(ctx, domain, did); err != nil {
1146 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1147 }
1148
1149 return nil
1150
1151 case jmodels.CommitOperationDelete:
1152 domain := e.Commit.RKey
1153
1154 // get record from db first
1155 registrations, err := db.GetRegistrations(
1156 i.Db,
1157 orm.FilterEq("domain", domain),
1158 orm.FilterEq("did", did),
1159 )
1160 if err != nil {
1161 return fmt.Errorf("failed to get registration: %w", err)
1162 }
1163 if len(registrations) != 1 {
1164 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1165 }
1166 registration := registrations[0]
1167
1168 tx, err := i.Db.Begin()
1169 if err != nil {
1170 return err
1171 }
1172 defer func() {
1173 tx.Rollback()
1174 i.Enforcer.E.LoadPolicy()
1175 }()
1176
1177 err = db.RemoveKnotMember(
1178 tx,
1179 orm.FilterEq("did", did),
1180 orm.FilterEq("domain", domain),
1181 )
1182 if err != nil {
1183 return err
1184 }
1185
1186 err = db.DeleteKnot(
1187 tx,
1188 orm.FilterEq("did", did),
1189 orm.FilterEq("domain", domain),
1190 )
1191 if err != nil {
1192 return err
1193 }
1194
1195 err = db.RemoveReposByKnot(tx, domain)
1196 if err != nil {
1197 return err
1198 }
1199
1200 if registration.Registered != nil {
1201 err = i.Enforcer.RemoveKnot(domain)
1202 if err != nil {
1203 return err
1204 }
1205 }
1206
1207 err = tx.Commit()
1208 if err != nil {
1209 return err
1210 }
1211
1212 err = i.Enforcer.E.SavePolicy()
1213 if err != nil {
1214 return err
1215 }
1216 }
1217
1218 return nil
1219}
1220
// Retry settings shared by verifyKnot and verifySpindle.
const (
	// verifyAttempts is passed to retry.Attempts.
	verifyAttempts = 4
	// verifyMinDelay is passed to retry.Delay.
	verifyMinDelay = time.Second
	// verifyMaxDelay is passed to retry.MaxDelay.
	verifyMaxDelay = 5 * time.Second
)
1226
1227func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1228 regs, err := db.GetRegistrations(i.Db,
1229 orm.FilterEq("domain", domain),
1230 orm.FilterEq("did", did),
1231 )
1232 if err != nil {
1233 return fmt.Errorf("look up registration: %w", err)
1234 }
1235 if len(regs) != 1 {
1236 return fmt.Errorf("no registration for %s by %s", domain, did)
1237 }
1238 if regs[0].Registered != nil {
1239 return nil
1240 }
1241
1242 err = retry.Do(
1243 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1244 retry.Context(ctx),
1245 retry.Attempts(verifyAttempts),
1246 retry.Delay(verifyMinDelay),
1247 retry.MaxDelay(verifyMaxDelay),
1248 retry.DelayType(retry.BackOffDelay),
1249 retry.LastErrorOnly(true),
1250 )
1251 if err != nil {
1252 return fmt.Errorf("verify: %w", err)
1253 }
1254 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1255}
1256
1257func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1258 spindles, err := db.GetSpindles(ctx, i.Db,
1259 orm.FilterEq("instance", instance),
1260 orm.FilterEq("owner", did),
1261 )
1262 if err != nil {
1263 return fmt.Errorf("look up spindle: %w", err)
1264 }
1265 if len(spindles) != 1 {
1266 return fmt.Errorf("no spindle for %s by %s", instance, did)
1267 }
1268 if spindles[0].Verified != nil {
1269 return nil
1270 }
1271
1272 err = retry.Do(
1273 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1274 retry.Context(ctx),
1275 retry.Attempts(verifyAttempts),
1276 retry.Delay(verifyMinDelay),
1277 retry.MaxDelay(verifyMaxDelay),
1278 retry.DelayType(retry.BackOffDelay),
1279 retry.LastErrorOnly(true),
1280 )
1281 if err != nil {
1282 return fmt.Errorf("verify: %w", err)
1283 }
1284 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1285 return err
1286}
1287
// sweepConcurrency is the limit set on the errgroup used by
// SweepPendingVerifications, i.e. how many verifications run at once.
const sweepConcurrency = 4
1289
1290func (i *Ingester) SweepPendingVerifications() {
1291 l := i.Logger.With("handler", "SweepPendingVerifications")
1292
1293 var g errgroup.Group
1294 g.SetLimit(sweepConcurrency)
1295
1296 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1297 if err != nil {
1298 l.Error("failed to list unverified knots", "err", err)
1299 } else {
1300 for _, reg := range regs {
1301 g.Go(func() error {
1302 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1303 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1304 }
1305 return nil
1306 })
1307 }
1308 }
1309
1310 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1311 if err != nil {
1312 l.Error("failed to list unverified spindles", "err", err)
1313 g.Wait()
1314 return
1315 }
1316 for _, s := range spindles {
1317 g.Go(func() error {
1318 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1319 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1320 }
1321 return nil
1322 })
1323 }
1324 g.Wait()
1325}
1326
1327func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
1328 did := e.Did
1329 rkey := e.Commit.RKey
1330
1331 var err error
1332
1333 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1334 l.Info("ingesting record")
1335
1336 switch e.Commit.Operation {
1337 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1338 raw := json.RawMessage(e.Commit.Record)
1339 record := tangled.RepoIssue{}
1340 err = json.Unmarshal(raw, &record)
1341 if err != nil {
1342 l.Error("invalid record", "err", err)
1343 return err
1344 }
1345
1346 issue := models.IssueFromRecord(did, rkey, record)
1347
1348 if issue.RepoDid == "" {
1349 return fmt.Errorf("issue record has no repo field")
1350 }
1351 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1352 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1353 }
1354
1355 if err := i.Validator.ValidateIssue(&issue); err != nil {
1356 return fmt.Errorf("failed to validate issue: %w", err)
1357 }
1358
1359 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1360 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1361 if repoErr == nil && repo.RepoDid != "" {
1362 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1363 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1364 }
1365 }
1366 }
1367
1368 tx, err := i.Db.BeginTx(ctx, nil)
1369 if err != nil {
1370 l.Error("failed to begin transaction", "err", err)
1371 return err
1372 }
1373 defer tx.Rollback()
1374
1375 err = db.PutIssue(tx, &issue)
1376 if err != nil {
1377 l.Error("failed to create issue", "err", err)
1378 return err
1379 }
1380
1381 err = tx.Commit()
1382 if err != nil {
1383 l.Error("failed to commit txn", "err", err)
1384 return err
1385 }
1386
1387 return nil
1388
1389 case jmodels.CommitOperationDelete:
1390 tx, err := i.Db.BeginTx(ctx, nil)
1391 if err != nil {
1392 l.Error("failed to begin transaction", "err", err)
1393 return err
1394 }
1395 defer tx.Rollback()
1396
1397 if err := db.DeleteIssues(
1398 tx,
1399 did,
1400 rkey,
1401 ); err != nil {
1402 l.Error("failed to delete", "err", err)
1403 return fmt.Errorf("failed to delete issue record: %w", err)
1404 }
1405 if err := tx.Commit(); err != nil {
1406 l.Error("failed to commit txn", "err", err)
1407 return err
1408 }
1409
1410 return nil
1411 }
1412
1413 return nil
1414}
1415
1416func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1417 did := e.Did
1418 rkey := e.Commit.RKey
1419
1420 var err error
1421
1422 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1423 l.Info("ingesting record")
1424
1425 switch e.Commit.Operation {
1426 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1427 raw := json.RawMessage(e.Commit.Record)
1428 record := tangled.RepoPull{}
1429 err = json.Unmarshal(raw, &record)
1430 if err != nil {
1431 l.Error("invalid record", "err", err)
1432 return err
1433 }
1434
1435 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1436 if err != nil {
1437 l.Error("failed to resolve did")
1438 return err
1439 }
1440
1441 // go through and fetch all blobs in parallel
1442 readers := make([]*io.ReadCloser, len(record.Rounds))
1443 var mu sync.Mutex
1444
1445 g, gctx := errgroup.WithContext(ctx)
1446
1447 for idx, b := range record.Rounds {
1448 g.Go(func() error {
1449 // for some reason, a blob is empty
1450 if b.PatchBlob == nil {
1451 return fmt.Errorf("missing patchBlob in round %d", idx)
1452 }
1453
1454 ownerPds := ownerId.PDSEndpoint()
1455 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1456 q := url.Query()
1457 q.Set("cid", b.PatchBlob.Ref.String())
1458 q.Set("did", did)
1459 url.RawQuery = q.Encode()
1460
1461 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1462 if err != nil {
1463 l.Error("failed to create request")
1464 return err
1465 }
1466 req.Header.Set("Content-Type", "application/json")
1467
1468 resp, err := http.DefaultClient.Do(req)
1469 if err != nil {
1470 l.Error("failed to make request")
1471 return err
1472 }
1473
1474 mu.Lock()
1475 readers[idx] = &resp.Body
1476 mu.Unlock()
1477
1478 return nil
1479 })
1480 }
1481
1482 if err := g.Wait(); err != nil {
1483 for _, r := range readers {
1484 if r != nil && *r != nil {
1485 (*r).Close()
1486 }
1487 }
1488 return err
1489 }
1490
1491 defer func() {
1492 for _, r := range readers {
1493 if r != nil && *r != nil {
1494 (*r).Close()
1495 }
1496 }
1497 }()
1498
1499 pull, err := models.PullFromRecord(did, rkey, record, readers)
1500 if err != nil {
1501 return fmt.Errorf("failed to parse pull from record: %w", err)
1502 }
1503 if err := i.Validator.ValidatePull(pull); err != nil {
1504 return fmt.Errorf("failed to validate pull: %w", err)
1505 }
1506
1507 tx, err := i.Db.BeginTx(ctx, nil)
1508 if err != nil {
1509 l.Error("failed to begin transaction", "err", err)
1510 return err
1511 }
1512 defer tx.Rollback()
1513
1514 err = db.PutPull(tx, pull)
1515 if err != nil {
1516 l.Error("failed to create pull", "err", err)
1517 return err
1518 }
1519
1520 err = tx.Commit()
1521 if err != nil {
1522 l.Error("failed to commit txn", "err", err)
1523 return err
1524 }
1525
1526 return nil
1527
1528 case jmodels.CommitOperationDelete:
1529 tx, err := i.Db.BeginTx(ctx, nil)
1530 if err != nil {
1531 l.Error("failed to begin transaction", "err", err)
1532 return err
1533 }
1534 defer tx.Rollback()
1535
1536 if err := db.AbandonPulls(
1537 tx,
1538 orm.FilterEq("owner_did", did),
1539 orm.FilterEq("rkey", rkey),
1540 ); err != nil {
1541 l.Error("failed to abandon", "err", err)
1542 return fmt.Errorf("failed to abandon pull record: %w", err)
1543 }
1544 if err := tx.Commit(); err != nil {
1545 l.Error("failed to commit txn", "err", err)
1546 return err
1547 }
1548
1549 return nil
1550 }
1551
1552 return nil
1553}
1554
1555// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1556func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1557 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey)
1558 l.Info("ingesting record")
1559
1560 switch e.Commit.Operation {
1561 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1562 // no-op. sh.tangled.repo.issue.comment is deprecated
1563
1564 case jmodels.CommitOperationDelete:
1565 if err := db.PurgeComments(
1566 i.Db,
1567 orm.FilterEq("did", e.Did),
1568 orm.FilterEq("collection", e.Commit.Collection),
1569 orm.FilterEq("rkey", e.Commit.RKey),
1570 ); err != nil {
1571 return fmt.Errorf("failed to delete comment record: %w", err)
1572 }
1573 }
1574
1575 return nil
1576}
1577
1578// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1579func (i *Ingester) ingestPullComment(e *jmodels.Event) error {
1580 l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey)
1581 l.Info("ingesting record")
1582
1583 switch e.Commit.Operation {
1584 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1585 // no-op. sh.tangled.repo.pull.comment is deprecated
1586
1587 case jmodels.CommitOperationDelete:
1588 if err := db.PurgeComments(
1589 i.Db,
1590 orm.FilterEq("did", e.Did),
1591 orm.FilterEq("collection", e.Commit.Collection),
1592 orm.FilterEq("rkey", e.Commit.RKey),
1593 ); err != nil {
1594 return fmt.Errorf("failed to delete comment record: %w", err)
1595 }
1596 }
1597
1598 return nil
1599}
1600
1601func (i *Ingester) ingestComment(e *jmodels.Event) error {
1602 did := e.Did
1603 rkey := e.Commit.RKey
1604 cid := e.Commit.CID
1605
1606 var err error
1607
1608 l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1609 l.Info("ingesting record")
1610
1611 ctx := context.Background()
1612
1613 switch e.Commit.Operation {
1614 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1615 raw := json.RawMessage(e.Commit.Record)
1616 record := tangled.FeedComment{}
1617 err = json.Unmarshal(raw, &record)
1618 if err != nil {
1619 return fmt.Errorf("invalid record: %w", err)
1620 }
1621
1622 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1623 if err != nil {
1624 return fmt.Errorf("failed to parse comment from record: %w", err)
1625 }
1626
1627 if err := comment.Validate(); err != nil {
1628 return fmt.Errorf("failed to validate comment: %w", err)
1629 }
1630
1631 var references []syntax.ATURI
1632 if comment.Body.Original != nil {
1633 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1634 }
1635
1636 tx, err := i.Db.Begin()
1637 if err != nil {
1638 return fmt.Errorf("failed to start transaction: %w", err)
1639 }
1640 defer tx.Rollback()
1641
1642 _, err = db.PutComment(tx, comment, references)
1643 if err != nil {
1644 return fmt.Errorf("failed to create comment: %w", err)
1645 }
1646
1647 if err := tx.Commit(); err != nil {
1648 return err
1649 }
1650
1651 case jmodels.CommitOperationDelete:
1652 if err := db.DeleteComments(
1653 i.Db,
1654 orm.FilterEq("did", did),
1655 orm.FilterEq("collection", e.Commit.Collection),
1656 orm.FilterEq("rkey", rkey),
1657 ); err != nil {
1658 return fmt.Errorf("failed to delete comment record: %w", err)
1659 }
1660
1661 return nil
1662 }
1663
1664 return nil
1665}
1666
1667func (i *Ingester) ingestReaction(e *jmodels.Event) error {
1668 did := e.Did
1669 rkey := e.Commit.RKey
1670
1671 l := i.Logger.With("handler", "ingestReaction", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1672 l.Info("ingesting record")
1673
1674 switch e.Commit.Operation {
1675 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1676 raw := json.RawMessage(e.Commit.Record)
1677 record := tangled.FeedReaction{}
1678 if err := json.Unmarshal(raw, &record); err != nil {
1679 return fmt.Errorf("invalid record: %w", err)
1680 }
1681
1682 subjectUri, err := syntax.ParseATURI(record.Subject)
1683 if err != nil {
1684 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err)
1685 }
1686 subjectUri = models.NormalizeReactionSubject(subjectUri)
1687
1688 kind, ok := models.ParseReactionKind(record.Reaction)
1689 if !ok {
1690 return fmt.Errorf("invalid reaction kind: %q", record.Reaction)
1691 }
1692
1693 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
1694 if parseErr != nil {
1695 created = time.Now()
1696 }
1697
1698 tx, err := i.Db.Begin()
1699 if err != nil {
1700 return fmt.Errorf("failed to start transaction: %w", err)
1701 }
1702 defer tx.Rollback()
1703
1704 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil {
1705 return fmt.Errorf("failed to clear existing reaction: %w", err)
1706 }
1707 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil {
1708 return fmt.Errorf("failed to add reaction: %w", err)
1709 }
1710
1711 return tx.Commit()
1712
1713 case jmodels.CommitOperationDelete:
1714 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil {
1715 return fmt.Errorf("failed to delete reaction record: %w", err)
1716 }
1717 }
1718
1719 return nil
1720}
1721
1722func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1723 did := e.Did
1724 rkey := e.Commit.RKey
1725
1726 var err error
1727
1728 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1729 l.Info("ingesting record")
1730
1731 switch e.Commit.Operation {
1732 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1733 raw := json.RawMessage(e.Commit.Record)
1734 record := tangled.LabelDefinition{}
1735 err = json.Unmarshal(raw, &record)
1736 if err != nil {
1737 return fmt.Errorf("invalid record: %w", err)
1738 }
1739
1740 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1741 if err != nil {
1742 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1743 }
1744
1745 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1746 return fmt.Errorf("failed to validate labeldef: %w", err)
1747 }
1748
1749 _, err = db.AddLabelDefinition(i.Db, def)
1750 if err != nil {
1751 return fmt.Errorf("failed to create labeldef: %w", err)
1752 }
1753
1754 return nil
1755
1756 case jmodels.CommitOperationDelete:
1757 if err := db.DeleteLabelDefinition(
1758 i.Db,
1759 orm.FilterEq("did", did),
1760 orm.FilterEq("rkey", rkey),
1761 ); err != nil {
1762 return fmt.Errorf("failed to delete labeldef record: %w", err)
1763 }
1764
1765 return nil
1766 }
1767
1768 return nil
1769}
1770
1771func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1772 did := e.Did
1773 rkey := e.Commit.RKey
1774
1775 var err error
1776
1777 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1778 l.Info("ingesting record")
1779
1780 switch e.Commit.Operation {
1781 case jmodels.CommitOperationCreate:
1782 raw := json.RawMessage(e.Commit.Record)
1783 record := tangled.LabelOp{}
1784 err = json.Unmarshal(raw, &record)
1785 if err != nil {
1786 return fmt.Errorf("invalid record: %w", err)
1787 }
1788
1789 subject := syntax.ATURI(record.Subject)
1790 collection := subject.Collection()
1791
1792 var repo *models.Repo
1793 switch collection {
1794 case tangled.RepoIssueNSID:
1795 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1796 if err != nil || len(i) != 1 {
1797 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1798 }
1799 repo = i[0].Repo
1800 case tangled.RepoPullNSID:
1801 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
1802 if err != nil || len(p) != 1 {
1803 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p))
1804 }
1805 repo = p[0].Repo
1806 default:
1807 return fmt.Errorf("unsupported label subject: %s", collection)
1808 }
1809
1810 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1811 if err != nil {
1812 return fmt.Errorf("failed to build label application ctx: %w", err)
1813 }
1814
1815 ops := models.LabelOpsFromRecord(did, rkey, record)
1816
1817 for _, o := range ops {
1818 def, ok := actx.Defs[o.OperandKey]
1819 if !ok {
1820 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1821 }
1822 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1823 return fmt.Errorf("failed to validate labelop: %w", err)
1824 }
1825 }
1826
1827 tx, err := i.Db.Begin()
1828 if err != nil {
1829 return err
1830 }
1831 defer tx.Rollback()
1832
1833 for _, o := range ops {
1834 _, err = db.AddLabelOp(tx, &o)
1835 if err != nil {
1836 return fmt.Errorf("failed to add labelop: %w", err)
1837 }
1838 }
1839
1840 if err = tx.Commit(); err != nil {
1841 return err
1842 }
1843 }
1844
1845 return nil
1846}