Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/models"
31 "tangled.org/core/appview/notify"
32 "tangled.org/core/appview/repoverify"
33 "tangled.org/core/appview/serververify"
34 "tangled.org/core/appview/validator"
35 "tangled.org/core/idresolver"
36 "tangled.org/core/orm"
37 "tangled.org/core/rbac"
38)
39
40type Ingester struct {
41 Db *db.DB
42 Enforcer *rbac.Enforcer
43 IdResolver *idresolver.Resolver
44 Cache *cache.Cache
45 Config *config.Config
46 Logger *slog.Logger
47 Validator *validator.Validator
48 Notifier notify.Notifier
49 Verifier repoverify.Verifier
50}
51
52type processFunc func(ctx context.Context, e *jmodels.Event) error
53
54func (i *Ingester) Ingest() processFunc {
55 return func(ctx context.Context, e *jmodels.Event) error {
56 var err error
57
58 l := i.Logger.With("kind", e.Kind)
59 switch e.Kind {
60 case jmodels.EventKindAccount:
61 // TODO: sync account state to db
62 if e.Account.Active {
63 break
64 }
65 // TODO: revoke sessions by DID
66 if *e.Account.Status == "deactivated" {
67 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
68 }
69 case jmodels.EventKindIdentity:
70 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
71 case jmodels.EventKindCommit:
72 switch e.Commit.Collection {
73 case tangled.GraphFollowNSID:
74 err = i.ingestFollow(e)
75 case tangled.GraphVouchNSID:
76 err = i.ingestVouch(ctx, e)
77 case tangled.FeedStarNSID:
78 err = i.ingestStar(ctx, e)
79 case tangled.PublicKeyNSID:
80 err = i.ingestPublicKey(e)
81 case tangled.RepoArtifactNSID:
82 err = i.ingestArtifact(ctx, e)
83 case tangled.ActorProfileNSID:
84 err = i.ingestProfile(ctx, e)
85 case tangled.SpindleMemberNSID:
86 err = i.ingestSpindleMember(ctx, e)
87 case tangled.SpindleNSID:
88 err = i.ingestSpindle(ctx, e)
89 case tangled.KnotMemberNSID:
90 err = i.ingestKnotMember(e)
91 case tangled.KnotNSID:
92 err = i.ingestKnot(e)
93 case tangled.StringNSID:
94 err = i.ingestString(e)
95 case tangled.RepoIssueNSID:
96 err = i.ingestIssue(ctx, e)
97 case tangled.RepoPullNSID:
98 err = i.ingestPull(ctx, e)
99 case tangled.RepoIssueCommentNSID:
100 err = i.ingestIssueComment(e)
101 case tangled.LabelDefinitionNSID:
102 err = i.ingestLabelDefinition(e)
103 case tangled.LabelOpNSID:
104 err = i.ingestLabelOp(e)
105 case tangled.RepoNSID:
106 err = i.ingestRepo(ctx, e)
107 }
108 l = i.Logger.With("nsid", e.Commit.Collection)
109 }
110
111 if err != nil {
112 l.Warn("failed to ingest record, skipping", "err", err)
113 }
114
115 lastTimeUs := e.TimeUS + 1
116 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
117 l.Error("failed to save cursor", "err", saveErr)
118 }
119
120 return nil
121 }
122}
123
124func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
125 if strings.HasPrefix(ref, "did:") {
126 return db.GetRepoByDid(i.Db, ref)
127 }
128 return db.GetRepoByAtUri(i.Db, ref)
129}
130
131func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
132 var legacy struct {
133 Subject *string `json:"subject"`
134 SubjectDid *string `json:"subjectDid"`
135 }
136 if err := json.Unmarshal(raw, &legacy); err != nil {
137 return false, err
138 }
139
140 switch {
141 case legacy.SubjectDid != nil:
142 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
143 if err != nil {
144 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
145 return false, nil
146 }
147 star.SubjectType = models.StarSubjectRepo
148 star.Subject = repo.RepoDid
149 return true, nil
150
151 case legacy.Subject != nil:
152 uri, err := syntax.ParseATURI(*legacy.Subject)
153 if err != nil {
154 return false, fmt.Errorf("invalid old-format star subject: %w", err)
155 }
156 switch uri.Collection().String() {
157 case tangled.RepoNSID:
158 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
159 if err != nil {
160 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
161 return false, nil
162 }
163 star.SubjectType = models.StarSubjectRepo
164 star.Subject = repo.RepoDid
165 return true, nil
166 default:
167 star.SubjectType = models.StarSubjectString
168 star.Subject = *legacy.Subject
169 return true, nil
170 }
171
172 default:
173 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
174 }
175}
176
177func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
178 var err error
179 did := e.Did
180
181 l := i.Logger.With("handler", "ingestStar")
182 l = l.With("nsid", e.Commit.Collection)
183
184 switch e.Commit.Operation {
185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
186 raw := json.RawMessage(e.Commit.Record)
187 record := tangled.FeedStar{}
188 unmarshalErr := json.Unmarshal(raw, &record)
189
190 star := &models.Star{
191 Did: did,
192 Rkey: e.Commit.RKey,
193 }
194
195 switch {
196 case unmarshalErr != nil:
197 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
198 if resolveErr != nil {
199 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
200 return unmarshalErr
201 }
202 if !resolved {
203 return nil
204 }
205
206 case record.Subject == nil:
207 return fmt.Errorf("star record has nil subject")
208
209 case record.Subject.FeedStar_Repo != nil:
210 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
211 if repoErr != nil {
212 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
213 return nil
214 }
215 star.SubjectType = models.StarSubjectRepo
216 star.Subject = repo.RepoDid
217
218 case record.Subject.FeedStar_String != nil:
219 star.SubjectType = models.StarSubjectString
220 star.Subject = record.Subject.FeedStar_String.Uri
221
222 default:
223 return fmt.Errorf("star record has empty subject union")
224 }
225
226 err = db.AddStar(i.Db, star)
227 case jmodels.CommitOperationDelete:
228 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
229 }
230
231 if err != nil {
232 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
233 }
234
235 return nil
236}
237
238func (i *Ingester) ingestFollow(e *jmodels.Event) error {
239 var err error
240 did := e.Did
241
242 l := i.Logger.With("handler", "ingestFollow")
243 l = l.With("nsid", e.Commit.Collection)
244
245 switch e.Commit.Operation {
246 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
247 raw := json.RawMessage(e.Commit.Record)
248 record := tangled.GraphFollow{}
249 err = json.Unmarshal(raw, &record)
250 if err != nil {
251 l.Error("invalid record", "err", err)
252 return err
253 }
254
255 err = db.AddFollow(i.Db, &models.Follow{
256 UserDid: did,
257 SubjectDid: record.Subject,
258 Rkey: e.Commit.RKey,
259 })
260 case jmodels.CommitOperationDelete:
261 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
262 }
263
264 if err != nil {
265 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
266 }
267
268 return nil
269}
270
271func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
272 var err error
273 did := e.Did
274
275 l := i.Logger.With("handler", "ingestVouch")
276 l = l.With("nsid", e.Commit.Collection)
277 l.Info("ingesting vouch")
278
279 switch e.Commit.Operation {
280 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
281 raw := json.RawMessage(e.Commit.Record)
282 record := tangled.GraphVouch{}
283 err = json.Unmarshal(raw, &record)
284 if err != nil {
285 l.Error("invalid record", "err", err)
286 return err
287 }
288
289 // rkey is the subject_did being vouched for/denounced
290 subjectDID := e.Commit.RKey
291
292 _, err = syntax.ParseDID(subjectDID)
293 if err != nil {
294 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
295 return fmt.Errorf("invalid subject_did: %w", err)
296 }
297
298 if did == subjectDID {
299 l.Warn("attempted self-vouch", "did", did)
300 return fmt.Errorf("cannot vouch for self")
301 }
302
303 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
304 if err != nil {
305 return err
306 }
307
308 if subjectId.Handle.IsInvalidHandle() {
309 return err
310 }
311
312 kind, err := models.ParseVouchKind(record.Kind)
313 if err != nil {
314 l.Error("invalid kind", "kind", kind)
315 return fmt.Errorf("invalid kind: %s", kind)
316 }
317
318 recordCid, err := cid.Parse(e.Commit.CID)
319 if err != nil {
320 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
321 return fmt.Errorf("invalid cid: %w", err)
322 }
323
324 var evidences []syntax.ATURI
325 for _, raw := range record.Evidences {
326 uri, parseErr := syntax.ParseATURI(raw)
327 if parseErr != nil {
328 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
329 continue
330 }
331 evidences = append(evidences, uri)
332 }
333
334 tx, txErr := i.Db.Begin()
335 if txErr != nil {
336 return fmt.Errorf("failed to start transaction: %w", txErr)
337 }
338
339 addErr := db.AddVouch(tx, &models.Vouch{
340 Did: syntax.DID(did),
341 SubjectDid: subjectId.DID,
342 Cid: recordCid,
343 Kind: kind,
344 Reason: record.Reason,
345 Evidences: evidences,
346 })
347 if addErr != nil {
348 tx.Rollback()
349 err = addErr
350 } else {
351 err = tx.Commit()
352 }
353
354 case jmodels.CommitOperationDelete:
355 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
356 }
357
358 if err != nil {
359 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
360 }
361
362 return nil
363}
364
365func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
366 did := e.Did
367 var err error
368
369 l := i.Logger.With("handler", "ingestPublicKey")
370 l = l.With("nsid", e.Commit.Collection)
371
372 switch e.Commit.Operation {
373 case jmodels.CommitOperationCreate:
374 l.Debug("processing add of pubkey")
375 raw := json.RawMessage(e.Commit.Record)
376 record := tangled.PublicKey{}
377 err = json.Unmarshal(raw, &record)
378 if err != nil {
379 l.Error("invalid record", "err", err)
380 return err
381 }
382
383 name := record.Name
384 key := record.Key
385 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
386 case jmodels.CommitOperationUpdate:
387 l.Debug("processing update of pubkey")
388 raw := json.RawMessage(e.Commit.Record)
389 record := tangled.PublicKey{}
390 err = json.Unmarshal(raw, &record)
391 if err != nil {
392 l.Error("invalid record", "err", err)
393 return err
394 }
395
396 name := record.Name
397 key := record.Key
398 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
399 case jmodels.CommitOperationDelete:
400 l.Debug("processing delete of pubkey")
401 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
402 }
403
404 if err != nil {
405 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
406 }
407
408 return nil
409}
410
411func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
412 did := e.Did
413 var err error
414
415 l := i.Logger.With("handler", "ingestArtifact")
416 l = l.With("nsid", e.Commit.Collection)
417
418 switch e.Commit.Operation {
419 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
420 raw := json.RawMessage(e.Commit.Record)
421 record := tangled.RepoArtifact{}
422 err = json.Unmarshal(raw, &record)
423 if err != nil {
424 l.Error("invalid record", "err", err)
425 return err
426 }
427
428 var repo *models.Repo
429 if record.RepoDid != nil && *record.RepoDid != "" {
430 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
431 if err != nil && !errors.Is(err, sql.ErrNoRows) {
432 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
433 }
434 }
435 if repo == nil && record.Repo != nil {
436 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
437 if parseErr != nil {
438 return parseErr
439 }
440 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
441 if err != nil {
442 return err
443 }
444 }
445 if repo == nil {
446 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
447 }
448
449 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
450 if err != nil || !ok {
451 return err
452 }
453
454 repoDid := repo.RepoDid
455 if repoDid == "" && record.RepoDid != nil {
456 repoDid = *record.RepoDid
457 }
458 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
459 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
460 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
461 }
462 }
463
464 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
465 if err != nil {
466 createdAt = time.Now()
467 }
468
469 artifact := models.Artifact{
470 Did: did,
471 Rkey: e.Commit.RKey,
472 RepoDid: syntax.DID(repo.RepoDid),
473 Tag: plumbing.Hash(record.Tag),
474 CreatedAt: createdAt,
475 BlobCid: cid.Cid(record.Artifact.Ref),
476 Name: record.Name,
477 Size: uint64(record.Artifact.Size),
478 MimeType: record.Artifact.MimeType,
479 }
480
481 err = db.AddArtifact(i.Db, artifact)
482 case jmodels.CommitOperationDelete:
483 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
484 }
485
486 if err != nil {
487 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
488 }
489
490 return nil
491}
492
493func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
494 did := e.Did
495 var err error
496
497 l := i.Logger.With("handler", "ingestProfile")
498 l = l.With("nsid", e.Commit.Collection)
499
500 if e.Commit.RKey != "self" {
501 return fmt.Errorf("ingestProfile only ingests `self` record")
502 }
503
504 switch e.Commit.Operation {
505 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
506 raw := json.RawMessage(e.Commit.Record)
507 record := tangled.ActorProfile{}
508 err = json.Unmarshal(raw, &record)
509 if err != nil {
510 l.Error("invalid record", "err", err)
511 return err
512 }
513
514 avatar := ""
515 if record.Avatar != nil {
516 avatar = record.Avatar.Ref.String()
517 }
518
519 description := ""
520 if record.Description != nil {
521 description = *record.Description
522 }
523
524 includeBluesky := record.Bluesky
525
526 pronouns := ""
527 if record.Pronouns != nil {
528 pronouns = *record.Pronouns
529 }
530
531 location := ""
532 if record.Location != nil {
533 location = *record.Location
534 }
535
536 var links [5]string
537 for i, l := range record.Links {
538 if i < 5 {
539 links[i] = l
540 }
541 }
542
543 var stats [2]models.VanityStat
544 for i, s := range record.Stats {
545 if i < 2 {
546 stats[i].Kind = models.ParseVanityStatKind(s)
547 }
548 }
549
550 var pinned [6]string
551 for i, r := range record.PinnedRepositories {
552 if i < 6 {
553 pinned[i] = r
554 }
555 }
556
557 var preferredHandle syntax.Handle
558 if record.PreferredHandle != nil {
559 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
560 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
561 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
562 preferredHandle = h
563 }
564 }
565 }
566
567 profile := models.Profile{
568 Did: did,
569 Avatar: avatar,
570 Description: description,
571 IncludeBluesky: includeBluesky,
572 Location: location,
573 Links: links,
574 Stats: stats,
575 PinnedRepos: pinned,
576 Pronouns: pronouns,
577 PreferredHandle: preferredHandle,
578 }
579
580 tx, err := i.Db.Begin()
581 if err != nil {
582 return fmt.Errorf("failed to start transaction: %w", err)
583 }
584
585 err = db.ValidateProfile(tx, &profile)
586 if err != nil {
587 return fmt.Errorf("invalid profile record")
588 }
589
590 err = db.UpsertProfile(tx, &profile)
591 if err == nil && i.Cache != nil {
592 pipe := i.Cache.Pipeline()
593 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
594 if preferredHandle != "" {
595 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
596 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
597 } else {
598 pipe.Del(ctx, didKey)
599 }
600 if _, execErr := pipe.Exec(ctx); execErr != nil {
601 l.Warn("failed to update preferred handle cache", "err", execErr)
602 }
603 }
604 case jmodels.CommitOperationDelete:
605 tx, beginErr := i.Db.Begin()
606 if beginErr != nil {
607 return fmt.Errorf("failed to start transaction: %w", beginErr)
608 }
609
610 priorHandle, phErr := db.GetPreferredHandle(tx, did)
611 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
612 l.Warn("failed to read prior preferred handle", "err", phErr)
613 }
614
615 err = db.DeleteProfile(tx, did)
616 if err == nil && i.Cache != nil {
617 pipe := i.Cache.Pipeline()
618 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
619 if priorHandle != "" {
620 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
621 }
622 if _, execErr := pipe.Exec(ctx); execErr != nil {
623 l.Warn("failed to evict preferred handle cache", "err", execErr)
624 }
625 }
626 }
627
628 if err != nil {
629 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
630 }
631
632 return nil
633}
634
635func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
636 did := e.Did
637 var err error
638
639 l := i.Logger.With("handler", "ingestSpindleMember")
640 l = l.With("nsid", e.Commit.Collection)
641
642 switch e.Commit.Operation {
643 case jmodels.CommitOperationCreate:
644 raw := json.RawMessage(e.Commit.Record)
645 record := tangled.SpindleMember{}
646 err = json.Unmarshal(raw, &record)
647 if err != nil {
648 l.Error("invalid record", "err", err)
649 return err
650 }
651
652 // only spindle owner can invite to spindles
653 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
654 if err != nil || !ok {
655 return fmt.Errorf("failed to enforce permissions: %w", err)
656 }
657
658 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
659 if err != nil {
660 return err
661 }
662
663 if memberId.Handle.IsInvalidHandle() {
664 return err
665 }
666
667 err = db.AddSpindleMember(i.Db, models.SpindleMember{
668 Did: syntax.DID(did),
669 Rkey: e.Commit.RKey,
670 Instance: record.Instance,
671 Subject: memberId.DID,
672 })
673 if !ok {
674 return fmt.Errorf("failed to add to db: %w", err)
675 }
676
677 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
678 if err != nil {
679 return fmt.Errorf("failed to update ACLs: %w", err)
680 }
681
682 l.Info("added spindle member")
683 case jmodels.CommitOperationDelete:
684 rkey := e.Commit.RKey
685
686 // get record from db first
687 members, err := db.GetSpindleMembers(
688 i.Db,
689 orm.FilterEq("did", did),
690 orm.FilterEq("rkey", rkey),
691 )
692 if err != nil || len(members) != 1 {
693 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
694 }
695 member := members[0]
696
697 tx, err := i.Db.Begin()
698 if err != nil {
699 return fmt.Errorf("failed to start txn: %w", err)
700 }
701
702 // remove record by rkey && update enforcer
703 if err = db.RemoveSpindleMember(
704 tx,
705 orm.FilterEq("did", did),
706 orm.FilterEq("rkey", rkey),
707 ); err != nil {
708 return fmt.Errorf("failed to remove from db: %w", err)
709 }
710
711 // update enforcer
712 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
713 if err != nil {
714 return fmt.Errorf("failed to update ACLs: %w", err)
715 }
716
717 if err = tx.Commit(); err != nil {
718 return fmt.Errorf("failed to commit txn: %w", err)
719 }
720
721 if err = i.Enforcer.E.SavePolicy(); err != nil {
722 return fmt.Errorf("failed to save ACLs: %w", err)
723 }
724
725 l.Info("removed spindle member")
726 }
727
728 return nil
729}
730
731func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
732 did := e.Did
733 var err error
734
735 l := i.Logger.With("handler", "ingestSpindle")
736 l = l.With("nsid", e.Commit.Collection)
737
738 switch e.Commit.Operation {
739 case jmodels.CommitOperationCreate:
740 raw := json.RawMessage(e.Commit.Record)
741 record := tangled.Spindle{}
742 err = json.Unmarshal(raw, &record)
743 if err != nil {
744 l.Error("invalid record", "err", err)
745 return err
746 }
747
748 instance := e.Commit.RKey
749
750 err := db.AddSpindle(i.Db, models.Spindle{
751 Owner: syntax.DID(did),
752 Instance: instance,
753 })
754 if err != nil {
755 l.Error("failed to add spindle to db", "err", err, "instance", instance)
756 return err
757 }
758
759 err = retry.Do(
760 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
761 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
762 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
763 )
764 if err != nil {
765 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
766 return err
767 }
768
769 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
770 if err != nil {
771 return fmt.Errorf("failed to mark verified: %w", err)
772 }
773
774 return nil
775
776 case jmodels.CommitOperationDelete:
777 instance := e.Commit.RKey
778
779 // get record from db first
780 spindles, err := db.GetSpindles(
781 ctx,
782 i.Db,
783 orm.FilterEq("owner", did),
784 orm.FilterEq("instance", instance),
785 )
786 if err != nil || len(spindles) != 1 {
787 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
788 }
789 spindle := spindles[0]
790
791 tx, err := i.Db.Begin()
792 if err != nil {
793 return err
794 }
795 defer func() {
796 tx.Rollback()
797 i.Enforcer.E.LoadPolicy()
798 }()
799
800 // remove spindle members first
801 err = db.RemoveSpindleMember(
802 tx,
803 orm.FilterEq("owner", did),
804 orm.FilterEq("instance", instance),
805 )
806 if err != nil {
807 return err
808 }
809
810 err = db.DeleteSpindle(
811 tx,
812 orm.FilterEq("owner", did),
813 orm.FilterEq("instance", instance),
814 )
815 if err != nil {
816 return err
817 }
818
819 if spindle.Verified != nil {
820 err = i.Enforcer.RemoveSpindle(instance)
821 if err != nil {
822 return err
823 }
824 }
825
826 err = tx.Commit()
827 if err != nil {
828 return err
829 }
830
831 err = i.Enforcer.E.SavePolicy()
832 if err != nil {
833 return err
834 }
835 }
836
837 return nil
838}
839
840func (i *Ingester) ingestString(e *jmodels.Event) error {
841 did := e.Did
842 rkey := e.Commit.RKey
843
844 var err error
845
846 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
847 l.Info("ingesting record")
848
849 switch e.Commit.Operation {
850 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
851 raw := json.RawMessage(e.Commit.Record)
852 record := tangled.String{}
853 err = json.Unmarshal(raw, &record)
854 if err != nil {
855 l.Error("invalid record", "err", err)
856 return err
857 }
858
859 string := models.StringFromRecord(did, rkey, record)
860
861 if err = i.Validator.ValidateString(&string); err != nil {
862 l.Error("invalid record", "err", err)
863 return err
864 }
865
866 if err = db.AddString(i.Db, string); err != nil {
867 l.Error("failed to add string", "err", err)
868 return err
869 }
870
871 return nil
872
873 case jmodels.CommitOperationDelete:
874 if err := db.DeleteString(
875 i.Db,
876 orm.FilterEq("did", did),
877 orm.FilterEq("rkey", rkey),
878 ); err != nil {
879 l.Error("failed to delete", "err", err)
880 return fmt.Errorf("failed to delete string record: %w", err)
881 }
882
883 return nil
884 }
885
886 return nil
887}
888
889func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
890 did := e.Did
891 var err error
892
893 l := i.Logger.With("handler", "ingestKnotMember")
894 l = l.With("nsid", e.Commit.Collection)
895
896 switch e.Commit.Operation {
897 case jmodels.CommitOperationCreate:
898 raw := json.RawMessage(e.Commit.Record)
899 record := tangled.KnotMember{}
900 err = json.Unmarshal(raw, &record)
901 if err != nil {
902 l.Error("invalid record", "err", err)
903 return err
904 }
905
906 // only knot owner can invite to knots
907 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
908 if err != nil || !ok {
909 return fmt.Errorf("failed to enforce permissions: %w", err)
910 }
911
912 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
913 if err != nil {
914 return err
915 }
916
917 if memberId.Handle.IsInvalidHandle() {
918 return err
919 }
920
921 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
922 if err != nil {
923 return fmt.Errorf("failed to update ACLs: %w", err)
924 }
925
926 l.Info("added knot member")
927 case jmodels.CommitOperationDelete:
928 // we don't store knot members in a table (like we do for spindle)
929 // and we can't remove this just yet. possibly fixed if we switch
930 // to either:
931 // 1. a knot_members table like with spindle and store the rkey
932 // 2. use the knot host as the rkey
933 //
934 // TODO: implement member deletion
935 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
936 }
937
938 return nil
939}
940
941func (i *Ingester) ingestKnot(e *jmodels.Event) error {
942 did := e.Did
943 var err error
944
945 l := i.Logger.With("handler", "ingestKnot")
946 l = l.With("nsid", e.Commit.Collection)
947
948 switch e.Commit.Operation {
949 case jmodels.CommitOperationCreate:
950 raw := json.RawMessage(e.Commit.Record)
951 record := tangled.Knot{}
952 err = json.Unmarshal(raw, &record)
953 if err != nil {
954 l.Error("invalid record", "err", err)
955 return err
956 }
957
958 domain := e.Commit.RKey
959
960 err := db.AddKnot(i.Db, domain, did)
961 if err != nil {
962 l.Error("failed to add knot to db", "err", err, "domain", domain)
963 return err
964 }
965
966 err = retry.Do(
967 func() error {
968 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
969 },
970 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
971 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
972 )
973 if err != nil {
974 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
975 return err
976 }
977
978 err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
979 if err != nil {
980 return fmt.Errorf("failed to mark verified: %w", err)
981 }
982
983 return nil
984
985 case jmodels.CommitOperationDelete:
986 domain := e.Commit.RKey
987
988 // get record from db first
989 registrations, err := db.GetRegistrations(
990 i.Db,
991 orm.FilterEq("domain", domain),
992 orm.FilterEq("did", did),
993 )
994 if err != nil {
995 return fmt.Errorf("failed to get registration: %w", err)
996 }
997 if len(registrations) != 1 {
998 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
999 }
1000 registration := registrations[0]
1001
1002 tx, err := i.Db.Begin()
1003 if err != nil {
1004 return err
1005 }
1006 defer func() {
1007 tx.Rollback()
1008 i.Enforcer.E.LoadPolicy()
1009 }()
1010
1011 err = db.DeleteKnot(
1012 tx,
1013 orm.FilterEq("did", did),
1014 orm.FilterEq("domain", domain),
1015 )
1016 if err != nil {
1017 return err
1018 }
1019
1020 if registration.Registered != nil {
1021 err = i.Enforcer.RemoveKnot(domain)
1022 if err != nil {
1023 return err
1024 }
1025 }
1026
1027 err = tx.Commit()
1028 if err != nil {
1029 return err
1030 }
1031
1032 err = i.Enforcer.E.SavePolicy()
1033 if err != nil {
1034 return err
1035 }
1036 }
1037
1038 return nil
1039}
1040func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
1041 did := e.Did
1042 rkey := e.Commit.RKey
1043
1044 var err error
1045
1046 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1047 l.Info("ingesting record")
1048
1049 switch e.Commit.Operation {
1050 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1051 raw := json.RawMessage(e.Commit.Record)
1052 record := tangled.RepoIssue{}
1053 err = json.Unmarshal(raw, &record)
1054 if err != nil {
1055 l.Error("invalid record", "err", err)
1056 return err
1057 }
1058
1059 issue := models.IssueFromRecord(did, rkey, record)
1060
1061 if issue.RepoDid == "" {
1062 return fmt.Errorf("issue record has no repo field")
1063 }
1064 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1065 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1066 }
1067
1068 if err := i.Validator.ValidateIssue(&issue); err != nil {
1069 return fmt.Errorf("failed to validate issue: %w", err)
1070 }
1071
1072 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1073 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1074 if repoErr == nil && repo.RepoDid != "" {
1075 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1076 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1077 }
1078 }
1079 }
1080
1081 tx, err := i.Db.BeginTx(ctx, nil)
1082 if err != nil {
1083 l.Error("failed to begin transaction", "err", err)
1084 return err
1085 }
1086 defer tx.Rollback()
1087
1088 err = db.PutIssue(tx, &issue)
1089 if err != nil {
1090 l.Error("failed to create issue", "err", err)
1091 return err
1092 }
1093
1094 err = tx.Commit()
1095 if err != nil {
1096 l.Error("failed to commit txn", "err", err)
1097 return err
1098 }
1099
1100 return nil
1101
1102 case jmodels.CommitOperationDelete:
1103 tx, err := i.Db.BeginTx(ctx, nil)
1104 if err != nil {
1105 l.Error("failed to begin transaction", "err", err)
1106 return err
1107 }
1108 defer tx.Rollback()
1109
1110 if err := db.DeleteIssues(
1111 tx,
1112 did,
1113 rkey,
1114 ); err != nil {
1115 l.Error("failed to delete", "err", err)
1116 return fmt.Errorf("failed to delete issue record: %w", err)
1117 }
1118 if err := tx.Commit(); err != nil {
1119 l.Error("failed to commit txn", "err", err)
1120 return err
1121 }
1122
1123 return nil
1124 }
1125
1126 return nil
1127}
1128
1129func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1130 did := e.Did
1131 rkey := e.Commit.RKey
1132
1133 var err error
1134
1135 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1136 l.Info("ingesting record")
1137
1138 switch e.Commit.Operation {
1139 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1140 raw := json.RawMessage(e.Commit.Record)
1141 record := tangled.RepoPull{}
1142 err = json.Unmarshal(raw, &record)
1143 if err != nil {
1144 l.Error("invalid record", "err", err)
1145 return err
1146 }
1147
1148 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1149 if err != nil {
1150 l.Error("failed to resolve did")
1151 return err
1152 }
1153
1154 // go through and fetch all blobs in parallel
1155 readers := make([]*io.ReadCloser, len(record.Rounds))
1156 var mu sync.Mutex
1157
1158 g, gctx := errgroup.WithContext(ctx)
1159
1160 for idx, b := range record.Rounds {
1161 g.Go(func() error {
1162 // for some reason, a blob is empty
1163 if b.PatchBlob == nil {
1164 return fmt.Errorf("missing patchBlob in round %d", idx)
1165 }
1166
1167 ownerPds := ownerId.PDSEndpoint()
1168 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1169 q := url.Query()
1170 q.Set("cid", b.PatchBlob.Ref.String())
1171 q.Set("did", did)
1172 url.RawQuery = q.Encode()
1173
1174 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1175 if err != nil {
1176 l.Error("failed to create request")
1177 return err
1178 }
1179 req.Header.Set("Content-Type", "application/json")
1180
1181 resp, err := http.DefaultClient.Do(req)
1182 if err != nil {
1183 l.Error("failed to make request")
1184 return err
1185 }
1186
1187 mu.Lock()
1188 readers[idx] = &resp.Body
1189 mu.Unlock()
1190
1191 return nil
1192 })
1193 }
1194
1195 if err := g.Wait(); err != nil {
1196 for _, r := range readers {
1197 if r != nil && *r != nil {
1198 (*r).Close()
1199 }
1200 }
1201 return err
1202 }
1203
1204 defer func() {
1205 for _, r := range readers {
1206 if r != nil && *r != nil {
1207 (*r).Close()
1208 }
1209 }
1210 }()
1211
1212 pull, err := models.PullFromRecord(did, rkey, record, readers)
1213 if err != nil {
1214 return fmt.Errorf("failed to parse pull from record: %w", err)
1215 }
1216 if err := i.Validator.ValidatePull(pull); err != nil {
1217 return fmt.Errorf("failed to validate pull: %w", err)
1218 }
1219
1220 tx, err := i.Db.BeginTx(ctx, nil)
1221 if err != nil {
1222 l.Error("failed to begin transaction", "err", err)
1223 return err
1224 }
1225 defer tx.Rollback()
1226
1227 err = db.PutPull(tx, pull)
1228 if err != nil {
1229 l.Error("failed to create pull", "err", err)
1230 return err
1231 }
1232
1233 err = tx.Commit()
1234 if err != nil {
1235 l.Error("failed to commit txn", "err", err)
1236 return err
1237 }
1238
1239 return nil
1240
1241 case jmodels.CommitOperationDelete:
1242 tx, err := i.Db.BeginTx(ctx, nil)
1243 if err != nil {
1244 l.Error("failed to begin transaction", "err", err)
1245 return err
1246 }
1247 defer tx.Rollback()
1248
1249 if err := db.AbandonPulls(
1250 tx,
1251 orm.FilterEq("owner_did", did),
1252 orm.FilterEq("rkey", rkey),
1253 ); err != nil {
1254 l.Error("failed to abandon", "err", err)
1255 return fmt.Errorf("failed to abandon pull record: %w", err)
1256 }
1257 if err := tx.Commit(); err != nil {
1258 l.Error("failed to commit txn", "err", err)
1259 return err
1260 }
1261
1262 return nil
1263 }
1264
1265 return nil
1266}
1267
1268func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1269 did := e.Did
1270 rkey := e.Commit.RKey
1271
1272 var err error
1273
1274 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1275 l.Info("ingesting record")
1276
1277 switch e.Commit.Operation {
1278 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1279 raw := json.RawMessage(e.Commit.Record)
1280 record := tangled.RepoIssueComment{}
1281 err = json.Unmarshal(raw, &record)
1282 if err != nil {
1283 return fmt.Errorf("invalid record: %w", err)
1284 }
1285
1286 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1287 if err != nil {
1288 return fmt.Errorf("failed to parse comment from record: %w", err)
1289 }
1290
1291 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1292 return fmt.Errorf("failed to validate comment: %w", err)
1293 }
1294
1295 tx, err := i.Db.Begin()
1296 if err != nil {
1297 return fmt.Errorf("failed to start transaction: %w", err)
1298 }
1299 defer tx.Rollback()
1300
1301 _, err = db.AddIssueComment(tx, *comment)
1302 if err != nil {
1303 return fmt.Errorf("failed to create issue comment: %w", err)
1304 }
1305
1306 return tx.Commit()
1307
1308 case jmodels.CommitOperationDelete:
1309 if err := db.DeleteIssueComments(
1310 i.Db,
1311 orm.FilterEq("did", did),
1312 orm.FilterEq("rkey", rkey),
1313 ); err != nil {
1314 return fmt.Errorf("failed to delete issue comment record: %w", err)
1315 }
1316
1317 return nil
1318 }
1319
1320 return nil
1321}
1322
1323func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1324 did := e.Did
1325 rkey := e.Commit.RKey
1326
1327 var err error
1328
1329 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1330 l.Info("ingesting record")
1331
1332 switch e.Commit.Operation {
1333 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1334 raw := json.RawMessage(e.Commit.Record)
1335 record := tangled.LabelDefinition{}
1336 err = json.Unmarshal(raw, &record)
1337 if err != nil {
1338 return fmt.Errorf("invalid record: %w", err)
1339 }
1340
1341 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1342 if err != nil {
1343 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1344 }
1345
1346 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1347 return fmt.Errorf("failed to validate labeldef: %w", err)
1348 }
1349
1350 _, err = db.AddLabelDefinition(i.Db, def)
1351 if err != nil {
1352 return fmt.Errorf("failed to create labeldef: %w", err)
1353 }
1354
1355 return nil
1356
1357 case jmodels.CommitOperationDelete:
1358 if err := db.DeleteLabelDefinition(
1359 i.Db,
1360 orm.FilterEq("did", did),
1361 orm.FilterEq("rkey", rkey),
1362 ); err != nil {
1363 return fmt.Errorf("failed to delete labeldef record: %w", err)
1364 }
1365
1366 return nil
1367 }
1368
1369 return nil
1370}
1371
1372func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1373 did := e.Did
1374 rkey := e.Commit.RKey
1375
1376 var err error
1377
1378 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1379 l.Info("ingesting record")
1380
1381 switch e.Commit.Operation {
1382 case jmodels.CommitOperationCreate:
1383 raw := json.RawMessage(e.Commit.Record)
1384 record := tangled.LabelOp{}
1385 err = json.Unmarshal(raw, &record)
1386 if err != nil {
1387 return fmt.Errorf("invalid record: %w", err)
1388 }
1389
1390 subject := syntax.ATURI(record.Subject)
1391 collection := subject.Collection()
1392
1393 var repo *models.Repo
1394 switch collection {
1395 case tangled.RepoIssueNSID:
1396 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1397 if err != nil || len(i) != 1 {
1398 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1399 }
1400 repo = i[0].Repo
1401 default:
1402 return fmt.Errorf("unsupported label subject: %s", collection)
1403 }
1404
1405 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1406 if err != nil {
1407 return fmt.Errorf("failed to build label application ctx: %w", err)
1408 }
1409
1410 ops := models.LabelOpsFromRecord(did, rkey, record)
1411
1412 for _, o := range ops {
1413 def, ok := actx.Defs[o.OperandKey]
1414 if !ok {
1415 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1416 }
1417 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1418 return fmt.Errorf("failed to validate labelop: %w", err)
1419 }
1420 }
1421
1422 tx, err := i.Db.Begin()
1423 if err != nil {
1424 return err
1425 }
1426 defer tx.Rollback()
1427
1428 for _, o := range ops {
1429 _, err = db.AddLabelOp(tx, &o)
1430 if err != nil {
1431 return fmt.Errorf("failed to add labelop: %w", err)
1432 }
1433 }
1434
1435 if err = tx.Commit(); err != nil {
1436 return err
1437 }
1438 }
1439
1440 return nil
1441}