Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "sync"
16
17 "time"
18
19 "github.com/avast/retry-go/v4"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 jmodels "github.com/bluesky-social/jetstream/pkg/models"
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/ipfs/go-cid"
24 "golang.org/x/sync/errgroup"
25 "tangled.org/core/api/tangled"
26 "tangled.org/core/appview/cache"
27 "tangled.org/core/appview/config"
28 "tangled.org/core/appview/db"
29 "tangled.org/core/appview/models"
30 "tangled.org/core/appview/serververify"
31 "tangled.org/core/appview/validator"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35)
36
37type Ingester struct {
38 Db *db.DB
39 Enforcer *rbac.Enforcer
40 IdResolver *idresolver.Resolver
41 Cache *cache.Cache
42 Config *config.Config
43 Logger *slog.Logger
44 Validator *validator.Validator
45}
46
47type processFunc func(ctx context.Context, e *jmodels.Event) error
48
49func (i *Ingester) Ingest() processFunc {
50 return func(ctx context.Context, e *jmodels.Event) error {
51 var err error
52
53 l := i.Logger.With("kind", e.Kind)
54 switch e.Kind {
55 case jmodels.EventKindAccount:
56 // TODO: sync account state to db
57 if e.Account.Active {
58 break
59 }
60 // TODO: revoke sessions by DID
61 if *e.Account.Status == "deactivated" {
62 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
63 }
64 case jmodels.EventKindIdentity:
65 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
66 case jmodels.EventKindCommit:
67 switch e.Commit.Collection {
68 case tangled.GraphFollowNSID:
69 err = i.ingestFollow(e)
70 case tangled.GraphVouchNSID:
71 err = i.ingestVouch(ctx, e)
72 case tangled.FeedStarNSID:
73 err = i.ingestStar(ctx, e)
74 case tangled.PublicKeyNSID:
75 err = i.ingestPublicKey(e)
76 case tangled.RepoArtifactNSID:
77 err = i.ingestArtifact(ctx, e)
78 case tangled.ActorProfileNSID:
79 err = i.ingestProfile(ctx, e)
80 case tangled.SpindleMemberNSID:
81 err = i.ingestSpindleMember(ctx, e)
82 case tangled.SpindleNSID:
83 err = i.ingestSpindle(ctx, e)
84 case tangled.KnotMemberNSID:
85 err = i.ingestKnotMember(e)
86 case tangled.KnotNSID:
87 err = i.ingestKnot(e)
88 case tangled.StringNSID:
89 err = i.ingestString(e)
90 case tangled.RepoIssueNSID:
91 err = i.ingestIssue(ctx, e)
92 case tangled.RepoPullNSID:
93 err = i.ingestPull(ctx, e)
94 case tangled.RepoIssueCommentNSID:
95 err = i.ingestIssueComment(e)
96 case tangled.LabelDefinitionNSID:
97 err = i.ingestLabelDefinition(e)
98 case tangled.LabelOpNSID:
99 err = i.ingestLabelOp(e)
100 }
101 l = i.Logger.With("nsid", e.Commit.Collection)
102 }
103
104 if err != nil {
105 l.Warn("failed to ingest record, skipping", "err", err)
106 }
107
108 lastTimeUs := e.TimeUS + 1
109 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
110 l.Error("failed to save cursor", "err", saveErr)
111 }
112
113 return nil
114 }
115}
116
117func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
118 var err error
119 did := e.Did
120
121 l := i.Logger.With("handler", "ingestStar")
122 l = l.With("nsid", e.Commit.Collection)
123
124 switch e.Commit.Operation {
125 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
126 var subjectUri syntax.ATURI
127
128 raw := json.RawMessage(e.Commit.Record)
129 record := tangled.FeedStar{}
130 err := json.Unmarshal(raw, &record)
131 if err != nil {
132 l.Error("invalid record", "err", err)
133 return err
134 }
135
136 star := &models.Star{
137 Did: did,
138 Rkey: e.Commit.RKey,
139 }
140
141 switch {
142 case record.SubjectDid != nil:
143 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
144 if repoErr == nil {
145 subjectUri = repo.RepoAt()
146 star.RepoAt = subjectUri
147 }
148 case record.Subject != nil:
149 subjectUri, err = syntax.ParseATURI(*record.Subject)
150 if err != nil {
151 l.Error("invalid record", "err", err)
152 return err
153 }
154 star.RepoAt = subjectUri
155 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
156 if repoErr == nil && repo.RepoDid != "" {
157 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.FeedStarNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
158 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
159 }
160 }
161 default:
162 l.Error("star record has neither subject nor subjectDid")
163 return fmt.Errorf("star record has neither subject nor subjectDid")
164 }
165 err = db.AddStar(i.Db, star)
166 case jmodels.CommitOperationDelete:
167 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
168 }
169
170 if err != nil {
171 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
172 }
173
174 return nil
175}
176
177func (i *Ingester) ingestFollow(e *jmodels.Event) error {
178 var err error
179 did := e.Did
180
181 l := i.Logger.With("handler", "ingestFollow")
182 l = l.With("nsid", e.Commit.Collection)
183
184 switch e.Commit.Operation {
185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
186 raw := json.RawMessage(e.Commit.Record)
187 record := tangled.GraphFollow{}
188 err = json.Unmarshal(raw, &record)
189 if err != nil {
190 l.Error("invalid record", "err", err)
191 return err
192 }
193
194 err = db.AddFollow(i.Db, &models.Follow{
195 UserDid: did,
196 SubjectDid: record.Subject,
197 Rkey: e.Commit.RKey,
198 })
199 case jmodels.CommitOperationDelete:
200 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
201 }
202
203 if err != nil {
204 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
205 }
206
207 return nil
208}
209
210func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
211 var err error
212 did := e.Did
213
214 l := i.Logger.With("handler", "ingestVouch")
215 l = l.With("nsid", e.Commit.Collection)
216 l.Info("ingesting vouch")
217
218 switch e.Commit.Operation {
219 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
220 raw := json.RawMessage(e.Commit.Record)
221 record := tangled.GraphVouch{}
222 err = json.Unmarshal(raw, &record)
223 if err != nil {
224 l.Error("invalid record", "err", err)
225 return err
226 }
227
228 // rkey is the subject_did being vouched for/denounced
229 subjectDID := e.Commit.RKey
230
231 _, err = syntax.ParseDID(subjectDID)
232 if err != nil {
233 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
234 return fmt.Errorf("invalid subject_did: %w", err)
235 }
236
237 if did == subjectDID {
238 l.Warn("attempted self-vouch", "did", did)
239 return fmt.Errorf("cannot vouch for self")
240 }
241
242 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
243 if err != nil {
244 return err
245 }
246
247 if subjectId.Handle.IsInvalidHandle() {
248 return err
249 }
250
251 kind, err := models.ParseVouchKind(record.Kind)
252 if err != nil {
253 l.Error("invalid kind", "kind", kind)
254 return fmt.Errorf("invalid kind: %s", kind)
255 }
256
257 recordCid, err := cid.Parse(e.Commit.CID)
258 if err != nil {
259 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
260 return fmt.Errorf("invalid cid: %w", err)
261 }
262
263 var evidences []syntax.ATURI
264 for _, raw := range record.Evidences {
265 uri, parseErr := syntax.ParseATURI(raw)
266 if parseErr != nil {
267 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
268 continue
269 }
270 evidences = append(evidences, uri)
271 }
272
273 tx, txErr := i.Db.Begin()
274 if txErr != nil {
275 return fmt.Errorf("failed to start transaction: %w", txErr)
276 }
277
278 addErr := db.AddVouch(tx, &models.Vouch{
279 Did: syntax.DID(did),
280 SubjectDid: subjectId.DID,
281 Cid: recordCid,
282 Kind: kind,
283 Reason: record.Reason,
284 Evidences: evidences,
285 })
286 if addErr != nil {
287 tx.Rollback()
288 err = addErr
289 } else {
290 err = tx.Commit()
291 }
292
293 case jmodels.CommitOperationDelete:
294 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
295 }
296
297 if err != nil {
298 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
299 }
300
301 return nil
302}
303
304func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
305 did := e.Did
306 var err error
307
308 l := i.Logger.With("handler", "ingestPublicKey")
309 l = l.With("nsid", e.Commit.Collection)
310
311 switch e.Commit.Operation {
312 case jmodels.CommitOperationCreate:
313 l.Debug("processing add of pubkey")
314 raw := json.RawMessage(e.Commit.Record)
315 record := tangled.PublicKey{}
316 err = json.Unmarshal(raw, &record)
317 if err != nil {
318 l.Error("invalid record", "err", err)
319 return err
320 }
321
322 name := record.Name
323 key := record.Key
324 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
325 case jmodels.CommitOperationUpdate:
326 l.Debug("processing update of pubkey")
327 raw := json.RawMessage(e.Commit.Record)
328 record := tangled.PublicKey{}
329 err = json.Unmarshal(raw, &record)
330 if err != nil {
331 l.Error("invalid record", "err", err)
332 return err
333 }
334
335 name := record.Name
336 key := record.Key
337 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
338 case jmodels.CommitOperationDelete:
339 l.Debug("processing delete of pubkey")
340 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
341 }
342
343 if err != nil {
344 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
345 }
346
347 return nil
348}
349
350func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
351 did := e.Did
352 var err error
353
354 l := i.Logger.With("handler", "ingestArtifact")
355 l = l.With("nsid", e.Commit.Collection)
356
357 switch e.Commit.Operation {
358 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
359 raw := json.RawMessage(e.Commit.Record)
360 record := tangled.RepoArtifact{}
361 err = json.Unmarshal(raw, &record)
362 if err != nil {
363 l.Error("invalid record", "err", err)
364 return err
365 }
366
367 var repo *models.Repo
368 if record.RepoDid != nil && *record.RepoDid != "" {
369 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
370 if err != nil && !errors.Is(err, sql.ErrNoRows) {
371 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
372 }
373 }
374 if repo == nil && record.Repo != nil {
375 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
376 if parseErr != nil {
377 return parseErr
378 }
379 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
380 if err != nil {
381 return err
382 }
383 }
384 if repo == nil {
385 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
386 }
387
388 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
389 if err != nil || !ok {
390 return err
391 }
392
393 repoDid := repo.RepoDid
394 if repoDid == "" && record.RepoDid != nil {
395 repoDid = *record.RepoDid
396 }
397 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
398 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
399 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
400 }
401 }
402
403 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
404 if err != nil {
405 createdAt = time.Now()
406 }
407
408 artifact := models.Artifact{
409 Did: did,
410 Rkey: e.Commit.RKey,
411 RepoAt: repo.RepoAt(),
412 Tag: plumbing.Hash(record.Tag),
413 CreatedAt: createdAt,
414 BlobCid: cid.Cid(record.Artifact.Ref),
415 Name: record.Name,
416 Size: uint64(record.Artifact.Size),
417 MimeType: record.Artifact.MimeType,
418 }
419
420 err = db.AddArtifact(i.Db, artifact)
421 case jmodels.CommitOperationDelete:
422 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
423 }
424
425 if err != nil {
426 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
427 }
428
429 return nil
430}
431
432func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
433 did := e.Did
434 var err error
435
436 l := i.Logger.With("handler", "ingestProfile")
437 l = l.With("nsid", e.Commit.Collection)
438
439 if e.Commit.RKey != "self" {
440 return fmt.Errorf("ingestProfile only ingests `self` record")
441 }
442
443 switch e.Commit.Operation {
444 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
445 raw := json.RawMessage(e.Commit.Record)
446 record := tangled.ActorProfile{}
447 err = json.Unmarshal(raw, &record)
448 if err != nil {
449 l.Error("invalid record", "err", err)
450 return err
451 }
452
453 avatar := ""
454 if record.Avatar != nil {
455 avatar = record.Avatar.Ref.String()
456 }
457
458 description := ""
459 if record.Description != nil {
460 description = *record.Description
461 }
462
463 includeBluesky := record.Bluesky
464
465 pronouns := ""
466 if record.Pronouns != nil {
467 pronouns = *record.Pronouns
468 }
469
470 location := ""
471 if record.Location != nil {
472 location = *record.Location
473 }
474
475 var links [5]string
476 for i, l := range record.Links {
477 if i < 5 {
478 links[i] = l
479 }
480 }
481
482 var stats [2]models.VanityStat
483 for i, s := range record.Stats {
484 if i < 2 {
485 stats[i].Kind = models.ParseVanityStatKind(s)
486 }
487 }
488
489 var pinned [6]string
490 for i, r := range record.PinnedRepositories {
491 if i < 6 {
492 pinned[i] = r
493 }
494 }
495
496 var preferredHandle syntax.Handle
497 if record.PreferredHandle != nil {
498 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
499 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
500 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
501 preferredHandle = h
502 }
503 }
504 }
505
506 profile := models.Profile{
507 Did: did,
508 Avatar: avatar,
509 Description: description,
510 IncludeBluesky: includeBluesky,
511 Location: location,
512 Links: links,
513 Stats: stats,
514 PinnedRepos: pinned,
515 Pronouns: pronouns,
516 PreferredHandle: preferredHandle,
517 }
518
519 tx, err := i.Db.Begin()
520 if err != nil {
521 return fmt.Errorf("failed to start transaction")
522 }
523
524 err = db.ValidateProfile(tx, &profile)
525 if err != nil {
526 return fmt.Errorf("invalid profile record")
527 }
528
529 err = db.UpsertProfile(tx, &profile)
530 if err == nil && i.Cache != nil {
531 pipe := i.Cache.Pipeline()
532 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
533 if preferredHandle != "" {
534 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
535 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
536 } else {
537 pipe.Del(ctx, didKey)
538 }
539 if _, execErr := pipe.Exec(ctx); execErr != nil {
540 l.Warn("failed to update preferred handle cache", "err", execErr)
541 }
542 }
543 case jmodels.CommitOperationDelete:
544 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
545 }
546
547 if err != nil {
548 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
549 }
550
551 return nil
552}
553
554func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
555 did := e.Did
556 var err error
557
558 l := i.Logger.With("handler", "ingestSpindleMember")
559 l = l.With("nsid", e.Commit.Collection)
560
561 switch e.Commit.Operation {
562 case jmodels.CommitOperationCreate:
563 raw := json.RawMessage(e.Commit.Record)
564 record := tangled.SpindleMember{}
565 err = json.Unmarshal(raw, &record)
566 if err != nil {
567 l.Error("invalid record", "err", err)
568 return err
569 }
570
571 // only spindle owner can invite to spindles
572 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
573 if err != nil || !ok {
574 return fmt.Errorf("failed to enforce permissions: %w", err)
575 }
576
577 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
578 if err != nil {
579 return err
580 }
581
582 if memberId.Handle.IsInvalidHandle() {
583 return err
584 }
585
586 err = db.AddSpindleMember(i.Db, models.SpindleMember{
587 Did: syntax.DID(did),
588 Rkey: e.Commit.RKey,
589 Instance: record.Instance,
590 Subject: memberId.DID,
591 })
592 if !ok {
593 return fmt.Errorf("failed to add to db: %w", err)
594 }
595
596 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
597 if err != nil {
598 return fmt.Errorf("failed to update ACLs: %w", err)
599 }
600
601 l.Info("added spindle member")
602 case jmodels.CommitOperationDelete:
603 rkey := e.Commit.RKey
604
605 // get record from db first
606 members, err := db.GetSpindleMembers(
607 i.Db,
608 orm.FilterEq("did", did),
609 orm.FilterEq("rkey", rkey),
610 )
611 if err != nil || len(members) != 1 {
612 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
613 }
614 member := members[0]
615
616 tx, err := i.Db.Begin()
617 if err != nil {
618 return fmt.Errorf("failed to start txn: %w", err)
619 }
620
621 // remove record by rkey && update enforcer
622 if err = db.RemoveSpindleMember(
623 tx,
624 orm.FilterEq("did", did),
625 orm.FilterEq("rkey", rkey),
626 ); err != nil {
627 return fmt.Errorf("failed to remove from db: %w", err)
628 }
629
630 // update enforcer
631 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
632 if err != nil {
633 return fmt.Errorf("failed to update ACLs: %w", err)
634 }
635
636 if err = tx.Commit(); err != nil {
637 return fmt.Errorf("failed to commit txn: %w", err)
638 }
639
640 if err = i.Enforcer.E.SavePolicy(); err != nil {
641 return fmt.Errorf("failed to save ACLs: %w", err)
642 }
643
644 l.Info("removed spindle member")
645 }
646
647 return nil
648}
649
650func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
651 did := e.Did
652 var err error
653
654 l := i.Logger.With("handler", "ingestSpindle")
655 l = l.With("nsid", e.Commit.Collection)
656
657 switch e.Commit.Operation {
658 case jmodels.CommitOperationCreate:
659 raw := json.RawMessage(e.Commit.Record)
660 record := tangled.Spindle{}
661 err = json.Unmarshal(raw, &record)
662 if err != nil {
663 l.Error("invalid record", "err", err)
664 return err
665 }
666
667 instance := e.Commit.RKey
668
669 err := db.AddSpindle(i.Db, models.Spindle{
670 Owner: syntax.DID(did),
671 Instance: instance,
672 })
673 if err != nil {
674 l.Error("failed to add spindle to db", "err", err, "instance", instance)
675 return err
676 }
677
678 err = retry.Do(
679 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
680 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
681 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
682 )
683 if err != nil {
684 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
685 return err
686 }
687
688 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
689 if err != nil {
690 return fmt.Errorf("failed to mark verified: %w", err)
691 }
692
693 return nil
694
695 case jmodels.CommitOperationDelete:
696 instance := e.Commit.RKey
697
698 // get record from db first
699 spindles, err := db.GetSpindles(
700 ctx,
701 i.Db,
702 orm.FilterEq("owner", did),
703 orm.FilterEq("instance", instance),
704 )
705 if err != nil || len(spindles) != 1 {
706 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
707 }
708 spindle := spindles[0]
709
710 tx, err := i.Db.Begin()
711 if err != nil {
712 return err
713 }
714 defer func() {
715 tx.Rollback()
716 i.Enforcer.E.LoadPolicy()
717 }()
718
719 // remove spindle members first
720 err = db.RemoveSpindleMember(
721 tx,
722 orm.FilterEq("owner", did),
723 orm.FilterEq("instance", instance),
724 )
725 if err != nil {
726 return err
727 }
728
729 err = db.DeleteSpindle(
730 tx,
731 orm.FilterEq("owner", did),
732 orm.FilterEq("instance", instance),
733 )
734 if err != nil {
735 return err
736 }
737
738 if spindle.Verified != nil {
739 err = i.Enforcer.RemoveSpindle(instance)
740 if err != nil {
741 return err
742 }
743 }
744
745 err = tx.Commit()
746 if err != nil {
747 return err
748 }
749
750 err = i.Enforcer.E.SavePolicy()
751 if err != nil {
752 return err
753 }
754 }
755
756 return nil
757}
758
759func (i *Ingester) ingestString(e *jmodels.Event) error {
760 did := e.Did
761 rkey := e.Commit.RKey
762
763 var err error
764
765 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
766 l.Info("ingesting record")
767
768 switch e.Commit.Operation {
769 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
770 raw := json.RawMessage(e.Commit.Record)
771 record := tangled.String{}
772 err = json.Unmarshal(raw, &record)
773 if err != nil {
774 l.Error("invalid record", "err", err)
775 return err
776 }
777
778 string := models.StringFromRecord(did, rkey, record)
779
780 if err = i.Validator.ValidateString(&string); err != nil {
781 l.Error("invalid record", "err", err)
782 return err
783 }
784
785 if err = db.AddString(i.Db, string); err != nil {
786 l.Error("failed to add string", "err", err)
787 return err
788 }
789
790 return nil
791
792 case jmodels.CommitOperationDelete:
793 if err := db.DeleteString(
794 i.Db,
795 orm.FilterEq("did", did),
796 orm.FilterEq("rkey", rkey),
797 ); err != nil {
798 l.Error("failed to delete", "err", err)
799 return fmt.Errorf("failed to delete string record: %w", err)
800 }
801
802 return nil
803 }
804
805 return nil
806}
807
808func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
809 did := e.Did
810 var err error
811
812 l := i.Logger.With("handler", "ingestKnotMember")
813 l = l.With("nsid", e.Commit.Collection)
814
815 switch e.Commit.Operation {
816 case jmodels.CommitOperationCreate:
817 raw := json.RawMessage(e.Commit.Record)
818 record := tangled.KnotMember{}
819 err = json.Unmarshal(raw, &record)
820 if err != nil {
821 l.Error("invalid record", "err", err)
822 return err
823 }
824
825 // only knot owner can invite to knots
826 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
827 if err != nil || !ok {
828 return fmt.Errorf("failed to enforce permissions: %w", err)
829 }
830
831 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
832 if err != nil {
833 return err
834 }
835
836 if memberId.Handle.IsInvalidHandle() {
837 return err
838 }
839
840 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
841 if err != nil {
842 return fmt.Errorf("failed to update ACLs: %w", err)
843 }
844
845 l.Info("added knot member")
846 case jmodels.CommitOperationDelete:
847 // we don't store knot members in a table (like we do for spindle)
848 // and we can't remove this just yet. possibly fixed if we switch
849 // to either:
850 // 1. a knot_members table like with spindle and store the rkey
851 // 2. use the knot host as the rkey
852 //
853 // TODO: implement member deletion
854 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
855 }
856
857 return nil
858}
859
860func (i *Ingester) ingestKnot(e *jmodels.Event) error {
861 did := e.Did
862 var err error
863
864 l := i.Logger.With("handler", "ingestKnot")
865 l = l.With("nsid", e.Commit.Collection)
866
867 switch e.Commit.Operation {
868 case jmodels.CommitOperationCreate:
869 raw := json.RawMessage(e.Commit.Record)
870 record := tangled.Knot{}
871 err = json.Unmarshal(raw, &record)
872 if err != nil {
873 l.Error("invalid record", "err", err)
874 return err
875 }
876
877 domain := e.Commit.RKey
878
879 err := db.AddKnot(i.Db, domain, did)
880 if err != nil {
881 l.Error("failed to add knot to db", "err", err, "domain", domain)
882 return err
883 }
884
885 err = retry.Do(
886 func() error {
887 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
888 },
889 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
890 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
891 )
892 if err != nil {
893 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
894 return err
895 }
896
897 err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
898 if err != nil {
899 return fmt.Errorf("failed to mark verified: %w", err)
900 }
901
902 return nil
903
904 case jmodels.CommitOperationDelete:
905 domain := e.Commit.RKey
906
907 // get record from db first
908 registrations, err := db.GetRegistrations(
909 i.Db,
910 orm.FilterEq("domain", domain),
911 orm.FilterEq("did", did),
912 )
913 if err != nil {
914 return fmt.Errorf("failed to get registration: %w", err)
915 }
916 if len(registrations) != 1 {
917 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
918 }
919 registration := registrations[0]
920
921 tx, err := i.Db.Begin()
922 if err != nil {
923 return err
924 }
925 defer func() {
926 tx.Rollback()
927 i.Enforcer.E.LoadPolicy()
928 }()
929
930 err = db.DeleteKnot(
931 tx,
932 orm.FilterEq("did", did),
933 orm.FilterEq("domain", domain),
934 )
935 if err != nil {
936 return err
937 }
938
939 if registration.Registered != nil {
940 err = i.Enforcer.RemoveKnot(domain)
941 if err != nil {
942 return err
943 }
944 }
945
946 err = tx.Commit()
947 if err != nil {
948 return err
949 }
950
951 err = i.Enforcer.E.SavePolicy()
952 if err != nil {
953 return err
954 }
955 }
956
957 return nil
958}
959func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
960 did := e.Did
961 rkey := e.Commit.RKey
962
963 var err error
964
965 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
966 l.Info("ingesting record")
967
968 switch e.Commit.Operation {
969 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
970 raw := json.RawMessage(e.Commit.Record)
971 record := tangled.RepoIssue{}
972 err = json.Unmarshal(raw, &record)
973 if err != nil {
974 l.Error("invalid record", "err", err)
975 return err
976 }
977
978 issue := models.IssueFromRecord(did, rkey, record)
979
980 if issue.RepoAt == "" {
981 return fmt.Errorf("issue record has no repo field")
982 }
983
984 if err := i.Validator.ValidateIssue(&issue); err != nil {
985 return fmt.Errorf("failed to validate issue: %w", err)
986 }
987
988 if record.Repo != nil {
989 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
990 if repoErr == nil && repo.RepoDid != "" {
991 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
992 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
993 }
994 }
995 }
996
997 tx, err := i.Db.BeginTx(ctx, nil)
998 if err != nil {
999 l.Error("failed to begin transaction", "err", err)
1000 return err
1001 }
1002 defer tx.Rollback()
1003
1004 err = db.PutIssue(tx, &issue)
1005 if err != nil {
1006 l.Error("failed to create issue", "err", err)
1007 return err
1008 }
1009
1010 err = tx.Commit()
1011 if err != nil {
1012 l.Error("failed to commit txn", "err", err)
1013 return err
1014 }
1015
1016 return nil
1017
1018 case jmodels.CommitOperationDelete:
1019 tx, err := i.Db.BeginTx(ctx, nil)
1020 if err != nil {
1021 l.Error("failed to begin transaction", "err", err)
1022 return err
1023 }
1024 defer tx.Rollback()
1025
1026 if err := db.DeleteIssues(
1027 tx,
1028 did,
1029 rkey,
1030 ); err != nil {
1031 l.Error("failed to delete", "err", err)
1032 return fmt.Errorf("failed to delete issue record: %w", err)
1033 }
1034 if err := tx.Commit(); err != nil {
1035 l.Error("failed to commit txn", "err", err)
1036 return err
1037 }
1038
1039 return nil
1040 }
1041
1042 return nil
1043}
1044
1045func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1046 did := e.Did
1047 rkey := e.Commit.RKey
1048
1049 var err error
1050
1051 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1052 l.Info("ingesting record")
1053
1054 switch e.Commit.Operation {
1055 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1056 raw := json.RawMessage(e.Commit.Record)
1057 record := tangled.RepoPull{}
1058 err = json.Unmarshal(raw, &record)
1059 if err != nil {
1060 l.Error("invalid record", "err", err)
1061 return err
1062 }
1063
1064 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1065 if err != nil {
1066 l.Error("failed to resolve did")
1067 return err
1068 }
1069
1070 // go through and fetch all blobs in parallel
1071 readers := make([]*io.ReadCloser, len(record.Rounds))
1072 var mu sync.Mutex
1073
1074 g, gctx := errgroup.WithContext(ctx)
1075
1076 for idx, b := range record.Rounds {
1077 g.Go(func() error {
1078 // for some reason, a blob is empty
1079 if b.PatchBlob == nil {
1080 return fmt.Errorf("missing patchBlob in round %d", idx)
1081 }
1082
1083 ownerPds := ownerId.PDSEndpoint()
1084 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1085 q := url.Query()
1086 q.Set("cid", b.PatchBlob.Ref.String())
1087 q.Set("did", did)
1088 url.RawQuery = q.Encode()
1089
1090 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1091 if err != nil {
1092 l.Error("failed to create request")
1093 return err
1094 }
1095 req.Header.Set("Content-Type", "application/json")
1096
1097 resp, err := http.DefaultClient.Do(req)
1098 if err != nil {
1099 l.Error("failed to make request")
1100 return err
1101 }
1102
1103 mu.Lock()
1104 readers[idx] = &resp.Body
1105 mu.Unlock()
1106
1107 return nil
1108 })
1109 }
1110
1111 if err := g.Wait(); err != nil {
1112 for _, r := range readers {
1113 if r != nil && *r != nil {
1114 (*r).Close()
1115 }
1116 }
1117 return err
1118 }
1119
1120 defer func() {
1121 for _, r := range readers {
1122 if r != nil && *r != nil {
1123 (*r).Close()
1124 }
1125 }
1126 }()
1127
1128 pull, err := models.PullFromRecord(did, rkey, record, readers)
1129 if err != nil {
1130 return fmt.Errorf("failed to parse pull from record: %w", err)
1131 }
1132 if err := i.Validator.ValidatePull(pull); err != nil {
1133 return fmt.Errorf("failed to validate pull: %w", err)
1134 }
1135
1136 tx, err := i.Db.BeginTx(ctx, nil)
1137 if err != nil {
1138 l.Error("failed to begin transaction", "err", err)
1139 return err
1140 }
1141 defer tx.Rollback()
1142
1143 err = db.PutPull(tx, pull)
1144 if err != nil {
1145 l.Error("failed to create pull", "err", err)
1146 return err
1147 }
1148
1149 err = tx.Commit()
1150 if err != nil {
1151 l.Error("failed to commit txn", "err", err)
1152 return err
1153 }
1154
1155 return nil
1156
1157 case jmodels.CommitOperationDelete:
1158 tx, err := i.Db.BeginTx(ctx, nil)
1159 if err != nil {
1160 l.Error("failed to begin transaction", "err", err)
1161 return err
1162 }
1163 defer tx.Rollback()
1164
1165 if err := db.AbandonPulls(
1166 tx,
1167 orm.FilterEq("owner_did", did),
1168 orm.FilterEq("rkey", rkey),
1169 ); err != nil {
1170 l.Error("failed to abandon", "err", err)
1171 return fmt.Errorf("failed to abandon pull record: %w", err)
1172 }
1173 if err := tx.Commit(); err != nil {
1174 l.Error("failed to commit txn", "err", err)
1175 return err
1176 }
1177
1178 return nil
1179 }
1180
1181 return nil
1182}
1183
1184func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1185 did := e.Did
1186 rkey := e.Commit.RKey
1187
1188 var err error
1189
1190 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1191 l.Info("ingesting record")
1192
1193 switch e.Commit.Operation {
1194 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1195 raw := json.RawMessage(e.Commit.Record)
1196 record := tangled.RepoIssueComment{}
1197 err = json.Unmarshal(raw, &record)
1198 if err != nil {
1199 return fmt.Errorf("invalid record: %w", err)
1200 }
1201
1202 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1203 if err != nil {
1204 return fmt.Errorf("failed to parse comment from record: %w", err)
1205 }
1206
1207 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1208 return fmt.Errorf("failed to validate comment: %w", err)
1209 }
1210
1211 tx, err := i.Db.Begin()
1212 if err != nil {
1213 return fmt.Errorf("failed to start transaction: %w", err)
1214 }
1215 defer tx.Rollback()
1216
1217 _, err = db.AddIssueComment(tx, *comment)
1218 if err != nil {
1219 return fmt.Errorf("failed to create issue comment: %w", err)
1220 }
1221
1222 return tx.Commit()
1223
1224 case jmodels.CommitOperationDelete:
1225 if err := db.DeleteIssueComments(
1226 i.Db,
1227 orm.FilterEq("did", did),
1228 orm.FilterEq("rkey", rkey),
1229 ); err != nil {
1230 return fmt.Errorf("failed to delete issue comment record: %w", err)
1231 }
1232
1233 return nil
1234 }
1235
1236 return nil
1237}
1238
1239func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1240 did := e.Did
1241 rkey := e.Commit.RKey
1242
1243 var err error
1244
1245 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1246 l.Info("ingesting record")
1247
1248 switch e.Commit.Operation {
1249 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1250 raw := json.RawMessage(e.Commit.Record)
1251 record := tangled.LabelDefinition{}
1252 err = json.Unmarshal(raw, &record)
1253 if err != nil {
1254 return fmt.Errorf("invalid record: %w", err)
1255 }
1256
1257 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1258 if err != nil {
1259 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1260 }
1261
1262 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1263 return fmt.Errorf("failed to validate labeldef: %w", err)
1264 }
1265
1266 _, err = db.AddLabelDefinition(i.Db, def)
1267 if err != nil {
1268 return fmt.Errorf("failed to create labeldef: %w", err)
1269 }
1270
1271 return nil
1272
1273 case jmodels.CommitOperationDelete:
1274 if err := db.DeleteLabelDefinition(
1275 i.Db,
1276 orm.FilterEq("did", did),
1277 orm.FilterEq("rkey", rkey),
1278 ); err != nil {
1279 return fmt.Errorf("failed to delete labeldef record: %w", err)
1280 }
1281
1282 return nil
1283 }
1284
1285 return nil
1286}
1287
1288func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1289 did := e.Did
1290 rkey := e.Commit.RKey
1291
1292 var err error
1293
1294 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1295 l.Info("ingesting record")
1296
1297 switch e.Commit.Operation {
1298 case jmodels.CommitOperationCreate:
1299 raw := json.RawMessage(e.Commit.Record)
1300 record := tangled.LabelOp{}
1301 err = json.Unmarshal(raw, &record)
1302 if err != nil {
1303 return fmt.Errorf("invalid record: %w", err)
1304 }
1305
1306 subject := syntax.ATURI(record.Subject)
1307 collection := subject.Collection()
1308
1309 var repo *models.Repo
1310 switch collection {
1311 case tangled.RepoIssueNSID:
1312 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1313 if err != nil || len(i) != 1 {
1314 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1315 }
1316 repo = i[0].Repo
1317 default:
1318 return fmt.Errorf("unsupported label subject: %s", collection)
1319 }
1320
1321 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1322 if err != nil {
1323 return fmt.Errorf("failed to build label application ctx: %w", err)
1324 }
1325
1326 ops := models.LabelOpsFromRecord(did, rkey, record)
1327
1328 for _, o := range ops {
1329 def, ok := actx.Defs[o.OperandKey]
1330 if !ok {
1331 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1332 }
1333 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1334 return fmt.Errorf("failed to validate labelop: %w", err)
1335 }
1336 }
1337
1338 tx, err := i.Db.Begin()
1339 if err != nil {
1340 return err
1341 }
1342 defer tx.Rollback()
1343
1344 for _, o := range ops {
1345 _, err = db.AddLabelOp(tx, &o)
1346 if err != nil {
1347 return fmt.Errorf("failed to add labelop: %w", err)
1348 }
1349 }
1350
1351 if err = tx.Commit(); err != nil {
1352 return err
1353 }
1354 }
1355
1356 return nil
1357}