Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/mentions"
31 "tangled.org/core/appview/models"
32 "tangled.org/core/appview/notify"
33 "tangled.org/core/appview/repoverify"
34 "tangled.org/core/appview/serververify"
35 "tangled.org/core/appview/validator"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/orm"
38 "tangled.org/core/rbac"
39)
40
41type Ingester struct {
42 Ctx context.Context
43 Db *db.DB
44 Enforcer *rbac.Enforcer
45 IdResolver *idresolver.Resolver
46 Cache *cache.Cache
47 Config *config.Config
48 Logger *slog.Logger
49 Validator *validator.Validator
50 MentionsResolver *mentions.Resolver
51 Notifier notify.Notifier
52 Verifier repoverify.Verifier
53}
54
55type processFunc func(ctx context.Context, e *jmodels.Event) error
56
57func (i *Ingester) Ingest() processFunc {
58 return func(ctx context.Context, e *jmodels.Event) error {
59 var err error
60
61 l := i.Logger.With("kind", e.Kind)
62 switch e.Kind {
63 case jmodels.EventKindAccount:
64 // TODO: sync account state to db
65 if e.Account.Active {
66 break
67 }
68 // TODO: revoke sessions by DID
69 if *e.Account.Status == "deactivated" {
70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
71 }
72 case jmodels.EventKindIdentity:
73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
74 case jmodels.EventKindCommit:
75 l = l.With(
76 "nsid", e.Commit.Collection,
77 "did", e.Did,
78 "rkey", e.Commit.RKey,
79 "op", e.Commit.Operation,
80 )
81 switch e.Commit.Collection {
82 case tangled.GraphFollowNSID:
83 err = i.ingestFollow(e, l)
84 case tangled.GraphVouchNSID:
85 err = i.ingestVouch(ctx, e, l)
86 case tangled.FeedStarNSID:
87 err = i.ingestStar(ctx, e, l)
88 case tangled.FeedReactionNSID:
89 err = i.ingestReaction(e, l)
90 case tangled.PublicKeyNSID:
91 err = i.ingestPublicKey(e, l)
92 case tangled.RepoArtifactNSID:
93 err = i.ingestArtifact(ctx, e, l)
94 case tangled.ActorProfileNSID:
95 err = i.ingestProfile(ctx, e, l)
96 case tangled.SpindleMemberNSID:
97 err = i.ingestSpindleMember(ctx, e, l)
98 case tangled.SpindleNSID:
99 err = i.ingestSpindle(ctx, e, l)
100 case tangled.KnotMemberNSID:
101 err = i.ingestKnotMember(ctx, e, l)
102 case tangled.KnotNSID:
103 err = i.ingestKnot(ctx, e, l)
104 case tangled.StringNSID:
105 err = i.ingestString(e, l)
106 case tangled.RepoIssueNSID:
107 err = i.ingestIssue(ctx, e, l)
108 case tangled.RepoPullNSID:
109 err = i.ingestPull(ctx, e, l)
110 case tangled.FeedCommentNSID:
111 err = i.ingestComment(e, l)
112 case tangled.RepoIssueCommentNSID:
113 err = i.ingestIssueComment(e, l)
114 case tangled.RepoPullCommentNSID:
115 err = i.ingestPullComment(e, l)
116 case tangled.LabelDefinitionNSID:
117 err = i.ingestLabelDefinition(e, l)
118 case tangled.LabelOpNSID:
119 err = i.ingestLabelOp(e, l)
120 case tangled.RepoNSID:
121 err = i.ingestRepo(ctx, e, l)
122 }
123 }
124
125 if err != nil {
126 l.Warn("failed to ingest record, skipping", "err", err)
127 }
128
129 lastTimeUs := e.TimeUS + 1
130 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
131 l.Error("failed to save cursor", "err", saveErr)
132 }
133
134 return nil
135 }
136}
137
138func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
139 if strings.HasPrefix(ref, "did:") {
140 return db.GetRepoByDid(i.Db, ref)
141 }
142 return db.GetRepoByAtUri(i.Db, ref)
143}
144
145func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
146 var legacy struct {
147 Subject *string `json:"subject"`
148 SubjectDid *string `json:"subjectDid"`
149 }
150 if err := json.Unmarshal(raw, &legacy); err != nil {
151 return false, err
152 }
153
154 switch {
155 case legacy.SubjectDid != nil:
156 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
157 if err != nil {
158 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
159 return false, nil
160 }
161 star.SubjectType = models.StarSubjectRepo
162 star.Subject = repo.RepoDid
163 return true, nil
164
165 case legacy.Subject != nil:
166 uri, err := syntax.ParseATURI(*legacy.Subject)
167 if err != nil {
168 return false, fmt.Errorf("invalid old-format star subject: %w", err)
169 }
170 switch uri.Collection().String() {
171 case tangled.RepoNSID:
172 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
173 if err != nil {
174 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
175 return false, nil
176 }
177 star.SubjectType = models.StarSubjectRepo
178 star.Subject = repo.RepoDid
179 return true, nil
180 default:
181 star.SubjectType = models.StarSubjectString
182 star.Subject = *legacy.Subject
183 return true, nil
184 }
185
186 default:
187 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
188 }
189}
190
191func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
192 var err error
193 did := e.Did
194
195 l = l.With("handler", "ingestStar")
196
197 switch e.Commit.Operation {
198 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
199 raw := json.RawMessage(e.Commit.Record)
200 record := tangled.FeedStar{}
201 unmarshalErr := json.Unmarshal(raw, &record)
202
203 star := &models.Star{
204 Did: did,
205 Rkey: e.Commit.RKey,
206 }
207
208 switch {
209 case unmarshalErr != nil:
210 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
211 if resolveErr != nil {
212 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
213 return unmarshalErr
214 }
215 if !resolved {
216 return nil
217 }
218
219 case record.Subject == nil:
220 return fmt.Errorf("star record has nil subject")
221
222 case record.Subject.FeedStar_Repo != nil:
223 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
224 if repoErr != nil {
225 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
226 return nil
227 }
228 star.SubjectType = models.StarSubjectRepo
229 star.Subject = repo.RepoDid
230
231 case record.Subject.FeedStar_String != nil:
232 star.SubjectType = models.StarSubjectString
233 star.Subject = record.Subject.FeedStar_String.Uri
234
235 default:
236 return fmt.Errorf("star record has empty subject union")
237 }
238
239 err = db.AddStar(i.Db, star)
240 case jmodels.CommitOperationDelete:
241 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
242 }
243
244 if err != nil {
245 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
246 }
247
248 l.Info("ingested record")
249 return nil
250}
251
252func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error {
253 var err error
254 did := e.Did
255
256 l = l.With("handler", "ingestFollow")
257
258 switch e.Commit.Operation {
259 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
260 raw := json.RawMessage(e.Commit.Record)
261 record := tangled.GraphFollow{}
262 err = json.Unmarshal(raw, &record)
263 if err != nil {
264 l.Error("invalid record", "err", err)
265 return err
266 }
267
268 err = db.AddFollow(i.Db, &models.Follow{
269 UserDid: did,
270 SubjectDid: record.Subject,
271 Rkey: e.Commit.RKey,
272 })
273 case jmodels.CommitOperationDelete:
274 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
275 }
276
277 if err != nil {
278 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
279 }
280
281 l.Info("ingested record")
282 return nil
283}
284
285func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
286 var err error
287 did := e.Did
288
289 l = l.With("handler", "ingestVouch")
290
291 switch e.Commit.Operation {
292 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
293 raw := json.RawMessage(e.Commit.Record)
294 record := tangled.GraphVouch{}
295 err = json.Unmarshal(raw, &record)
296 if err != nil {
297 l.Error("invalid record", "err", err)
298 return err
299 }
300
301 // rkey is the subject_did being vouched for/denounced
302 subjectDID := e.Commit.RKey
303
304 _, err = syntax.ParseDID(subjectDID)
305 if err != nil {
306 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
307 return fmt.Errorf("invalid subject_did: %w", err)
308 }
309
310 if did == subjectDID {
311 l.Warn("attempted self-vouch", "did", did)
312 return fmt.Errorf("cannot vouch for self")
313 }
314
315 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
316 if err != nil {
317 return err
318 }
319
320 if subjectId.Handle.IsInvalidHandle() {
321 return err
322 }
323
324 kind, err := models.ParseVouchKind(record.Kind)
325 if err != nil {
326 l.Error("invalid kind", "kind", kind)
327 return fmt.Errorf("invalid kind: %s", kind)
328 }
329
330 recordCid, err := cid.Parse(e.Commit.CID)
331 if err != nil {
332 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
333 return fmt.Errorf("invalid cid: %w", err)
334 }
335
336 var evidences []syntax.ATURI
337 for _, raw := range record.Evidences {
338 uri, parseErr := syntax.ParseATURI(raw)
339 if parseErr != nil {
340 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
341 continue
342 }
343 evidences = append(evidences, uri)
344 }
345
346 tx, txErr := i.Db.Begin()
347 if txErr != nil {
348 return fmt.Errorf("failed to start transaction: %w", txErr)
349 }
350
351 addErr := db.AddVouch(tx, &models.Vouch{
352 Did: syntax.DID(did),
353 SubjectDid: subjectId.DID,
354 Cid: recordCid,
355 Kind: kind,
356 Reason: record.Reason,
357 Evidences: evidences,
358 })
359 if addErr != nil {
360 tx.Rollback()
361 err = addErr
362 } else {
363 err = tx.Commit()
364 }
365
366 case jmodels.CommitOperationDelete:
367 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
368 }
369
370 if err != nil {
371 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
372 }
373
374 l.Info("ingested record")
375 return nil
376}
377
378func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error {
379 did := e.Did
380 var err error
381
382 l = l.With("handler", "ingestPublicKey")
383
384 switch e.Commit.Operation {
385 case jmodels.CommitOperationCreate:
386 l.Debug("processing add of pubkey")
387 raw := json.RawMessage(e.Commit.Record)
388 record := tangled.PublicKey{}
389 err = json.Unmarshal(raw, &record)
390 if err != nil {
391 l.Error("invalid record", "err", err)
392 return err
393 }
394
395 name := record.Name
396 key := record.Key
397 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
398 case jmodels.CommitOperationUpdate:
399 l.Debug("processing update of pubkey")
400 raw := json.RawMessage(e.Commit.Record)
401 record := tangled.PublicKey{}
402 err = json.Unmarshal(raw, &record)
403 if err != nil {
404 l.Error("invalid record", "err", err)
405 return err
406 }
407
408 name := record.Name
409 key := record.Key
410 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
411 case jmodels.CommitOperationDelete:
412 l.Debug("processing delete of pubkey")
413 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
414 }
415
416 if err != nil {
417 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
418 }
419
420 l.Info("ingested record")
421 return nil
422}
423
424func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
425 did := e.Did
426 var err error
427
428 l = l.With("handler", "ingestArtifact")
429
430 switch e.Commit.Operation {
431 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
432 raw := json.RawMessage(e.Commit.Record)
433 record := tangled.RepoArtifact{}
434 err = json.Unmarshal(raw, &record)
435 if err != nil {
436 l.Error("invalid record", "err", err)
437 return err
438 }
439
440 var repo *models.Repo
441 if record.RepoDid != nil && *record.RepoDid != "" {
442 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
443 if err != nil && !errors.Is(err, sql.ErrNoRows) {
444 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
445 }
446 }
447 if repo == nil && record.Repo != nil {
448 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
449 if parseErr != nil {
450 return parseErr
451 }
452 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
453 if err != nil {
454 return err
455 }
456 }
457 if repo == nil {
458 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
459 }
460
461 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
462 if err != nil || !ok {
463 return err
464 }
465
466 repoDid := repo.RepoDid
467 if repoDid == "" && record.RepoDid != nil {
468 repoDid = *record.RepoDid
469 }
470 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
471 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
472 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
473 }
474 }
475
476 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
477 if err != nil {
478 createdAt = time.Now()
479 }
480
481 artifact := models.Artifact{
482 Did: did,
483 Rkey: e.Commit.RKey,
484 RepoDid: syntax.DID(repo.RepoDid),
485 Tag: plumbing.Hash(record.Tag),
486 CreatedAt: createdAt,
487 BlobCid: cid.Cid(record.Artifact.Ref),
488 Name: record.Name,
489 Size: uint64(record.Artifact.Size),
490 MimeType: record.Artifact.MimeType,
491 }
492
493 err = db.AddArtifact(i.Db, artifact)
494 case jmodels.CommitOperationDelete:
495 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
496 }
497
498 if err != nil {
499 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
500 }
501
502 l.Info("ingested record")
503 return nil
504}
505
506func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
507 did := e.Did
508 var err error
509
510 l = l.With("handler", "ingestProfile")
511
512 if e.Commit.RKey != "self" {
513 return fmt.Errorf("ingestProfile only ingests `self` record")
514 }
515
516 switch e.Commit.Operation {
517 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
518 raw := json.RawMessage(e.Commit.Record)
519 record := tangled.ActorProfile{}
520 err = json.Unmarshal(raw, &record)
521 if err != nil {
522 l.Error("invalid record", "err", err)
523 return err
524 }
525
526 avatar := ""
527 if record.Avatar != nil {
528 avatar = record.Avatar.Ref.String()
529 }
530
531 description := ""
532 if record.Description != nil {
533 description = *record.Description
534 }
535
536 includeBluesky := record.Bluesky
537
538 pronouns := ""
539 if record.Pronouns != nil {
540 pronouns = *record.Pronouns
541 }
542
543 location := ""
544 if record.Location != nil {
545 location = *record.Location
546 }
547
548 var links [5]string
549 for i, l := range record.Links {
550 if i < 5 {
551 links[i] = l
552 }
553 }
554
555 var stats [2]models.VanityStat
556 for i, s := range record.Stats {
557 if i < 2 {
558 stats[i].Kind = models.ParseVanityStatKind(s)
559 }
560 }
561
562 var pinned [6]string
563 for i, r := range record.PinnedRepositories {
564 if i < 6 {
565 pinned[i] = r
566 }
567 }
568
569 var preferredHandle syntax.Handle
570 if record.PreferredHandle != nil {
571 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
572 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
573 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
574 preferredHandle = h
575 }
576 }
577 }
578
579 profile := models.Profile{
580 Did: did,
581 Avatar: avatar,
582 Description: description,
583 IncludeBluesky: includeBluesky,
584 Location: location,
585 Links: links,
586 Stats: stats,
587 PinnedRepos: pinned,
588 Pronouns: pronouns,
589 PreferredHandle: preferredHandle,
590 }
591
592 tx, err := i.Db.Begin()
593 if err != nil {
594 return fmt.Errorf("failed to start transaction: %w", err)
595 }
596
597 err = db.ValidateProfile(tx, &profile)
598 if err != nil {
599 return fmt.Errorf("invalid profile record")
600 }
601
602 err = db.UpsertProfile(tx, &profile)
603 if err == nil && i.Cache != nil {
604 pipe := i.Cache.Pipeline()
605 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
606 if preferredHandle != "" {
607 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
608 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
609 } else {
610 pipe.Del(ctx, didKey)
611 }
612 if _, execErr := pipe.Exec(ctx); execErr != nil {
613 l.Warn("failed to update preferred handle cache", "err", execErr)
614 }
615 }
616 case jmodels.CommitOperationDelete:
617 tx, beginErr := i.Db.Begin()
618 if beginErr != nil {
619 return fmt.Errorf("failed to start transaction: %w", beginErr)
620 }
621
622 priorHandle, phErr := db.GetPreferredHandle(tx, did)
623 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
624 l.Warn("failed to read prior preferred handle", "err", phErr)
625 }
626
627 err = db.DeleteProfile(tx, did)
628 if err == nil && i.Cache != nil {
629 pipe := i.Cache.Pipeline()
630 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
631 if priorHandle != "" {
632 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
633 }
634 if _, execErr := pipe.Exec(ctx); execErr != nil {
635 l.Warn("failed to evict preferred handle cache", "err", execErr)
636 }
637 }
638 }
639
640 if err != nil {
641 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
642 }
643
644 l.Info("ingested record")
645 return nil
646}
647
648func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
649 did := e.Did
650 var err error
651
652 l = l.With("handler", "ingestSpindleMember")
653
654 switch e.Commit.Operation {
655 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
656 raw := json.RawMessage(e.Commit.Record)
657 record := tangled.SpindleMember{}
658 err = json.Unmarshal(raw, &record)
659 if err != nil {
660 l.Error("invalid record", "err", err)
661 return err
662 }
663
664 // only spindle owner can invite to spindles
665 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
666 if err != nil {
667 return fmt.Errorf("failed to check invite permission: %w", err)
668 }
669 if !ok {
670 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
671 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
672 }
673 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
674 if err != nil {
675 return fmt.Errorf("failed to re-check invite permission: %w", err)
676 }
677 if !ok {
678 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
679 }
680 }
681
682 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
683 if err != nil {
684 return err
685 }
686
687 if memberId.Handle.IsInvalidHandle() {
688 return fmt.Errorf("invalid handle for member %s", record.Subject)
689 }
690
691 existing, err := db.GetSpindleMembers(i.Db,
692 orm.FilterEq("did", did),
693 orm.FilterEq("rkey", e.Commit.RKey),
694 )
695 if err != nil {
696 return fmt.Errorf("failed to look up existing member: %w", err)
697 }
698 if len(existing) > 1 {
699 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
700 }
701
702 tx, err := i.Db.Begin()
703 if err != nil {
704 return fmt.Errorf("failed to start txn: %w", err)
705 }
706 committed := false
707 defer func() {
708 if committed {
709 return
710 }
711 tx.Rollback()
712 i.Enforcer.E.LoadPolicy()
713 }()
714
715 if len(existing) == 1 {
716 prev := existing[0]
717 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
718 if err = db.RemoveSpindleMember(tx,
719 orm.FilterEq("did", did),
720 orm.FilterEq("rkey", e.Commit.RKey),
721 ); err != nil {
722 return fmt.Errorf("failed to remove stale row: %w", err)
723 }
724 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
725 return fmt.Errorf("failed to remove stale ACL: %w", err)
726 }
727 }
728 }
729
730 if err = db.AddSpindleMember(tx, models.SpindleMember{
731 Did: syntax.DID(did),
732 Rkey: e.Commit.RKey,
733 Instance: record.Instance,
734 Subject: memberId.DID,
735 }); err != nil {
736 return fmt.Errorf("failed to add to db: %w", err)
737 }
738
739 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
740 return fmt.Errorf("failed to update ACLs: %w", err)
741 }
742
743 if err = tx.Commit(); err != nil {
744 return fmt.Errorf("failed to commit txn: %w", err)
745 }
746
747 if err = i.Enforcer.E.SavePolicy(); err != nil {
748 return fmt.Errorf("failed to save ACLs: %w", err)
749 }
750 committed = true
751
752 l.Info("upserted spindle member")
753 case jmodels.CommitOperationDelete:
754 rkey := e.Commit.RKey
755
756 // get record from db first
757 members, err := db.GetSpindleMembers(
758 i.Db,
759 orm.FilterEq("did", did),
760 orm.FilterEq("rkey", rkey),
761 )
762 if err != nil || len(members) != 1 {
763 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
764 }
765 member := members[0]
766
767 tx, err := i.Db.Begin()
768 if err != nil {
769 return fmt.Errorf("failed to start txn: %w", err)
770 }
771 committed := false
772 defer func() {
773 if committed {
774 return
775 }
776 tx.Rollback()
777 i.Enforcer.E.LoadPolicy()
778 }()
779
780 // remove record by rkey && update enforcer
781 if err = db.RemoveSpindleMember(
782 tx,
783 orm.FilterEq("did", did),
784 orm.FilterEq("rkey", rkey),
785 ); err != nil {
786 return fmt.Errorf("failed to remove from db: %w", err)
787 }
788
789 // update enforcer
790 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
791 if err != nil {
792 return fmt.Errorf("failed to update ACLs: %w", err)
793 }
794
795 if err = tx.Commit(); err != nil {
796 return fmt.Errorf("failed to commit txn: %w", err)
797 }
798
799 if err = i.Enforcer.E.SavePolicy(); err != nil {
800 return fmt.Errorf("failed to save ACLs: %w", err)
801 }
802 committed = true
803
804 l.Info("removed spindle member")
805 }
806
807 return nil
808}
809
810func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
811 did := e.Did
812 var err error
813
814 l = l.With("handler", "ingestSpindle")
815
816 switch e.Commit.Operation {
817 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
818 raw := json.RawMessage(e.Commit.Record)
819 record := tangled.Spindle{}
820 err = json.Unmarshal(raw, &record)
821 if err != nil {
822 l.Error("invalid record", "err", err)
823 return err
824 }
825
826 instance := e.Commit.RKey
827
828 err := db.AddSpindle(i.Db, models.Spindle{
829 Owner: syntax.DID(did),
830 Instance: instance,
831 })
832 if err != nil {
833 l.Error("failed to add spindle to db", "err", err, "instance", instance)
834 return err
835 }
836
837 if err := i.verifySpindle(ctx, instance, did); err != nil {
838 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
839 }
840
841 l.Info("ingested record", "instance", instance)
842 return nil
843
844 case jmodels.CommitOperationDelete:
845 instance := e.Commit.RKey
846
847 // get record from db first
848 spindles, err := db.GetSpindles(
849 ctx,
850 i.Db,
851 orm.FilterEq("owner", did),
852 orm.FilterEq("instance", instance),
853 )
854 if err != nil || len(spindles) != 1 {
855 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
856 }
857 spindle := spindles[0]
858
859 tx, err := i.Db.Begin()
860 if err != nil {
861 return fmt.Errorf("failed to start txn: %w", err)
862 }
863 defer func() {
864 tx.Rollback()
865 i.Enforcer.E.LoadPolicy()
866 }()
867
868 // remove spindle members first
869 err = db.RemoveSpindleMember(
870 tx,
871 orm.FilterEq("owner", did),
872 orm.FilterEq("instance", instance),
873 )
874 if err != nil {
875 return fmt.Errorf("failed to remove spindle members: %w", err)
876 }
877
878 err = db.DeleteSpindle(
879 tx,
880 orm.FilterEq("owner", did),
881 orm.FilterEq("instance", instance),
882 )
883 if err != nil {
884 return fmt.Errorf("failed to delete spindle: %w", err)
885 }
886
887 if spindle.Verified != nil {
888 err = i.Enforcer.RemoveSpindle(instance)
889 if err != nil {
890 return fmt.Errorf("failed to remove spindle from enforcer: %w", err)
891 }
892 }
893
894 err = tx.Commit()
895 if err != nil {
896 return fmt.Errorf("failed to commit txn: %w", err)
897 }
898
899 err = i.Enforcer.E.SavePolicy()
900 if err != nil {
901 return fmt.Errorf("failed to save ACLs: %w", err)
902 }
903
904 l.Info("ingested record", "instance", instance)
905 }
906
907 return nil
908}
909
910func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error {
911 did := e.Did
912 rkey := e.Commit.RKey
913
914 var err error
915
916 l = l.With("handler", "ingestString")
917
918 switch e.Commit.Operation {
919 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
920 raw := json.RawMessage(e.Commit.Record)
921 record := tangled.String{}
922 err = json.Unmarshal(raw, &record)
923 if err != nil {
924 l.Error("invalid record", "err", err)
925 return err
926 }
927
928 string := models.StringFromRecord(did, rkey, record)
929
930 if err = i.Validator.ValidateString(&string); err != nil {
931 l.Error("invalid record", "err", err)
932 return err
933 }
934
935 if err = db.AddString(i.Db, string); err != nil {
936 l.Error("failed to add string", "err", err)
937 return err
938 }
939
940 l.Info("ingested record")
941 return nil
942
943 case jmodels.CommitOperationDelete:
944 if err := db.DeleteString(
945 i.Db,
946 orm.FilterEq("did", did),
947 orm.FilterEq("rkey", rkey),
948 ); err != nil {
949 l.Error("failed to delete", "err", err)
950 return fmt.Errorf("failed to delete string record: %w", err)
951 }
952
953 l.Info("ingested record")
954 return nil
955 }
956
957 return nil
958}
959
960func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
961 did := e.Did
962 var err error
963
964 l = l.With("handler", "ingestKnotMember")
965
966 switch e.Commit.Operation {
967 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
968 raw := json.RawMessage(e.Commit.Record)
969 record := tangled.KnotMember{}
970 err = json.Unmarshal(raw, &record)
971 if err != nil {
972 l.Error("invalid record", "err", err)
973 return err
974 }
975
976 // only knot owner can invite to knots
977 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
978 if err != nil {
979 return fmt.Errorf("failed to check invite permission: %w", err)
980 }
981 if !ok {
982 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
983 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
984 }
985 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
986 if err != nil {
987 return fmt.Errorf("failed to re-check invite permission: %w", err)
988 }
989 if !ok {
990 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
991 }
992 }
993
994 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
995 if err != nil {
996 return err
997 }
998
999 if memberId.Handle.IsInvalidHandle() {
1000 return fmt.Errorf("invalid handle for member %s", record.Subject)
1001 }
1002
1003 existing, err := db.GetKnotMembers(i.Db,
1004 orm.FilterEq("did", did),
1005 orm.FilterEq("rkey", e.Commit.RKey),
1006 )
1007 if err != nil {
1008 return fmt.Errorf("failed to look up existing member: %w", err)
1009 }
1010 if len(existing) > 1 {
1011 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1012 }
1013
1014 tx, err := i.Db.Begin()
1015 if err != nil {
1016 return fmt.Errorf("failed to start txn: %w", err)
1017 }
1018 committed := false
1019 defer func() {
1020 if committed {
1021 return
1022 }
1023 tx.Rollback()
1024 i.Enforcer.E.LoadPolicy()
1025 }()
1026
1027 if len(existing) == 1 {
1028 prev := existing[0]
1029 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1030 if err = db.RemoveKnotMember(tx,
1031 orm.FilterEq("did", did),
1032 orm.FilterEq("rkey", e.Commit.RKey),
1033 ); err != nil {
1034 return fmt.Errorf("failed to remove stale row: %w", err)
1035 }
1036 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1037 return fmt.Errorf("failed to remove stale ACL: %w", err)
1038 }
1039 }
1040 }
1041
1042 if err = db.AddKnotMember(tx, models.KnotMember{
1043 Did: syntax.DID(did),
1044 Rkey: e.Commit.RKey,
1045 Domain: record.Domain,
1046 Subject: memberId.DID,
1047 }); err != nil {
1048 return fmt.Errorf("failed to add to db: %w", err)
1049 }
1050
1051 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1052 return fmt.Errorf("failed to update ACLs: %w", err)
1053 }
1054
1055 if err = tx.Commit(); err != nil {
1056 return fmt.Errorf("failed to commit txn: %w", err)
1057 }
1058
1059 if err = i.Enforcer.E.SavePolicy(); err != nil {
1060 return fmt.Errorf("failed to save ACLs: %w", err)
1061 }
1062 committed = true
1063
1064 l.Info("upserted knot member")
1065 case jmodels.CommitOperationDelete:
1066 rkey := e.Commit.RKey
1067
1068 members, err := db.GetKnotMembers(
1069 i.Db,
1070 orm.FilterEq("did", did),
1071 orm.FilterEq("rkey", rkey),
1072 )
1073 if err != nil {
1074 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1075 }
1076 if len(members) == 0 {
1077 l.Info("knot member already removed", "rkey", rkey)
1078 return nil
1079 }
1080 if len(members) > 1 {
1081 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1082 }
1083 member := members[0]
1084
1085 tx, err := i.Db.Begin()
1086 if err != nil {
1087 return fmt.Errorf("failed to start txn: %w", err)
1088 }
1089 committed := false
1090 defer func() {
1091 if committed {
1092 return
1093 }
1094 tx.Rollback()
1095 i.Enforcer.E.LoadPolicy()
1096 }()
1097
1098 if err = db.RemoveKnotMember(
1099 tx,
1100 orm.FilterEq("did", did),
1101 orm.FilterEq("rkey", rkey),
1102 ); err != nil {
1103 return fmt.Errorf("failed to remove from db: %w", err)
1104 }
1105
1106 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1107 return fmt.Errorf("failed to update ACLs: %w", err)
1108 }
1109
1110 if err = tx.Commit(); err != nil {
1111 return fmt.Errorf("failed to commit txn: %w", err)
1112 }
1113
1114 if err = i.Enforcer.E.SavePolicy(); err != nil {
1115 return fmt.Errorf("failed to save ACLs: %w", err)
1116 }
1117 committed = true
1118
1119 l.Info("removed knot member")
1120 }
1121
1122 return nil
1123}
1124
1125func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1126 did := e.Did
1127 var err error
1128
1129 l = l.With("handler", "ingestKnot")
1130
1131 switch e.Commit.Operation {
1132 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1133 raw := json.RawMessage(e.Commit.Record)
1134 record := tangled.Knot{}
1135 err = json.Unmarshal(raw, &record)
1136 if err != nil {
1137 l.Error("invalid record", "err", err)
1138 return err
1139 }
1140
1141 domain := e.Commit.RKey
1142
1143 err := db.AddKnot(i.Db, domain, did)
1144 if err != nil {
1145 l.Error("failed to add knot to db", "err", err, "domain", domain)
1146 return err
1147 }
1148
1149 if err := i.verifyKnot(ctx, domain, did); err != nil {
1150 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1151 }
1152
1153 l.Info("ingested record", "domain", domain)
1154 return nil
1155
1156 case jmodels.CommitOperationDelete:
1157 domain := e.Commit.RKey
1158
1159 // get record from db first
1160 registrations, err := db.GetRegistrations(
1161 i.Db,
1162 orm.FilterEq("domain", domain),
1163 orm.FilterEq("did", did),
1164 )
1165 if err != nil {
1166 return fmt.Errorf("failed to get registration: %w", err)
1167 }
1168 if len(registrations) != 1 {
1169 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1170 }
1171 registration := registrations[0]
1172
1173 tx, err := i.Db.Begin()
1174 if err != nil {
1175 return fmt.Errorf("failed to start txn: %w", err)
1176 }
1177 defer func() {
1178 tx.Rollback()
1179 i.Enforcer.E.LoadPolicy()
1180 }()
1181
1182 err = db.RemoveKnotMember(
1183 tx,
1184 orm.FilterEq("did", did),
1185 orm.FilterEq("domain", domain),
1186 )
1187 if err != nil {
1188 return fmt.Errorf("failed to remove knot members: %w", err)
1189 }
1190
1191 err = db.DeleteKnot(
1192 tx,
1193 orm.FilterEq("did", did),
1194 orm.FilterEq("domain", domain),
1195 )
1196 if err != nil {
1197 return fmt.Errorf("failed to delete knot: %w", err)
1198 }
1199
1200 err = db.RemoveReposByKnot(tx, domain)
1201 if err != nil {
1202 return fmt.Errorf("failed to remove repos by knot: %w", err)
1203 }
1204
1205 if registration.Registered != nil {
1206 err = i.Enforcer.RemoveKnot(domain)
1207 if err != nil {
1208 return fmt.Errorf("failed to remove knot from enforcer: %w", err)
1209 }
1210 }
1211
1212 err = tx.Commit()
1213 if err != nil {
1214 return fmt.Errorf("failed to commit txn: %w", err)
1215 }
1216
1217 err = i.Enforcer.E.SavePolicy()
1218 if err != nil {
1219 return fmt.Errorf("failed to save ACLs: %w", err)
1220 }
1221
1222 l.Info("ingested record", "domain", domain)
1223 }
1224
1225 return nil
1226}
1227
1228const (
1229 verifyAttempts = 4
1230 verifyMinDelay = 1 * time.Second
1231 verifyMaxDelay = 5 * time.Second
1232)
1233
1234func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1235 regs, err := db.GetRegistrations(i.Db,
1236 orm.FilterEq("domain", domain),
1237 orm.FilterEq("did", did),
1238 )
1239 if err != nil {
1240 return fmt.Errorf("look up registration: %w", err)
1241 }
1242 if len(regs) != 1 {
1243 return fmt.Errorf("no registration for %s by %s", domain, did)
1244 }
1245 if regs[0].Registered != nil {
1246 return nil
1247 }
1248
1249 err = retry.Do(
1250 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1251 retry.Context(ctx),
1252 retry.Attempts(verifyAttempts),
1253 retry.Delay(verifyMinDelay),
1254 retry.MaxDelay(verifyMaxDelay),
1255 retry.DelayType(retry.BackOffDelay),
1256 retry.LastErrorOnly(true),
1257 )
1258 if err != nil {
1259 return fmt.Errorf("verify: %w", err)
1260 }
1261 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1262}
1263
1264func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1265 spindles, err := db.GetSpindles(ctx, i.Db,
1266 orm.FilterEq("instance", instance),
1267 orm.FilterEq("owner", did),
1268 )
1269 if err != nil {
1270 return fmt.Errorf("look up spindle: %w", err)
1271 }
1272 if len(spindles) != 1 {
1273 return fmt.Errorf("no spindle for %s by %s", instance, did)
1274 }
1275 if spindles[0].Verified != nil {
1276 return nil
1277 }
1278
1279 err = retry.Do(
1280 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1281 retry.Context(ctx),
1282 retry.Attempts(verifyAttempts),
1283 retry.Delay(verifyMinDelay),
1284 retry.MaxDelay(verifyMaxDelay),
1285 retry.DelayType(retry.BackOffDelay),
1286 retry.LastErrorOnly(true),
1287 )
1288 if err != nil {
1289 return fmt.Errorf("verify: %w", err)
1290 }
1291 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1292 return err
1293}
1294
1295const sweepConcurrency = 4
1296
1297func (i *Ingester) SweepPendingVerifications() {
1298 l := i.Logger.With("handler", "SweepPendingVerifications")
1299
1300 var g errgroup.Group
1301 g.SetLimit(sweepConcurrency)
1302
1303 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1304 if err != nil {
1305 l.Error("failed to list unverified knots", "err", err)
1306 } else {
1307 for _, reg := range regs {
1308 g.Go(func() error {
1309 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1310 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1311 }
1312 return nil
1313 })
1314 }
1315 }
1316
1317 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1318 if err != nil {
1319 l.Error("failed to list unverified spindles", "err", err)
1320 g.Wait()
1321 return
1322 }
1323 for _, s := range spindles {
1324 g.Go(func() error {
1325 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1326 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1327 }
1328 return nil
1329 })
1330 }
1331 g.Wait()
1332}
1333
1334func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1335 did := e.Did
1336 rkey := e.Commit.RKey
1337
1338 var err error
1339
1340 l = l.With("handler", "ingestIssue")
1341
1342 switch e.Commit.Operation {
1343 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1344 raw := json.RawMessage(e.Commit.Record)
1345 record := tangled.RepoIssue{}
1346 err = json.Unmarshal(raw, &record)
1347 if err != nil {
1348 l.Error("invalid record", "err", err)
1349 return err
1350 }
1351
1352 issue := models.IssueFromRecord(did, rkey, record)
1353
1354 if issue.RepoDid == "" {
1355 return fmt.Errorf("issue record has no repo field")
1356 }
1357 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1358 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1359 }
1360
1361 if err := i.Validator.ValidateIssue(&issue); err != nil {
1362 return fmt.Errorf("failed to validate issue: %w", err)
1363 }
1364
1365 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1366 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1367 if repoErr == nil && repo.RepoDid != "" {
1368 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1369 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1370 }
1371 }
1372 }
1373
1374 tx, err := i.Db.BeginTx(ctx, nil)
1375 if err != nil {
1376 l.Error("failed to begin transaction", "err", err)
1377 return err
1378 }
1379 defer tx.Rollback()
1380
1381 err = db.PutIssue(tx, &issue)
1382 if err != nil {
1383 l.Error("failed to create issue", "err", err)
1384 return err
1385 }
1386
1387 err = tx.Commit()
1388 if err != nil {
1389 l.Error("failed to commit txn", "err", err)
1390 return err
1391 }
1392
1393 l.Info("ingested record")
1394 return nil
1395
1396 case jmodels.CommitOperationDelete:
1397 tx, err := i.Db.BeginTx(ctx, nil)
1398 if err != nil {
1399 l.Error("failed to begin transaction", "err", err)
1400 return err
1401 }
1402 defer tx.Rollback()
1403
1404 if err := db.DeleteIssues(
1405 tx,
1406 did,
1407 rkey,
1408 ); err != nil {
1409 l.Error("failed to delete", "err", err)
1410 return fmt.Errorf("failed to delete issue record: %w", err)
1411 }
1412 if err := tx.Commit(); err != nil {
1413 l.Error("failed to commit txn", "err", err)
1414 return err
1415 }
1416
1417 l.Info("ingested record")
1418 return nil
1419 }
1420
1421 return nil
1422}
1423
1424func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1425 did := e.Did
1426 rkey := e.Commit.RKey
1427
1428 var err error
1429
1430 l = l.With("handler", "ingestPull")
1431
1432 switch e.Commit.Operation {
1433 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1434 raw := json.RawMessage(e.Commit.Record)
1435 record := tangled.RepoPull{}
1436 err = json.Unmarshal(raw, &record)
1437 if err != nil {
1438 l.Error("invalid record", "err", err)
1439 return err
1440 }
1441
1442 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1443 if err != nil {
1444 l.Error("failed to resolve did", "err", err)
1445 return err
1446 }
1447
1448 // go through and fetch all blobs in parallel
1449 readers := make([]*io.ReadCloser, len(record.Rounds))
1450 var mu sync.Mutex
1451
1452 g, gctx := errgroup.WithContext(ctx)
1453
1454 for idx, b := range record.Rounds {
1455 g.Go(func() error {
1456 // for some reason, a blob is empty
1457 if b.PatchBlob == nil {
1458 return fmt.Errorf("missing patchBlob in round %d", idx)
1459 }
1460
1461 ownerPds := ownerId.PDSEndpoint()
1462 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1463 q := url.Query()
1464 q.Set("cid", b.PatchBlob.Ref.String())
1465 q.Set("did", did)
1466 url.RawQuery = q.Encode()
1467
1468 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1469 if err != nil {
1470 l.Error("failed to create request")
1471 return err
1472 }
1473 req.Header.Set("Content-Type", "application/json")
1474
1475 resp, err := http.DefaultClient.Do(req)
1476 if err != nil {
1477 l.Error("failed to make request")
1478 return err
1479 }
1480
1481 mu.Lock()
1482 readers[idx] = &resp.Body
1483 mu.Unlock()
1484
1485 return nil
1486 })
1487 }
1488
1489 if err := g.Wait(); err != nil {
1490 for _, r := range readers {
1491 if r != nil && *r != nil {
1492 (*r).Close()
1493 }
1494 }
1495 return err
1496 }
1497
1498 defer func() {
1499 for _, r := range readers {
1500 if r != nil && *r != nil {
1501 (*r).Close()
1502 }
1503 }
1504 }()
1505
1506 pull, err := models.PullFromRecord(did, rkey, record, readers)
1507 if err != nil {
1508 return fmt.Errorf("failed to parse pull from record: %w", err)
1509 }
1510 if err := i.Validator.ValidatePull(pull); err != nil {
1511 return fmt.Errorf("failed to validate pull: %w", err)
1512 }
1513
1514 tx, err := i.Db.BeginTx(ctx, nil)
1515 if err != nil {
1516 l.Error("failed to begin transaction", "err", err)
1517 return err
1518 }
1519 defer tx.Rollback()
1520
1521 err = db.PutPull(tx, pull)
1522 if err != nil {
1523 l.Error("failed to create pull", "err", err)
1524 return err
1525 }
1526
1527 err = tx.Commit()
1528 if err != nil {
1529 l.Error("failed to commit txn", "err", err)
1530 return err
1531 }
1532
1533 l.Info("ingested record")
1534 return nil
1535
1536 case jmodels.CommitOperationDelete:
1537 tx, err := i.Db.BeginTx(ctx, nil)
1538 if err != nil {
1539 l.Error("failed to begin transaction", "err", err)
1540 return err
1541 }
1542 defer tx.Rollback()
1543
1544 if err := db.AbandonPulls(
1545 tx,
1546 orm.FilterEq("owner_did", did),
1547 orm.FilterEq("rkey", rkey),
1548 ); err != nil {
1549 l.Error("failed to abandon", "err", err)
1550 return fmt.Errorf("failed to abandon pull record: %w", err)
1551 }
1552 if err := tx.Commit(); err != nil {
1553 l.Error("failed to commit txn", "err", err)
1554 return err
1555 }
1556
1557 l.Info("ingested record")
1558 return nil
1559 }
1560
1561 return nil
1562}
1563
1564// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1565func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error {
1566 l = l.With("handler", "ingestIssueComment")
1567
1568 switch e.Commit.Operation {
1569 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1570 // no-op. sh.tangled.repo.issue.comment is deprecated
1571
1572 case jmodels.CommitOperationDelete:
1573 if err := db.PurgeComments(
1574 i.Db,
1575 orm.FilterEq("did", e.Did),
1576 orm.FilterEq("collection", e.Commit.Collection),
1577 orm.FilterEq("rkey", e.Commit.RKey),
1578 ); err != nil {
1579 return fmt.Errorf("failed to delete comment record: %w", err)
1580 }
1581 }
1582
1583 l.Info("ingested record")
1584 return nil
1585}
1586
1587// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1588func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error {
1589 l = l.With("handler", "ingestPullComment")
1590
1591 switch e.Commit.Operation {
1592 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1593 // no-op. sh.tangled.repo.pull.comment is deprecated
1594
1595 case jmodels.CommitOperationDelete:
1596 if err := db.PurgeComments(
1597 i.Db,
1598 orm.FilterEq("did", e.Did),
1599 orm.FilterEq("collection", e.Commit.Collection),
1600 orm.FilterEq("rkey", e.Commit.RKey),
1601 ); err != nil {
1602 return fmt.Errorf("failed to delete comment record: %w", err)
1603 }
1604 }
1605
1606 l.Info("ingested record")
1607 return nil
1608}
1609
1610func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error {
1611 did := e.Did
1612 rkey := e.Commit.RKey
1613 cid := e.Commit.CID
1614
1615 var err error
1616
1617 l = l.With("handler", "ingestComment")
1618
1619 ctx := context.Background()
1620
1621 switch e.Commit.Operation {
1622 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1623 raw := json.RawMessage(e.Commit.Record)
1624 record := tangled.FeedComment{}
1625 err = json.Unmarshal(raw, &record)
1626 if err != nil {
1627 return fmt.Errorf("invalid record: %w", err)
1628 }
1629
1630 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1631 if err != nil {
1632 return fmt.Errorf("failed to parse comment from record: %w", err)
1633 }
1634
1635 if err := comment.Validate(); err != nil {
1636 return fmt.Errorf("failed to validate comment: %w", err)
1637 }
1638
1639 var references []syntax.ATURI
1640 if comment.Body.Original != nil {
1641 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1642 }
1643
1644 tx, err := i.Db.Begin()
1645 if err != nil {
1646 return fmt.Errorf("failed to start transaction: %w", err)
1647 }
1648 defer tx.Rollback()
1649
1650 _, err = db.PutComment(tx, comment, references)
1651 if err != nil {
1652 return fmt.Errorf("failed to create comment: %w", err)
1653 }
1654
1655 if err := tx.Commit(); err != nil {
1656 return err
1657 }
1658
1659 case jmodels.CommitOperationDelete:
1660 if err := db.DeleteComments(
1661 i.Db,
1662 orm.FilterEq("did", did),
1663 orm.FilterEq("collection", e.Commit.Collection),
1664 orm.FilterEq("rkey", rkey),
1665 ); err != nil {
1666 return fmt.Errorf("failed to delete comment record: %w", err)
1667 }
1668 }
1669
1670 l.Info("ingested record")
1671 return nil
1672}
1673
1674func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error {
1675 did := e.Did
1676 rkey := e.Commit.RKey
1677
1678 l = l.With("handler", "ingestReaction")
1679
1680 switch e.Commit.Operation {
1681 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1682 raw := json.RawMessage(e.Commit.Record)
1683 record := tangled.FeedReaction{}
1684 if err := json.Unmarshal(raw, &record); err != nil {
1685 return fmt.Errorf("invalid record: %w", err)
1686 }
1687
1688 subjectUri, err := syntax.ParseATURI(record.Subject)
1689 if err != nil {
1690 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err)
1691 }
1692 subjectUri = models.NormalizeReactionSubject(subjectUri)
1693
1694 kind, ok := models.ParseReactionKind(record.Reaction)
1695 if !ok {
1696 return fmt.Errorf("invalid reaction kind: %q", record.Reaction)
1697 }
1698
1699 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
1700 if parseErr != nil {
1701 created = time.Now()
1702 }
1703
1704 tx, err := i.Db.Begin()
1705 if err != nil {
1706 return fmt.Errorf("failed to start transaction: %w", err)
1707 }
1708 defer tx.Rollback()
1709
1710 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil {
1711 return fmt.Errorf("failed to clear existing reaction: %w", err)
1712 }
1713 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil {
1714 return fmt.Errorf("failed to add reaction: %w", err)
1715 }
1716
1717 if err := tx.Commit(); err != nil {
1718 return err
1719 }
1720
1721 case jmodels.CommitOperationDelete:
1722 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil {
1723 return fmt.Errorf("failed to delete reaction record: %w", err)
1724 }
1725 }
1726
1727 l.Info("ingested record")
1728 return nil
1729}
1730
1731func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error {
1732 did := e.Did
1733 rkey := e.Commit.RKey
1734
1735 var err error
1736
1737 l = l.With("handler", "ingestLabelDefinition")
1738
1739 switch e.Commit.Operation {
1740 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1741 raw := json.RawMessage(e.Commit.Record)
1742 record := tangled.LabelDefinition{}
1743 err = json.Unmarshal(raw, &record)
1744 if err != nil {
1745 return fmt.Errorf("invalid record: %w", err)
1746 }
1747
1748 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1749 if err != nil {
1750 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1751 }
1752
1753 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1754 return fmt.Errorf("failed to validate labeldef: %w", err)
1755 }
1756
1757 _, err = db.AddLabelDefinition(i.Db, def)
1758 if err != nil {
1759 return fmt.Errorf("failed to create labeldef: %w", err)
1760 }
1761
1762 l.Info("ingested record")
1763 return nil
1764
1765 case jmodels.CommitOperationDelete:
1766 if err := db.DeleteLabelDefinition(
1767 i.Db,
1768 orm.FilterEq("did", did),
1769 orm.FilterEq("rkey", rkey),
1770 ); err != nil {
1771 return fmt.Errorf("failed to delete labeldef record: %w", err)
1772 }
1773
1774 l.Info("ingested record")
1775 return nil
1776 }
1777
1778 return nil
1779}
1780
1781func (i *Ingester) ingestLabelOp(e *jmodels.Event, l *slog.Logger) error {
1782 did := e.Did
1783 rkey := e.Commit.RKey
1784
1785 var err error
1786
1787 l = l.With("handler", "ingestLabelOp")
1788
1789 switch e.Commit.Operation {
1790 case jmodels.CommitOperationCreate:
1791 raw := json.RawMessage(e.Commit.Record)
1792 record := tangled.LabelOp{}
1793 err = json.Unmarshal(raw, &record)
1794 if err != nil {
1795 return fmt.Errorf("invalid record: %w", err)
1796 }
1797
1798 subject := syntax.ATURI(record.Subject)
1799 collection := subject.Collection()
1800
1801 var repo *models.Repo
1802 switch collection {
1803 case tangled.RepoIssueNSID:
1804 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1805 if err != nil || len(i) != 1 {
1806 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1807 }
1808 repo = i[0].Repo
1809 case tangled.RepoPullNSID:
1810 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
1811 if err != nil || len(p) != 1 {
1812 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p))
1813 }
1814 repo = p[0].Repo
1815 default:
1816 return fmt.Errorf("unsupported label subject: %s", collection)
1817 }
1818
1819 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1820 if err != nil {
1821 return fmt.Errorf("failed to build label application ctx: %w", err)
1822 }
1823
1824 ops := models.LabelOpsFromRecord(did, rkey, record)
1825
1826 for _, o := range ops {
1827 def, ok := actx.Defs[o.OperandKey]
1828 if !ok {
1829 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1830 }
1831 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1832 return fmt.Errorf("failed to validate labelop: %w", err)
1833 }
1834 }
1835
1836 tx, err := i.Db.Begin()
1837 if err != nil {
1838 return err
1839 }
1840 defer tx.Rollback()
1841
1842 for _, o := range ops {
1843 _, err = db.AddLabelOp(tx, &o)
1844 if err != nil {
1845 return fmt.Errorf("failed to add labelop: %w", err)
1846 }
1847 }
1848
1849 if err = tx.Commit(); err != nil {
1850 return err
1851 }
1852
1853 l.Info("ingested record")
1854 }
1855
1856 return nil
1857}