Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/models"
31 "tangled.org/core/appview/notify"
32 "tangled.org/core/appview/repoverify"
33 "tangled.org/core/appview/serververify"
34 "tangled.org/core/appview/validator"
35 "tangled.org/core/idresolver"
36 "tangled.org/core/orm"
37 "tangled.org/core/rbac"
38)
39
40type Ingester struct {
41 Db *db.DB
42 Enforcer *rbac.Enforcer
43 IdResolver *idresolver.Resolver
44 Cache *cache.Cache
45 Config *config.Config
46 Logger *slog.Logger
47 Validator *validator.Validator
48 Notifier notify.Notifier
49 Verifier repoverify.Verifier
50}
51
52type processFunc func(ctx context.Context, e *jmodels.Event) error
53
54func (i *Ingester) Ingest() processFunc {
55 return func(ctx context.Context, e *jmodels.Event) error {
56 var err error
57
58 l := i.Logger.With("kind", e.Kind)
59 switch e.Kind {
60 case jmodels.EventKindAccount:
61 // TODO: sync account state to db
62 if e.Account.Active {
63 break
64 }
65 // TODO: revoke sessions by DID
66 if *e.Account.Status == "deactivated" {
67 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
68 }
69 case jmodels.EventKindIdentity:
70 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
71 case jmodels.EventKindCommit:
72 switch e.Commit.Collection {
73 case tangled.GraphFollowNSID:
74 err = i.ingestFollow(e)
75 case tangled.GraphVouchNSID:
76 err = i.ingestVouch(ctx, e)
77 case tangled.FeedStarNSID:
78 err = i.ingestStar(ctx, e)
79 case tangled.PublicKeyNSID:
80 err = i.ingestPublicKey(e)
81 case tangled.RepoArtifactNSID:
82 err = i.ingestArtifact(ctx, e)
83 case tangled.ActorProfileNSID:
84 err = i.ingestProfile(ctx, e)
85 case tangled.SpindleMemberNSID:
86 err = i.ingestSpindleMember(ctx, e)
87 case tangled.SpindleNSID:
88 err = i.ingestSpindle(ctx, e)
89 case tangled.KnotMemberNSID:
90 err = i.ingestKnotMember(e)
91 case tangled.KnotNSID:
92 err = i.ingestKnot(e)
93 case tangled.StringNSID:
94 err = i.ingestString(e)
95 case tangled.RepoIssueNSID:
96 err = i.ingestIssue(ctx, e)
97 case tangled.RepoPullNSID:
98 err = i.ingestPull(ctx, e)
99 case tangled.RepoIssueCommentNSID:
100 err = i.ingestIssueComment(e)
101 case tangled.LabelDefinitionNSID:
102 err = i.ingestLabelDefinition(e)
103 case tangled.LabelOpNSID:
104 err = i.ingestLabelOp(e)
105 case tangled.RepoNSID:
106 err = i.ingestRepo(ctx, e)
107 }
108 l = i.Logger.With("nsid", e.Commit.Collection)
109 }
110
111 if err != nil {
112 l.Warn("failed to ingest record, skipping", "err", err)
113 }
114
115 lastTimeUs := e.TimeUS + 1
116 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
117 l.Error("failed to save cursor", "err", saveErr)
118 }
119
120 return nil
121 }
122}
123
124func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
125 if strings.HasPrefix(ref, "did:") {
126 return db.GetRepoByDid(i.Db, ref)
127 }
128 return db.GetRepoByAtUri(i.Db, ref)
129}
130
131func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
132 var legacy struct {
133 Subject *string `json:"subject"`
134 SubjectDid *string `json:"subjectDid"`
135 }
136 if err := json.Unmarshal(raw, &legacy); err != nil {
137 return false, err
138 }
139
140 switch {
141 case legacy.SubjectDid != nil:
142 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
143 if err != nil {
144 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
145 return false, nil
146 }
147 star.SubjectType = models.StarSubjectRepo
148 star.Subject = repo.RepoDid
149 return true, nil
150
151 case legacy.Subject != nil:
152 uri, err := syntax.ParseATURI(*legacy.Subject)
153 if err != nil {
154 return false, fmt.Errorf("invalid old-format star subject: %w", err)
155 }
156 switch uri.Collection().String() {
157 case tangled.RepoNSID:
158 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
159 if err != nil {
160 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
161 return false, nil
162 }
163 star.SubjectType = models.StarSubjectRepo
164 star.Subject = repo.RepoDid
165 return true, nil
166 default:
167 star.SubjectType = models.StarSubjectString
168 star.Subject = *legacy.Subject
169 return true, nil
170 }
171
172 default:
173 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
174 }
175}
176
177func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
178 var err error
179 did := e.Did
180
181 l := i.Logger.With("handler", "ingestStar")
182 l = l.With("nsid", e.Commit.Collection)
183
184 switch e.Commit.Operation {
185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
186 raw := json.RawMessage(e.Commit.Record)
187 record := tangled.FeedStar{}
188 unmarshalErr := json.Unmarshal(raw, &record)
189
190 star := &models.Star{
191 Did: did,
192 Rkey: e.Commit.RKey,
193 }
194
195 switch {
196 case unmarshalErr != nil:
197 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
198 if resolveErr != nil {
199 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
200 return unmarshalErr
201 }
202 if !resolved {
203 return nil
204 }
205
206 case record.Subject == nil:
207 return fmt.Errorf("star record has nil subject")
208
209 case record.Subject.FeedStar_Repo != nil:
210 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
211 if repoErr != nil {
212 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
213 return nil
214 }
215 star.SubjectType = models.StarSubjectRepo
216 star.Subject = repo.RepoDid
217
218 case record.Subject.FeedStar_String != nil:
219 star.SubjectType = models.StarSubjectString
220 star.Subject = record.Subject.FeedStar_String.Uri
221
222 default:
223 return fmt.Errorf("star record has empty subject union")
224 }
225
226 err = db.AddStar(i.Db, star)
227 case jmodels.CommitOperationDelete:
228 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
229 }
230
231 if err != nil {
232 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
233 }
234
235 return nil
236}
237
238func (i *Ingester) ingestFollow(e *jmodels.Event) error {
239 var err error
240 did := e.Did
241
242 l := i.Logger.With("handler", "ingestFollow")
243 l = l.With("nsid", e.Commit.Collection)
244
245 switch e.Commit.Operation {
246 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
247 raw := json.RawMessage(e.Commit.Record)
248 record := tangled.GraphFollow{}
249 err = json.Unmarshal(raw, &record)
250 if err != nil {
251 l.Error("invalid record", "err", err)
252 return err
253 }
254
255 err = db.AddFollow(i.Db, &models.Follow{
256 UserDid: did,
257 SubjectDid: record.Subject,
258 Rkey: e.Commit.RKey,
259 })
260 case jmodels.CommitOperationDelete:
261 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
262 }
263
264 if err != nil {
265 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
266 }
267
268 return nil
269}
270
271func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
272 var err error
273 did := e.Did
274
275 l := i.Logger.With("handler", "ingestVouch")
276 l = l.With("nsid", e.Commit.Collection)
277 l.Info("ingesting vouch")
278
279 switch e.Commit.Operation {
280 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
281 raw := json.RawMessage(e.Commit.Record)
282 record := tangled.GraphVouch{}
283 err = json.Unmarshal(raw, &record)
284 if err != nil {
285 l.Error("invalid record", "err", err)
286 return err
287 }
288
289 // rkey is the subject_did being vouched for/denounced
290 subjectDID := e.Commit.RKey
291
292 _, err = syntax.ParseDID(subjectDID)
293 if err != nil {
294 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
295 return fmt.Errorf("invalid subject_did: %w", err)
296 }
297
298 if did == subjectDID {
299 l.Warn("attempted self-vouch", "did", did)
300 return fmt.Errorf("cannot vouch for self")
301 }
302
303 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
304 if err != nil {
305 return err
306 }
307
308 if subjectId.Handle.IsInvalidHandle() {
309 return err
310 }
311
312 kind, err := models.ParseVouchKind(record.Kind)
313 if err != nil {
314 l.Error("invalid kind", "kind", kind)
315 return fmt.Errorf("invalid kind: %s", kind)
316 }
317
318 recordCid, err := cid.Parse(e.Commit.CID)
319 if err != nil {
320 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
321 return fmt.Errorf("invalid cid: %w", err)
322 }
323
324 var evidences []syntax.ATURI
325 for _, raw := range record.Evidences {
326 uri, parseErr := syntax.ParseATURI(raw)
327 if parseErr != nil {
328 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
329 continue
330 }
331 evidences = append(evidences, uri)
332 }
333
334 tx, txErr := i.Db.Begin()
335 if txErr != nil {
336 return fmt.Errorf("failed to start transaction: %w", txErr)
337 }
338
339 addErr := db.AddVouch(tx, &models.Vouch{
340 Did: syntax.DID(did),
341 SubjectDid: subjectId.DID,
342 Cid: recordCid,
343 Kind: kind,
344 Reason: record.Reason,
345 Evidences: evidences,
346 })
347 if addErr != nil {
348 tx.Rollback()
349 err = addErr
350 } else {
351 err = tx.Commit()
352 }
353
354 case jmodels.CommitOperationDelete:
355 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
356 }
357
358 if err != nil {
359 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
360 }
361
362 return nil
363}
364
365func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
366 did := e.Did
367 var err error
368
369 l := i.Logger.With("handler", "ingestPublicKey")
370 l = l.With("nsid", e.Commit.Collection)
371
372 switch e.Commit.Operation {
373 case jmodels.CommitOperationCreate:
374 l.Debug("processing add of pubkey")
375 raw := json.RawMessage(e.Commit.Record)
376 record := tangled.PublicKey{}
377 err = json.Unmarshal(raw, &record)
378 if err != nil {
379 l.Error("invalid record", "err", err)
380 return err
381 }
382
383 name := record.Name
384 key := record.Key
385 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
386 case jmodels.CommitOperationUpdate:
387 l.Debug("processing update of pubkey")
388 raw := json.RawMessage(e.Commit.Record)
389 record := tangled.PublicKey{}
390 err = json.Unmarshal(raw, &record)
391 if err != nil {
392 l.Error("invalid record", "err", err)
393 return err
394 }
395
396 name := record.Name
397 key := record.Key
398 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
399 case jmodels.CommitOperationDelete:
400 l.Debug("processing delete of pubkey")
401 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
402 }
403
404 if err != nil {
405 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
406 }
407
408 return nil
409}
410
411func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
412 did := e.Did
413 var err error
414
415 l := i.Logger.With("handler", "ingestArtifact")
416 l = l.With("nsid", e.Commit.Collection)
417
418 switch e.Commit.Operation {
419 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
420 raw := json.RawMessage(e.Commit.Record)
421 record := tangled.RepoArtifact{}
422 err = json.Unmarshal(raw, &record)
423 if err != nil {
424 l.Error("invalid record", "err", err)
425 return err
426 }
427
428 var repo *models.Repo
429 if record.RepoDid != nil && *record.RepoDid != "" {
430 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
431 if err != nil && !errors.Is(err, sql.ErrNoRows) {
432 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
433 }
434 }
435 if repo == nil && record.Repo != nil {
436 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
437 if parseErr != nil {
438 return parseErr
439 }
440 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
441 if err != nil {
442 return err
443 }
444 }
445 if repo == nil {
446 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
447 }
448
449 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
450 if err != nil || !ok {
451 return err
452 }
453
454 repoDid := repo.RepoDid
455 if repoDid == "" && record.RepoDid != nil {
456 repoDid = *record.RepoDid
457 }
458 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
459 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
460 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
461 }
462 }
463
464 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
465 if err != nil {
466 createdAt = time.Now()
467 }
468
469 artifact := models.Artifact{
470 Did: did,
471 Rkey: e.Commit.RKey,
472 RepoDid: syntax.DID(repo.RepoDid),
473 Tag: plumbing.Hash(record.Tag),
474 CreatedAt: createdAt,
475 BlobCid: cid.Cid(record.Artifact.Ref),
476 Name: record.Name,
477 Size: uint64(record.Artifact.Size),
478 MimeType: record.Artifact.MimeType,
479 }
480
481 err = db.AddArtifact(i.Db, artifact)
482 case jmodels.CommitOperationDelete:
483 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
484 }
485
486 if err != nil {
487 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
488 }
489
490 return nil
491}
492
493func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
494 did := e.Did
495 var err error
496
497 l := i.Logger.With("handler", "ingestProfile")
498 l = l.With("nsid", e.Commit.Collection)
499
500 if e.Commit.RKey != "self" {
501 return fmt.Errorf("ingestProfile only ingests `self` record")
502 }
503
504 switch e.Commit.Operation {
505 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
506 raw := json.RawMessage(e.Commit.Record)
507 record := tangled.ActorProfile{}
508 err = json.Unmarshal(raw, &record)
509 if err != nil {
510 l.Error("invalid record", "err", err)
511 return err
512 }
513
514 avatar := ""
515 if record.Avatar != nil {
516 avatar = record.Avatar.Ref.String()
517 }
518
519 description := ""
520 if record.Description != nil {
521 description = *record.Description
522 }
523
524 includeBluesky := record.Bluesky
525
526 pronouns := ""
527 if record.Pronouns != nil {
528 pronouns = *record.Pronouns
529 }
530
531 location := ""
532 if record.Location != nil {
533 location = *record.Location
534 }
535
536 var links [5]string
537 for i, l := range record.Links {
538 if i < 5 {
539 links[i] = l
540 }
541 }
542
543 var stats [2]models.VanityStat
544 for i, s := range record.Stats {
545 if i < 2 {
546 stats[i].Kind = models.ParseVanityStatKind(s)
547 }
548 }
549
550 var pinned [6]string
551 for i, r := range record.PinnedRepositories {
552 if i < 6 {
553 pinned[i] = r
554 }
555 }
556
557 var preferredHandle syntax.Handle
558 if record.PreferredHandle != nil {
559 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
560 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
561 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
562 preferredHandle = h
563 }
564 }
565 }
566
567 profile := models.Profile{
568 Did: did,
569 Avatar: avatar,
570 Description: description,
571 IncludeBluesky: includeBluesky,
572 Location: location,
573 Links: links,
574 Stats: stats,
575 PinnedRepos: pinned,
576 Pronouns: pronouns,
577 PreferredHandle: preferredHandle,
578 }
579
580 tx, err := i.Db.Begin()
581 if err != nil {
582 return fmt.Errorf("failed to start transaction")
583 }
584
585 err = db.ValidateProfile(tx, &profile)
586 if err != nil {
587 return fmt.Errorf("invalid profile record")
588 }
589
590 err = db.UpsertProfile(tx, &profile)
591 if err == nil && i.Cache != nil {
592 pipe := i.Cache.Pipeline()
593 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
594 if preferredHandle != "" {
595 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
596 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
597 } else {
598 pipe.Del(ctx, didKey)
599 }
600 if _, execErr := pipe.Exec(ctx); execErr != nil {
601 l.Warn("failed to update preferred handle cache", "err", execErr)
602 }
603 }
604 case jmodels.CommitOperationDelete:
605 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
606 }
607
608 if err != nil {
609 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
610 }
611
612 return nil
613}
614
615func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
616 did := e.Did
617 var err error
618
619 l := i.Logger.With("handler", "ingestSpindleMember")
620 l = l.With("nsid", e.Commit.Collection)
621
622 switch e.Commit.Operation {
623 case jmodels.CommitOperationCreate:
624 raw := json.RawMessage(e.Commit.Record)
625 record := tangled.SpindleMember{}
626 err = json.Unmarshal(raw, &record)
627 if err != nil {
628 l.Error("invalid record", "err", err)
629 return err
630 }
631
632 // only spindle owner can invite to spindles
633 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
634 if err != nil || !ok {
635 return fmt.Errorf("failed to enforce permissions: %w", err)
636 }
637
638 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
639 if err != nil {
640 return err
641 }
642
643 if memberId.Handle.IsInvalidHandle() {
644 return err
645 }
646
647 err = db.AddSpindleMember(i.Db, models.SpindleMember{
648 Did: syntax.DID(did),
649 Rkey: e.Commit.RKey,
650 Instance: record.Instance,
651 Subject: memberId.DID,
652 })
653 if !ok {
654 return fmt.Errorf("failed to add to db: %w", err)
655 }
656
657 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
658 if err != nil {
659 return fmt.Errorf("failed to update ACLs: %w", err)
660 }
661
662 l.Info("added spindle member")
663 case jmodels.CommitOperationDelete:
664 rkey := e.Commit.RKey
665
666 // get record from db first
667 members, err := db.GetSpindleMembers(
668 i.Db,
669 orm.FilterEq("did", did),
670 orm.FilterEq("rkey", rkey),
671 )
672 if err != nil || len(members) != 1 {
673 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
674 }
675 member := members[0]
676
677 tx, err := i.Db.Begin()
678 if err != nil {
679 return fmt.Errorf("failed to start txn: %w", err)
680 }
681
682 // remove record by rkey && update enforcer
683 if err = db.RemoveSpindleMember(
684 tx,
685 orm.FilterEq("did", did),
686 orm.FilterEq("rkey", rkey),
687 ); err != nil {
688 return fmt.Errorf("failed to remove from db: %w", err)
689 }
690
691 // update enforcer
692 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
693 if err != nil {
694 return fmt.Errorf("failed to update ACLs: %w", err)
695 }
696
697 if err = tx.Commit(); err != nil {
698 return fmt.Errorf("failed to commit txn: %w", err)
699 }
700
701 if err = i.Enforcer.E.SavePolicy(); err != nil {
702 return fmt.Errorf("failed to save ACLs: %w", err)
703 }
704
705 l.Info("removed spindle member")
706 }
707
708 return nil
709}
710
711func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
712 did := e.Did
713 var err error
714
715 l := i.Logger.With("handler", "ingestSpindle")
716 l = l.With("nsid", e.Commit.Collection)
717
718 switch e.Commit.Operation {
719 case jmodels.CommitOperationCreate:
720 raw := json.RawMessage(e.Commit.Record)
721 record := tangled.Spindle{}
722 err = json.Unmarshal(raw, &record)
723 if err != nil {
724 l.Error("invalid record", "err", err)
725 return err
726 }
727
728 instance := e.Commit.RKey
729
730 err := db.AddSpindle(i.Db, models.Spindle{
731 Owner: syntax.DID(did),
732 Instance: instance,
733 })
734 if err != nil {
735 l.Error("failed to add spindle to db", "err", err, "instance", instance)
736 return err
737 }
738
739 err = retry.Do(
740 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
741 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
742 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
743 )
744 if err != nil {
745 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
746 return err
747 }
748
749 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
750 if err != nil {
751 return fmt.Errorf("failed to mark verified: %w", err)
752 }
753
754 return nil
755
756 case jmodels.CommitOperationDelete:
757 instance := e.Commit.RKey
758
759 // get record from db first
760 spindles, err := db.GetSpindles(
761 ctx,
762 i.Db,
763 orm.FilterEq("owner", did),
764 orm.FilterEq("instance", instance),
765 )
766 if err != nil || len(spindles) != 1 {
767 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
768 }
769 spindle := spindles[0]
770
771 tx, err := i.Db.Begin()
772 if err != nil {
773 return err
774 }
775 defer func() {
776 tx.Rollback()
777 i.Enforcer.E.LoadPolicy()
778 }()
779
780 // remove spindle members first
781 err = db.RemoveSpindleMember(
782 tx,
783 orm.FilterEq("owner", did),
784 orm.FilterEq("instance", instance),
785 )
786 if err != nil {
787 return err
788 }
789
790 err = db.DeleteSpindle(
791 tx,
792 orm.FilterEq("owner", did),
793 orm.FilterEq("instance", instance),
794 )
795 if err != nil {
796 return err
797 }
798
799 if spindle.Verified != nil {
800 err = i.Enforcer.RemoveSpindle(instance)
801 if err != nil {
802 return err
803 }
804 }
805
806 err = tx.Commit()
807 if err != nil {
808 return err
809 }
810
811 err = i.Enforcer.E.SavePolicy()
812 if err != nil {
813 return err
814 }
815 }
816
817 return nil
818}
819
820func (i *Ingester) ingestString(e *jmodels.Event) error {
821 did := e.Did
822 rkey := e.Commit.RKey
823
824 var err error
825
826 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
827 l.Info("ingesting record")
828
829 switch e.Commit.Operation {
830 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
831 raw := json.RawMessage(e.Commit.Record)
832 record := tangled.String{}
833 err = json.Unmarshal(raw, &record)
834 if err != nil {
835 l.Error("invalid record", "err", err)
836 return err
837 }
838
839 string := models.StringFromRecord(did, rkey, record)
840
841 if err = i.Validator.ValidateString(&string); err != nil {
842 l.Error("invalid record", "err", err)
843 return err
844 }
845
846 if err = db.AddString(i.Db, string); err != nil {
847 l.Error("failed to add string", "err", err)
848 return err
849 }
850
851 return nil
852
853 case jmodels.CommitOperationDelete:
854 if err := db.DeleteString(
855 i.Db,
856 orm.FilterEq("did", did),
857 orm.FilterEq("rkey", rkey),
858 ); err != nil {
859 l.Error("failed to delete", "err", err)
860 return fmt.Errorf("failed to delete string record: %w", err)
861 }
862
863 return nil
864 }
865
866 return nil
867}
868
869func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
870 did := e.Did
871 var err error
872
873 l := i.Logger.With("handler", "ingestKnotMember")
874 l = l.With("nsid", e.Commit.Collection)
875
876 switch e.Commit.Operation {
877 case jmodels.CommitOperationCreate:
878 raw := json.RawMessage(e.Commit.Record)
879 record := tangled.KnotMember{}
880 err = json.Unmarshal(raw, &record)
881 if err != nil {
882 l.Error("invalid record", "err", err)
883 return err
884 }
885
886 // only knot owner can invite to knots
887 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
888 if err != nil || !ok {
889 return fmt.Errorf("failed to enforce permissions: %w", err)
890 }
891
892 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
893 if err != nil {
894 return err
895 }
896
897 if memberId.Handle.IsInvalidHandle() {
898 return err
899 }
900
901 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
902 if err != nil {
903 return fmt.Errorf("failed to update ACLs: %w", err)
904 }
905
906 l.Info("added knot member")
907 case jmodels.CommitOperationDelete:
908 // we don't store knot members in a table (like we do for spindle)
909 // and we can't remove this just yet. possibly fixed if we switch
910 // to either:
911 // 1. a knot_members table like with spindle and store the rkey
912 // 2. use the knot host as the rkey
913 //
914 // TODO: implement member deletion
915 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
916 }
917
918 return nil
919}
920
921func (i *Ingester) ingestKnot(e *jmodels.Event) error {
922 did := e.Did
923 var err error
924
925 l := i.Logger.With("handler", "ingestKnot")
926 l = l.With("nsid", e.Commit.Collection)
927
928 switch e.Commit.Operation {
929 case jmodels.CommitOperationCreate:
930 raw := json.RawMessage(e.Commit.Record)
931 record := tangled.Knot{}
932 err = json.Unmarshal(raw, &record)
933 if err != nil {
934 l.Error("invalid record", "err", err)
935 return err
936 }
937
938 domain := e.Commit.RKey
939
940 err := db.AddKnot(i.Db, domain, did)
941 if err != nil {
942 l.Error("failed to add knot to db", "err", err, "domain", domain)
943 return err
944 }
945
946 err = retry.Do(
947 func() error {
948 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
949 },
950 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
951 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
952 )
953 if err != nil {
954 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
955 return err
956 }
957
958 err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
959 if err != nil {
960 return fmt.Errorf("failed to mark verified: %w", err)
961 }
962
963 return nil
964
965 case jmodels.CommitOperationDelete:
966 domain := e.Commit.RKey
967
968 // get record from db first
969 registrations, err := db.GetRegistrations(
970 i.Db,
971 orm.FilterEq("domain", domain),
972 orm.FilterEq("did", did),
973 )
974 if err != nil {
975 return fmt.Errorf("failed to get registration: %w", err)
976 }
977 if len(registrations) != 1 {
978 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
979 }
980 registration := registrations[0]
981
982 tx, err := i.Db.Begin()
983 if err != nil {
984 return err
985 }
986 defer func() {
987 tx.Rollback()
988 i.Enforcer.E.LoadPolicy()
989 }()
990
991 err = db.DeleteKnot(
992 tx,
993 orm.FilterEq("did", did),
994 orm.FilterEq("domain", domain),
995 )
996 if err != nil {
997 return err
998 }
999
1000 if registration.Registered != nil {
1001 err = i.Enforcer.RemoveKnot(domain)
1002 if err != nil {
1003 return err
1004 }
1005 }
1006
1007 err = tx.Commit()
1008 if err != nil {
1009 return err
1010 }
1011
1012 err = i.Enforcer.E.SavePolicy()
1013 if err != nil {
1014 return err
1015 }
1016 }
1017
1018 return nil
1019}
1020func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
1021 did := e.Did
1022 rkey := e.Commit.RKey
1023
1024 var err error
1025
1026 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1027 l.Info("ingesting record")
1028
1029 switch e.Commit.Operation {
1030 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1031 raw := json.RawMessage(e.Commit.Record)
1032 record := tangled.RepoIssue{}
1033 err = json.Unmarshal(raw, &record)
1034 if err != nil {
1035 l.Error("invalid record", "err", err)
1036 return err
1037 }
1038
1039 issue := models.IssueFromRecord(did, rkey, record)
1040
1041 if issue.RepoDid == "" {
1042 return fmt.Errorf("issue record has no repo field")
1043 }
1044 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1045 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1046 }
1047
1048 if err := i.Validator.ValidateIssue(&issue); err != nil {
1049 return fmt.Errorf("failed to validate issue: %w", err)
1050 }
1051
1052 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1053 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1054 if repoErr == nil && repo.RepoDid != "" {
1055 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1056 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1057 }
1058 }
1059 }
1060
1061 tx, err := i.Db.BeginTx(ctx, nil)
1062 if err != nil {
1063 l.Error("failed to begin transaction", "err", err)
1064 return err
1065 }
1066 defer tx.Rollback()
1067
1068 err = db.PutIssue(tx, &issue)
1069 if err != nil {
1070 l.Error("failed to create issue", "err", err)
1071 return err
1072 }
1073
1074 err = tx.Commit()
1075 if err != nil {
1076 l.Error("failed to commit txn", "err", err)
1077 return err
1078 }
1079
1080 return nil
1081
1082 case jmodels.CommitOperationDelete:
1083 tx, err := i.Db.BeginTx(ctx, nil)
1084 if err != nil {
1085 l.Error("failed to begin transaction", "err", err)
1086 return err
1087 }
1088 defer tx.Rollback()
1089
1090 if err := db.DeleteIssues(
1091 tx,
1092 did,
1093 rkey,
1094 ); err != nil {
1095 l.Error("failed to delete", "err", err)
1096 return fmt.Errorf("failed to delete issue record: %w", err)
1097 }
1098 if err := tx.Commit(); err != nil {
1099 l.Error("failed to commit txn", "err", err)
1100 return err
1101 }
1102
1103 return nil
1104 }
1105
1106 return nil
1107}
1108
1109func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1110 did := e.Did
1111 rkey := e.Commit.RKey
1112
1113 var err error
1114
1115 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1116 l.Info("ingesting record")
1117
1118 switch e.Commit.Operation {
1119 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1120 raw := json.RawMessage(e.Commit.Record)
1121 record := tangled.RepoPull{}
1122 err = json.Unmarshal(raw, &record)
1123 if err != nil {
1124 l.Error("invalid record", "err", err)
1125 return err
1126 }
1127
1128 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1129 if err != nil {
1130 l.Error("failed to resolve did")
1131 return err
1132 }
1133
1134 // go through and fetch all blobs in parallel
1135 readers := make([]*io.ReadCloser, len(record.Rounds))
1136 var mu sync.Mutex
1137
1138 g, gctx := errgroup.WithContext(ctx)
1139
1140 for idx, b := range record.Rounds {
1141 g.Go(func() error {
1142 // for some reason, a blob is empty
1143 if b.PatchBlob == nil {
1144 return fmt.Errorf("missing patchBlob in round %d", idx)
1145 }
1146
1147 ownerPds := ownerId.PDSEndpoint()
1148 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1149 q := url.Query()
1150 q.Set("cid", b.PatchBlob.Ref.String())
1151 q.Set("did", did)
1152 url.RawQuery = q.Encode()
1153
1154 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1155 if err != nil {
1156 l.Error("failed to create request")
1157 return err
1158 }
1159 req.Header.Set("Content-Type", "application/json")
1160
1161 resp, err := http.DefaultClient.Do(req)
1162 if err != nil {
1163 l.Error("failed to make request")
1164 return err
1165 }
1166
1167 mu.Lock()
1168 readers[idx] = &resp.Body
1169 mu.Unlock()
1170
1171 return nil
1172 })
1173 }
1174
1175 if err := g.Wait(); err != nil {
1176 for _, r := range readers {
1177 if r != nil && *r != nil {
1178 (*r).Close()
1179 }
1180 }
1181 return err
1182 }
1183
1184 defer func() {
1185 for _, r := range readers {
1186 if r != nil && *r != nil {
1187 (*r).Close()
1188 }
1189 }
1190 }()
1191
1192 pull, err := models.PullFromRecord(did, rkey, record, readers)
1193 if err != nil {
1194 return fmt.Errorf("failed to parse pull from record: %w", err)
1195 }
1196 if err := i.Validator.ValidatePull(pull); err != nil {
1197 return fmt.Errorf("failed to validate pull: %w", err)
1198 }
1199
1200 tx, err := i.Db.BeginTx(ctx, nil)
1201 if err != nil {
1202 l.Error("failed to begin transaction", "err", err)
1203 return err
1204 }
1205 defer tx.Rollback()
1206
1207 err = db.PutPull(tx, pull)
1208 if err != nil {
1209 l.Error("failed to create pull", "err", err)
1210 return err
1211 }
1212
1213 err = tx.Commit()
1214 if err != nil {
1215 l.Error("failed to commit txn", "err", err)
1216 return err
1217 }
1218
1219 return nil
1220
1221 case jmodels.CommitOperationDelete:
1222 tx, err := i.Db.BeginTx(ctx, nil)
1223 if err != nil {
1224 l.Error("failed to begin transaction", "err", err)
1225 return err
1226 }
1227 defer tx.Rollback()
1228
1229 if err := db.AbandonPulls(
1230 tx,
1231 orm.FilterEq("owner_did", did),
1232 orm.FilterEq("rkey", rkey),
1233 ); err != nil {
1234 l.Error("failed to abandon", "err", err)
1235 return fmt.Errorf("failed to abandon pull record: %w", err)
1236 }
1237 if err := tx.Commit(); err != nil {
1238 l.Error("failed to commit txn", "err", err)
1239 return err
1240 }
1241
1242 return nil
1243 }
1244
1245 return nil
1246}
1247
1248func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1249 did := e.Did
1250 rkey := e.Commit.RKey
1251
1252 var err error
1253
1254 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1255 l.Info("ingesting record")
1256
1257 switch e.Commit.Operation {
1258 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1259 raw := json.RawMessage(e.Commit.Record)
1260 record := tangled.RepoIssueComment{}
1261 err = json.Unmarshal(raw, &record)
1262 if err != nil {
1263 return fmt.Errorf("invalid record: %w", err)
1264 }
1265
1266 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1267 if err != nil {
1268 return fmt.Errorf("failed to parse comment from record: %w", err)
1269 }
1270
1271 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1272 return fmt.Errorf("failed to validate comment: %w", err)
1273 }
1274
1275 tx, err := i.Db.Begin()
1276 if err != nil {
1277 return fmt.Errorf("failed to start transaction: %w", err)
1278 }
1279 defer tx.Rollback()
1280
1281 _, err = db.AddIssueComment(tx, *comment)
1282 if err != nil {
1283 return fmt.Errorf("failed to create issue comment: %w", err)
1284 }
1285
1286 return tx.Commit()
1287
1288 case jmodels.CommitOperationDelete:
1289 if err := db.DeleteIssueComments(
1290 i.Db,
1291 orm.FilterEq("did", did),
1292 orm.FilterEq("rkey", rkey),
1293 ); err != nil {
1294 return fmt.Errorf("failed to delete issue comment record: %w", err)
1295 }
1296
1297 return nil
1298 }
1299
1300 return nil
1301}
1302
1303func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1304 did := e.Did
1305 rkey := e.Commit.RKey
1306
1307 var err error
1308
1309 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1310 l.Info("ingesting record")
1311
1312 switch e.Commit.Operation {
1313 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1314 raw := json.RawMessage(e.Commit.Record)
1315 record := tangled.LabelDefinition{}
1316 err = json.Unmarshal(raw, &record)
1317 if err != nil {
1318 return fmt.Errorf("invalid record: %w", err)
1319 }
1320
1321 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1322 if err != nil {
1323 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1324 }
1325
1326 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1327 return fmt.Errorf("failed to validate labeldef: %w", err)
1328 }
1329
1330 _, err = db.AddLabelDefinition(i.Db, def)
1331 if err != nil {
1332 return fmt.Errorf("failed to create labeldef: %w", err)
1333 }
1334
1335 return nil
1336
1337 case jmodels.CommitOperationDelete:
1338 if err := db.DeleteLabelDefinition(
1339 i.Db,
1340 orm.FilterEq("did", did),
1341 orm.FilterEq("rkey", rkey),
1342 ); err != nil {
1343 return fmt.Errorf("failed to delete labeldef record: %w", err)
1344 }
1345
1346 return nil
1347 }
1348
1349 return nil
1350}
1351
1352func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1353 did := e.Did
1354 rkey := e.Commit.RKey
1355
1356 var err error
1357
1358 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1359 l.Info("ingesting record")
1360
1361 switch e.Commit.Operation {
1362 case jmodels.CommitOperationCreate:
1363 raw := json.RawMessage(e.Commit.Record)
1364 record := tangled.LabelOp{}
1365 err = json.Unmarshal(raw, &record)
1366 if err != nil {
1367 return fmt.Errorf("invalid record: %w", err)
1368 }
1369
1370 subject := syntax.ATURI(record.Subject)
1371 collection := subject.Collection()
1372
1373 var repo *models.Repo
1374 switch collection {
1375 case tangled.RepoIssueNSID:
1376 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1377 if err != nil || len(i) != 1 {
1378 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1379 }
1380 repo = i[0].Repo
1381 default:
1382 return fmt.Errorf("unsupported label subject: %s", collection)
1383 }
1384
1385 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1386 if err != nil {
1387 return fmt.Errorf("failed to build label application ctx: %w", err)
1388 }
1389
1390 ops := models.LabelOpsFromRecord(did, rkey, record)
1391
1392 for _, o := range ops {
1393 def, ok := actx.Defs[o.OperandKey]
1394 if !ok {
1395 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1396 }
1397 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1398 return fmt.Errorf("failed to validate labelop: %w", err)
1399 }
1400 }
1401
1402 tx, err := i.Db.Begin()
1403 if err != nil {
1404 return err
1405 }
1406 defer tx.Rollback()
1407
1408 for _, o := range ops {
1409 _, err = db.AddLabelOp(tx, &o)
1410 if err != nil {
1411 return fmt.Errorf("failed to add labelop: %w", err)
1412 }
1413 }
1414
1415 if err = tx.Commit(); err != nil {
1416 return err
1417 }
1418 }
1419
1420 return nil
1421}