Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "sync"
16
17 "time"
18
19 "github.com/avast/retry-go/v4"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 jmodels "github.com/bluesky-social/jetstream/pkg/models"
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/ipfs/go-cid"
24 "golang.org/x/sync/errgroup"
25 "tangled.org/core/api/tangled"
26 "tangled.org/core/appview/cache"
27 "tangled.org/core/appview/config"
28 "tangled.org/core/appview/db"
29 "tangled.org/core/appview/models"
30 "tangled.org/core/appview/serververify"
31 "tangled.org/core/appview/validator"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35)
36
37type Ingester struct {
38 Db db.DbWrapper
39 Enforcer *rbac.Enforcer
40 IdResolver *idresolver.Resolver
41 Cache *cache.Cache
42 Config *config.Config
43 Logger *slog.Logger
44 Validator *validator.Validator
45}
46
47type processFunc func(ctx context.Context, e *jmodels.Event) error
48
49func (i *Ingester) Ingest() processFunc {
50 return func(ctx context.Context, e *jmodels.Event) error {
51 var err error
52
53 l := i.Logger.With("kind", e.Kind)
54 switch e.Kind {
55 case jmodels.EventKindAccount:
56 // TODO: sync account state to db
57 if e.Account.Active {
58 break
59 }
60 // TODO: revoke sessions by DID
61 if *e.Account.Status == "deactivated" {
62 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
63 }
64 case jmodels.EventKindIdentity:
65 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
66 case jmodels.EventKindCommit:
67 switch e.Commit.Collection {
68 case tangled.GraphFollowNSID:
69 err = i.ingestFollow(e)
70 case tangled.GraphVouchNSID:
71 err = i.ingestVouch(ctx, e)
72 case tangled.FeedStarNSID:
73 err = i.ingestStar(ctx, e)
74 case tangled.PublicKeyNSID:
75 err = i.ingestPublicKey(e)
76 case tangled.RepoArtifactNSID:
77 err = i.ingestArtifact(ctx, e)
78 case tangled.ActorProfileNSID:
79 err = i.ingestProfile(ctx, e)
80 case tangled.SpindleMemberNSID:
81 err = i.ingestSpindleMember(ctx, e)
82 case tangled.SpindleNSID:
83 err = i.ingestSpindle(ctx, e)
84 case tangled.KnotMemberNSID:
85 err = i.ingestKnotMember(e)
86 case tangled.KnotNSID:
87 err = i.ingestKnot(e)
88 case tangled.StringNSID:
89 err = i.ingestString(e)
90 case tangled.RepoIssueNSID:
91 err = i.ingestIssue(ctx, e)
92 case tangled.RepoPullNSID:
93 err = i.ingestPull(ctx, e)
94 case tangled.RepoIssueCommentNSID:
95 err = i.ingestIssueComment(e)
96 case tangled.LabelDefinitionNSID:
97 err = i.ingestLabelDefinition(e)
98 case tangled.LabelOpNSID:
99 err = i.ingestLabelOp(e)
100 }
101 l = i.Logger.With("nsid", e.Commit.Collection)
102 }
103
104 if err != nil {
105 l.Warn("failed to ingest record, skipping", "err", err)
106 }
107
108 lastTimeUs := e.TimeUS + 1
109 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
110 l.Error("failed to save cursor", "err", saveErr)
111 }
112
113 return nil
114 }
115}
116
117func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
118 var err error
119 did := e.Did
120
121 l := i.Logger.With("handler", "ingestStar")
122 l = l.With("nsid", e.Commit.Collection)
123
124 switch e.Commit.Operation {
125 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
126 var subjectUri syntax.ATURI
127
128 raw := json.RawMessage(e.Commit.Record)
129 record := tangled.FeedStar{}
130 err := json.Unmarshal(raw, &record)
131 if err != nil {
132 l.Error("invalid record", "err", err)
133 return err
134 }
135
136 star := &models.Star{
137 Did: did,
138 Rkey: e.Commit.RKey,
139 }
140
141 switch {
142 case record.SubjectDid != nil:
143 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
144 if repoErr == nil {
145 subjectUri = repo.RepoAt()
146 star.RepoAt = subjectUri
147 }
148 case record.Subject != nil:
149 subjectUri, err = syntax.ParseATURI(*record.Subject)
150 if err != nil {
151 l.Error("invalid record", "err", err)
152 return err
153 }
154 star.RepoAt = subjectUri
155 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
156 if repoErr == nil && repo.RepoDid != "" {
157 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.FeedStarNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
158 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
159 }
160 }
161 default:
162 l.Error("star record has neither subject nor subjectDid")
163 return fmt.Errorf("star record has neither subject nor subjectDid")
164 }
165 err = db.AddStar(i.Db, star)
166 case jmodels.CommitOperationDelete:
167 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
168 }
169
170 if err != nil {
171 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
172 }
173
174 return nil
175}
176
177func (i *Ingester) ingestFollow(e *jmodels.Event) error {
178 var err error
179 did := e.Did
180
181 l := i.Logger.With("handler", "ingestFollow")
182 l = l.With("nsid", e.Commit.Collection)
183
184 switch e.Commit.Operation {
185 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
186 raw := json.RawMessage(e.Commit.Record)
187 record := tangled.GraphFollow{}
188 err = json.Unmarshal(raw, &record)
189 if err != nil {
190 l.Error("invalid record", "err", err)
191 return err
192 }
193
194 err = db.AddFollow(i.Db, &models.Follow{
195 UserDid: did,
196 SubjectDid: record.Subject,
197 Rkey: e.Commit.RKey,
198 })
199 case jmodels.CommitOperationDelete:
200 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
201 }
202
203 if err != nil {
204 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
205 }
206
207 return nil
208}
209
210func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
211 var err error
212 did := e.Did
213
214 l := i.Logger.With("handler", "ingestVouch")
215 l = l.With("nsid", e.Commit.Collection)
216 l.Info("ingesting vouch")
217
218 switch e.Commit.Operation {
219 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
220 raw := json.RawMessage(e.Commit.Record)
221 record := tangled.GraphVouch{}
222 err = json.Unmarshal(raw, &record)
223 if err != nil {
224 l.Error("invalid record", "err", err)
225 return err
226 }
227
228 // rkey is the subject_did being vouched for/denounced
229 subjectDID := e.Commit.RKey
230
231 _, err = syntax.ParseDID(subjectDID)
232 if err != nil {
233 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
234 return fmt.Errorf("invalid subject_did: %w", err)
235 }
236
237 if did == subjectDID {
238 l.Warn("attempted self-vouch", "did", did)
239 return fmt.Errorf("cannot vouch for self")
240 }
241
242 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
243 if err != nil {
244 return err
245 }
246
247 if subjectId.Handle.IsInvalidHandle() {
248 return err
249 }
250
251 kind, err := models.ParseVouchKind(record.Kind)
252 if err != nil {
253 l.Error("invalid kind", "kind", kind)
254 return fmt.Errorf("invalid kind: %s", kind)
255 }
256
257 recordCid, err := cid.Parse(e.Commit.CID)
258 if err != nil {
259 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
260 return fmt.Errorf("invalid cid: %w", err)
261 }
262
263 var evidences []syntax.ATURI
264 for _, raw := range record.Evidences {
265 uri, parseErr := syntax.ParseATURI(raw)
266 if parseErr != nil {
267 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
268 continue
269 }
270 evidences = append(evidences, uri)
271 }
272
273 ddb, ok := i.Db.Execer.(*db.DB)
274 if !ok {
275 return fmt.Errorf("failed to ingest vouch record, invalid db cast")
276 }
277
278 tx, txErr := ddb.Begin()
279 if txErr != nil {
280 return fmt.Errorf("failed to start transaction: %w", txErr)
281 }
282
283 addErr := db.AddVouch(tx, &models.Vouch{
284 Did: syntax.DID(did),
285 SubjectDid: subjectId.DID,
286 Cid: recordCid,
287 Kind: kind,
288 Reason: record.Reason,
289 Evidences: evidences,
290 })
291 if addErr != nil {
292 tx.Rollback()
293 err = addErr
294 } else {
295 err = tx.Commit()
296 }
297
298 case jmodels.CommitOperationDelete:
299 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
300 }
301
302 if err != nil {
303 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
304 }
305
306 return nil
307}
308
309func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
310 did := e.Did
311 var err error
312
313 l := i.Logger.With("handler", "ingestPublicKey")
314 l = l.With("nsid", e.Commit.Collection)
315
316 switch e.Commit.Operation {
317 case jmodels.CommitOperationCreate:
318 l.Debug("processing add of pubkey")
319 raw := json.RawMessage(e.Commit.Record)
320 record := tangled.PublicKey{}
321 err = json.Unmarshal(raw, &record)
322 if err != nil {
323 l.Error("invalid record", "err", err)
324 return err
325 }
326
327 name := record.Name
328 key := record.Key
329 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
330 case jmodels.CommitOperationUpdate:
331 l.Debug("processing update of pubkey")
332 raw := json.RawMessage(e.Commit.Record)
333 record := tangled.PublicKey{}
334 err = json.Unmarshal(raw, &record)
335 if err != nil {
336 l.Error("invalid record", "err", err)
337 return err
338 }
339
340 name := record.Name
341 key := record.Key
342 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
343 case jmodels.CommitOperationDelete:
344 l.Debug("processing delete of pubkey")
345 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
346 }
347
348 if err != nil {
349 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
350 }
351
352 return nil
353}
354
355func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
356 did := e.Did
357 var err error
358
359 l := i.Logger.With("handler", "ingestArtifact")
360 l = l.With("nsid", e.Commit.Collection)
361
362 switch e.Commit.Operation {
363 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
364 raw := json.RawMessage(e.Commit.Record)
365 record := tangled.RepoArtifact{}
366 err = json.Unmarshal(raw, &record)
367 if err != nil {
368 l.Error("invalid record", "err", err)
369 return err
370 }
371
372 var repo *models.Repo
373 if record.RepoDid != nil && *record.RepoDid != "" {
374 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
375 if err != nil && !errors.Is(err, sql.ErrNoRows) {
376 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
377 }
378 }
379 if repo == nil && record.Repo != nil {
380 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
381 if parseErr != nil {
382 return parseErr
383 }
384 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
385 if err != nil {
386 return err
387 }
388 }
389 if repo == nil {
390 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
391 }
392
393 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
394 if err != nil || !ok {
395 return err
396 }
397
398 repoDid := repo.RepoDid
399 if repoDid == "" && record.RepoDid != nil {
400 repoDid = *record.RepoDid
401 }
402 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
403 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
404 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
405 }
406 }
407
408 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
409 if err != nil {
410 createdAt = time.Now()
411 }
412
413 artifact := models.Artifact{
414 Did: did,
415 Rkey: e.Commit.RKey,
416 RepoAt: repo.RepoAt(),
417 Tag: plumbing.Hash(record.Tag),
418 CreatedAt: createdAt,
419 BlobCid: cid.Cid(record.Artifact.Ref),
420 Name: record.Name,
421 Size: uint64(record.Artifact.Size),
422 MimeType: record.Artifact.MimeType,
423 }
424
425 err = db.AddArtifact(i.Db, artifact)
426 case jmodels.CommitOperationDelete:
427 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
428 }
429
430 if err != nil {
431 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
432 }
433
434 return nil
435}
436
437func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
438 did := e.Did
439 var err error
440
441 l := i.Logger.With("handler", "ingestProfile")
442 l = l.With("nsid", e.Commit.Collection)
443
444 if e.Commit.RKey != "self" {
445 return fmt.Errorf("ingestProfile only ingests `self` record")
446 }
447
448 switch e.Commit.Operation {
449 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
450 raw := json.RawMessage(e.Commit.Record)
451 record := tangled.ActorProfile{}
452 err = json.Unmarshal(raw, &record)
453 if err != nil {
454 l.Error("invalid record", "err", err)
455 return err
456 }
457
458 avatar := ""
459 if record.Avatar != nil {
460 avatar = record.Avatar.Ref.String()
461 }
462
463 description := ""
464 if record.Description != nil {
465 description = *record.Description
466 }
467
468 includeBluesky := record.Bluesky
469
470 pronouns := ""
471 if record.Pronouns != nil {
472 pronouns = *record.Pronouns
473 }
474
475 location := ""
476 if record.Location != nil {
477 location = *record.Location
478 }
479
480 var links [5]string
481 for i, l := range record.Links {
482 if i < 5 {
483 links[i] = l
484 }
485 }
486
487 var stats [2]models.VanityStat
488 for i, s := range record.Stats {
489 if i < 2 {
490 stats[i].Kind = models.ParseVanityStatKind(s)
491 }
492 }
493
494 var pinned [6]string
495 for i, r := range record.PinnedRepositories {
496 if i < 6 {
497 pinned[i] = r
498 }
499 }
500
501 var preferredHandle syntax.Handle
502 if record.PreferredHandle != nil {
503 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
504 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
505 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
506 preferredHandle = h
507 }
508 }
509 }
510
511 profile := models.Profile{
512 Did: did,
513 Avatar: avatar,
514 Description: description,
515 IncludeBluesky: includeBluesky,
516 Location: location,
517 Links: links,
518 Stats: stats,
519 PinnedRepos: pinned,
520 Pronouns: pronouns,
521 PreferredHandle: preferredHandle,
522 }
523
524 ddb, ok := i.Db.Execer.(*db.DB)
525 if !ok {
526 return fmt.Errorf("failed to index profile record, invalid db cast")
527 }
528
529 tx, err := ddb.Begin()
530 if err != nil {
531 return fmt.Errorf("failed to start transaction")
532 }
533
534 err = db.ValidateProfile(tx, &profile)
535 if err != nil {
536 return fmt.Errorf("invalid profile record")
537 }
538
539 err = db.UpsertProfile(tx, &profile)
540 if err == nil && i.Cache != nil {
541 pipe := i.Cache.Pipeline()
542 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
543 if preferredHandle != "" {
544 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
545 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
546 } else {
547 pipe.Del(ctx, didKey)
548 }
549 if _, execErr := pipe.Exec(ctx); execErr != nil {
550 l.Warn("failed to update preferred handle cache", "err", execErr)
551 }
552 }
553 case jmodels.CommitOperationDelete:
554 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
555 }
556
557 if err != nil {
558 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
559 }
560
561 return nil
562}
563
564func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
565 did := e.Did
566 var err error
567
568 l := i.Logger.With("handler", "ingestSpindleMember")
569 l = l.With("nsid", e.Commit.Collection)
570
571 switch e.Commit.Operation {
572 case jmodels.CommitOperationCreate:
573 raw := json.RawMessage(e.Commit.Record)
574 record := tangled.SpindleMember{}
575 err = json.Unmarshal(raw, &record)
576 if err != nil {
577 l.Error("invalid record", "err", err)
578 return err
579 }
580
581 // only spindle owner can invite to spindles
582 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
583 if err != nil || !ok {
584 return fmt.Errorf("failed to enforce permissions: %w", err)
585 }
586
587 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
588 if err != nil {
589 return err
590 }
591
592 if memberId.Handle.IsInvalidHandle() {
593 return err
594 }
595
596 ddb, ok := i.Db.Execer.(*db.DB)
597 if !ok {
598 return fmt.Errorf("invalid db cast")
599 }
600
601 err = db.AddSpindleMember(ddb, models.SpindleMember{
602 Did: syntax.DID(did),
603 Rkey: e.Commit.RKey,
604 Instance: record.Instance,
605 Subject: memberId.DID,
606 })
607 if !ok {
608 return fmt.Errorf("failed to add to db: %w", err)
609 }
610
611 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
612 if err != nil {
613 return fmt.Errorf("failed to update ACLs: %w", err)
614 }
615
616 l.Info("added spindle member")
617 case jmodels.CommitOperationDelete:
618 rkey := e.Commit.RKey
619
620 ddb, ok := i.Db.Execer.(*db.DB)
621 if !ok {
622 return fmt.Errorf("failed to index profile record, invalid db cast")
623 }
624
625 // get record from db first
626 members, err := db.GetSpindleMembers(
627 ddb,
628 orm.FilterEq("did", did),
629 orm.FilterEq("rkey", rkey),
630 )
631 if err != nil || len(members) != 1 {
632 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
633 }
634 member := members[0]
635
636 tx, err := ddb.Begin()
637 if err != nil {
638 return fmt.Errorf("failed to start txn: %w", err)
639 }
640
641 // remove record by rkey && update enforcer
642 if err = db.RemoveSpindleMember(
643 tx,
644 orm.FilterEq("did", did),
645 orm.FilterEq("rkey", rkey),
646 ); err != nil {
647 return fmt.Errorf("failed to remove from db: %w", err)
648 }
649
650 // update enforcer
651 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
652 if err != nil {
653 return fmt.Errorf("failed to update ACLs: %w", err)
654 }
655
656 if err = tx.Commit(); err != nil {
657 return fmt.Errorf("failed to commit txn: %w", err)
658 }
659
660 if err = i.Enforcer.E.SavePolicy(); err != nil {
661 return fmt.Errorf("failed to save ACLs: %w", err)
662 }
663
664 l.Info("removed spindle member")
665 }
666
667 return nil
668}
669
670func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
671 did := e.Did
672 var err error
673
674 l := i.Logger.With("handler", "ingestSpindle")
675 l = l.With("nsid", e.Commit.Collection)
676
677 switch e.Commit.Operation {
678 case jmodels.CommitOperationCreate:
679 raw := json.RawMessage(e.Commit.Record)
680 record := tangled.Spindle{}
681 err = json.Unmarshal(raw, &record)
682 if err != nil {
683 l.Error("invalid record", "err", err)
684 return err
685 }
686
687 instance := e.Commit.RKey
688
689 ddb, ok := i.Db.Execer.(*db.DB)
690 if !ok {
691 return fmt.Errorf("failed to index profile record, invalid db cast")
692 }
693
694 err := db.AddSpindle(ddb, models.Spindle{
695 Owner: syntax.DID(did),
696 Instance: instance,
697 })
698 if err != nil {
699 l.Error("failed to add spindle to db", "err", err, "instance", instance)
700 return err
701 }
702
703 err = retry.Do(
704 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
705 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
706 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
707 )
708 if err != nil {
709 l.Error("failed to verify spindle after retries", "err", err, "instance", instance)
710 return err
711 }
712
713 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
714 if err != nil {
715 return fmt.Errorf("failed to mark verified: %w", err)
716 }
717
718 return nil
719
720 case jmodels.CommitOperationDelete:
721 instance := e.Commit.RKey
722
723 ddb, ok := i.Db.Execer.(*db.DB)
724 if !ok {
725 return fmt.Errorf("failed to index profile record, invalid db cast")
726 }
727
728 // get record from db first
729 spindles, err := db.GetSpindles(
730 ctx,
731 ddb,
732 orm.FilterEq("owner", did),
733 orm.FilterEq("instance", instance),
734 )
735 if err != nil || len(spindles) != 1 {
736 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
737 }
738 spindle := spindles[0]
739
740 tx, err := ddb.Begin()
741 if err != nil {
742 return err
743 }
744 defer func() {
745 tx.Rollback()
746 i.Enforcer.E.LoadPolicy()
747 }()
748
749 // remove spindle members first
750 err = db.RemoveSpindleMember(
751 tx,
752 orm.FilterEq("owner", did),
753 orm.FilterEq("instance", instance),
754 )
755 if err != nil {
756 return err
757 }
758
759 err = db.DeleteSpindle(
760 tx,
761 orm.FilterEq("owner", did),
762 orm.FilterEq("instance", instance),
763 )
764 if err != nil {
765 return err
766 }
767
768 if spindle.Verified != nil {
769 err = i.Enforcer.RemoveSpindle(instance)
770 if err != nil {
771 return err
772 }
773 }
774
775 err = tx.Commit()
776 if err != nil {
777 return err
778 }
779
780 err = i.Enforcer.E.SavePolicy()
781 if err != nil {
782 return err
783 }
784 }
785
786 return nil
787}
788
789func (i *Ingester) ingestString(e *jmodels.Event) error {
790 did := e.Did
791 rkey := e.Commit.RKey
792
793 var err error
794
795 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
796 l.Info("ingesting record")
797
798 ddb, ok := i.Db.Execer.(*db.DB)
799 if !ok {
800 return fmt.Errorf("failed to index string record, invalid db cast")
801 }
802
803 switch e.Commit.Operation {
804 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
805 raw := json.RawMessage(e.Commit.Record)
806 record := tangled.String{}
807 err = json.Unmarshal(raw, &record)
808 if err != nil {
809 l.Error("invalid record", "err", err)
810 return err
811 }
812
813 string := models.StringFromRecord(did, rkey, record)
814
815 if err = i.Validator.ValidateString(&string); err != nil {
816 l.Error("invalid record", "err", err)
817 return err
818 }
819
820 if err = db.AddString(ddb, string); err != nil {
821 l.Error("failed to add string", "err", err)
822 return err
823 }
824
825 return nil
826
827 case jmodels.CommitOperationDelete:
828 if err := db.DeleteString(
829 ddb,
830 orm.FilterEq("did", did),
831 orm.FilterEq("rkey", rkey),
832 ); err != nil {
833 l.Error("failed to delete", "err", err)
834 return fmt.Errorf("failed to delete string record: %w", err)
835 }
836
837 return nil
838 }
839
840 return nil
841}
842
843func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
844 did := e.Did
845 var err error
846
847 l := i.Logger.With("handler", "ingestKnotMember")
848 l = l.With("nsid", e.Commit.Collection)
849
850 switch e.Commit.Operation {
851 case jmodels.CommitOperationCreate:
852 raw := json.RawMessage(e.Commit.Record)
853 record := tangled.KnotMember{}
854 err = json.Unmarshal(raw, &record)
855 if err != nil {
856 l.Error("invalid record", "err", err)
857 return err
858 }
859
860 // only knot owner can invite to knots
861 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
862 if err != nil || !ok {
863 return fmt.Errorf("failed to enforce permissions: %w", err)
864 }
865
866 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
867 if err != nil {
868 return err
869 }
870
871 if memberId.Handle.IsInvalidHandle() {
872 return err
873 }
874
875 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
876 if err != nil {
877 return fmt.Errorf("failed to update ACLs: %w", err)
878 }
879
880 l.Info("added knot member")
881 case jmodels.CommitOperationDelete:
882 // we don't store knot members in a table (like we do for spindle)
883 // and we can't remove this just yet. possibly fixed if we switch
884 // to either:
885 // 1. a knot_members table like with spindle and store the rkey
886 // 2. use the knot host as the rkey
887 //
888 // TODO: implement member deletion
889 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
890 }
891
892 return nil
893}
894
895func (i *Ingester) ingestKnot(e *jmodels.Event) error {
896 did := e.Did
897 var err error
898
899 l := i.Logger.With("handler", "ingestKnot")
900 l = l.With("nsid", e.Commit.Collection)
901
902 switch e.Commit.Operation {
903 case jmodels.CommitOperationCreate:
904 raw := json.RawMessage(e.Commit.Record)
905 record := tangled.Knot{}
906 err = json.Unmarshal(raw, &record)
907 if err != nil {
908 l.Error("invalid record", "err", err)
909 return err
910 }
911
912 domain := e.Commit.RKey
913
914 ddb, ok := i.Db.Execer.(*db.DB)
915 if !ok {
916 return fmt.Errorf("failed to index profile record, invalid db cast")
917 }
918
919 err := db.AddKnot(ddb, domain, did)
920 if err != nil {
921 l.Error("failed to add knot to db", "err", err, "domain", domain)
922 return err
923 }
924
925 err = retry.Do(
926 func() error {
927 return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
928 },
929 retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second),
930 retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true),
931 )
932 if err != nil {
933 l.Error("failed to verify knot after retries", "err", err, "domain", domain)
934 return err
935 }
936
937 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
938 if err != nil {
939 return fmt.Errorf("failed to mark verified: %w", err)
940 }
941
942 return nil
943
944 case jmodels.CommitOperationDelete:
945 domain := e.Commit.RKey
946
947 ddb, ok := i.Db.Execer.(*db.DB)
948 if !ok {
949 return fmt.Errorf("failed to index knot record, invalid db cast")
950 }
951
952 // get record from db first
953 registrations, err := db.GetRegistrations(
954 ddb,
955 orm.FilterEq("domain", domain),
956 orm.FilterEq("did", did),
957 )
958 if err != nil {
959 return fmt.Errorf("failed to get registration: %w", err)
960 }
961 if len(registrations) != 1 {
962 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
963 }
964 registration := registrations[0]
965
966 tx, err := ddb.Begin()
967 if err != nil {
968 return err
969 }
970 defer func() {
971 tx.Rollback()
972 i.Enforcer.E.LoadPolicy()
973 }()
974
975 err = db.DeleteKnot(
976 tx,
977 orm.FilterEq("did", did),
978 orm.FilterEq("domain", domain),
979 )
980 if err != nil {
981 return err
982 }
983
984 if registration.Registered != nil {
985 err = i.Enforcer.RemoveKnot(domain)
986 if err != nil {
987 return err
988 }
989 }
990
991 err = tx.Commit()
992 if err != nil {
993 return err
994 }
995
996 err = i.Enforcer.E.SavePolicy()
997 if err != nil {
998 return err
999 }
1000 }
1001
1002 return nil
1003}
1004func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
1005 did := e.Did
1006 rkey := e.Commit.RKey
1007
1008 var err error
1009
1010 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1011 l.Info("ingesting record")
1012
1013 ddb, ok := i.Db.Execer.(*db.DB)
1014 if !ok {
1015 return fmt.Errorf("failed to index issue record, invalid db cast")
1016 }
1017
1018 switch e.Commit.Operation {
1019 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1020 raw := json.RawMessage(e.Commit.Record)
1021 record := tangled.RepoIssue{}
1022 err = json.Unmarshal(raw, &record)
1023 if err != nil {
1024 l.Error("invalid record", "err", err)
1025 return err
1026 }
1027
1028 issue := models.IssueFromRecord(did, rkey, record)
1029
1030 if issue.RepoAt == "" {
1031 return fmt.Errorf("issue record has no repo field")
1032 }
1033
1034 if err := i.Validator.ValidateIssue(&issue); err != nil {
1035 return fmt.Errorf("failed to validate issue: %w", err)
1036 }
1037
1038 if record.Repo != nil {
1039 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
1040 if repoErr == nil && repo.RepoDid != "" {
1041 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1042 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1043 }
1044 }
1045 }
1046
1047 tx, err := ddb.BeginTx(ctx, nil)
1048 if err != nil {
1049 l.Error("failed to begin transaction", "err", err)
1050 return err
1051 }
1052 defer tx.Rollback()
1053
1054 err = db.PutIssue(tx, &issue)
1055 if err != nil {
1056 l.Error("failed to create issue", "err", err)
1057 return err
1058 }
1059
1060 err = tx.Commit()
1061 if err != nil {
1062 l.Error("failed to commit txn", "err", err)
1063 return err
1064 }
1065
1066 return nil
1067
1068 case jmodels.CommitOperationDelete:
1069 tx, err := ddb.BeginTx(ctx, nil)
1070 if err != nil {
1071 l.Error("failed to begin transaction", "err", err)
1072 return err
1073 }
1074 defer tx.Rollback()
1075
1076 if err := db.DeleteIssues(
1077 tx,
1078 did,
1079 rkey,
1080 ); err != nil {
1081 l.Error("failed to delete", "err", err)
1082 return fmt.Errorf("failed to delete issue record: %w", err)
1083 }
1084 if err := tx.Commit(); err != nil {
1085 l.Error("failed to commit txn", "err", err)
1086 return err
1087 }
1088
1089 return nil
1090 }
1091
1092 return nil
1093}
1094
1095func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1096 did := e.Did
1097 rkey := e.Commit.RKey
1098
1099 var err error
1100
1101 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1102 l.Info("ingesting record")
1103
1104 ddb, ok := i.Db.Execer.(*db.DB)
1105 if !ok {
1106 return fmt.Errorf("failed to index pull record, invalid db cast")
1107 }
1108
1109 switch e.Commit.Operation {
1110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1111 raw := json.RawMessage(e.Commit.Record)
1112 record := tangled.RepoPull{}
1113 err = json.Unmarshal(raw, &record)
1114 if err != nil {
1115 l.Error("invalid record", "err", err)
1116 return err
1117 }
1118
1119 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1120 if err != nil {
1121 l.Error("failed to resolve did")
1122 return err
1123 }
1124
1125 // go through and fetch all blobs in parallel
1126 readers := make([]*io.ReadCloser, len(record.Rounds))
1127 var mu sync.Mutex
1128
1129 g, gctx := errgroup.WithContext(ctx)
1130
1131 for idx, b := range record.Rounds {
1132 g.Go(func() error {
1133 // for some reason, a blob is empty
1134 if b.PatchBlob == nil {
1135 return fmt.Errorf("missing patchBlob in round %d", idx)
1136 }
1137
1138 ownerPds := ownerId.PDSEndpoint()
1139 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1140 q := url.Query()
1141 q.Set("cid", b.PatchBlob.Ref.String())
1142 q.Set("did", did)
1143 url.RawQuery = q.Encode()
1144
1145 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1146 if err != nil {
1147 l.Error("failed to create request")
1148 return err
1149 }
1150 req.Header.Set("Content-Type", "application/json")
1151
1152 resp, err := http.DefaultClient.Do(req)
1153 if err != nil {
1154 l.Error("failed to make request")
1155 return err
1156 }
1157
1158 mu.Lock()
1159 readers[idx] = &resp.Body
1160 mu.Unlock()
1161
1162 return nil
1163 })
1164 }
1165
1166 if err := g.Wait(); err != nil {
1167 for _, r := range readers {
1168 if r != nil && *r != nil {
1169 (*r).Close()
1170 }
1171 }
1172 return err
1173 }
1174
1175 defer func() {
1176 for _, r := range readers {
1177 if r != nil && *r != nil {
1178 (*r).Close()
1179 }
1180 }
1181 }()
1182
1183 pull, err := models.PullFromRecord(did, rkey, record, readers)
1184 if err != nil {
1185 return fmt.Errorf("failed to parse pull from record: %w", err)
1186 }
1187 if err := i.Validator.ValidatePull(pull); err != nil {
1188 return fmt.Errorf("failed to validate pull: %w", err)
1189 }
1190
1191 tx, err := ddb.BeginTx(ctx, nil)
1192 if err != nil {
1193 l.Error("failed to begin transaction", "err", err)
1194 return err
1195 }
1196 defer tx.Rollback()
1197
1198 err = db.PutPull(tx, pull)
1199 if err != nil {
1200 l.Error("failed to create pull", "err", err)
1201 return err
1202 }
1203
1204 err = tx.Commit()
1205 if err != nil {
1206 l.Error("failed to commit txn", "err", err)
1207 return err
1208 }
1209
1210 return nil
1211
1212 case jmodels.CommitOperationDelete:
1213 tx, err := ddb.BeginTx(ctx, nil)
1214 if err != nil {
1215 l.Error("failed to begin transaction", "err", err)
1216 return err
1217 }
1218 defer tx.Rollback()
1219
1220 if err := db.AbandonPulls(
1221 tx,
1222 orm.FilterEq("owner_did", did),
1223 orm.FilterEq("rkey", rkey),
1224 ); err != nil {
1225 l.Error("failed to abandon", "err", err)
1226 return fmt.Errorf("failed to abandon pull record: %w", err)
1227 }
1228 if err := tx.Commit(); err != nil {
1229 l.Error("failed to commit txn", "err", err)
1230 return err
1231 }
1232
1233 return nil
1234 }
1235
1236 return nil
1237}
1238
1239func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1240 did := e.Did
1241 rkey := e.Commit.RKey
1242
1243 var err error
1244
1245 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1246 l.Info("ingesting record")
1247
1248 ddb, ok := i.Db.Execer.(*db.DB)
1249 if !ok {
1250 return fmt.Errorf("failed to index issue comment record, invalid db cast")
1251 }
1252
1253 switch e.Commit.Operation {
1254 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1255 raw := json.RawMessage(e.Commit.Record)
1256 record := tangled.RepoIssueComment{}
1257 err = json.Unmarshal(raw, &record)
1258 if err != nil {
1259 return fmt.Errorf("invalid record: %w", err)
1260 }
1261
1262 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1263 if err != nil {
1264 return fmt.Errorf("failed to parse comment from record: %w", err)
1265 }
1266
1267 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1268 return fmt.Errorf("failed to validate comment: %w", err)
1269 }
1270
1271 tx, err := ddb.Begin()
1272 if err != nil {
1273 return fmt.Errorf("failed to start transaction: %w", err)
1274 }
1275 defer tx.Rollback()
1276
1277 _, err = db.AddIssueComment(tx, *comment)
1278 if err != nil {
1279 return fmt.Errorf("failed to create issue comment: %w", err)
1280 }
1281
1282 return tx.Commit()
1283
1284 case jmodels.CommitOperationDelete:
1285 if err := db.DeleteIssueComments(
1286 ddb,
1287 orm.FilterEq("did", did),
1288 orm.FilterEq("rkey", rkey),
1289 ); err != nil {
1290 return fmt.Errorf("failed to delete issue comment record: %w", err)
1291 }
1292
1293 return nil
1294 }
1295
1296 return nil
1297}
1298
1299func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1300 did := e.Did
1301 rkey := e.Commit.RKey
1302
1303 var err error
1304
1305 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1306 l.Info("ingesting record")
1307
1308 ddb, ok := i.Db.Execer.(*db.DB)
1309 if !ok {
1310 return fmt.Errorf("failed to index label definition, invalid db cast")
1311 }
1312
1313 switch e.Commit.Operation {
1314 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1315 raw := json.RawMessage(e.Commit.Record)
1316 record := tangled.LabelDefinition{}
1317 err = json.Unmarshal(raw, &record)
1318 if err != nil {
1319 return fmt.Errorf("invalid record: %w", err)
1320 }
1321
1322 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1323 if err != nil {
1324 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1325 }
1326
1327 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1328 return fmt.Errorf("failed to validate labeldef: %w", err)
1329 }
1330
1331 _, err = db.AddLabelDefinition(ddb, def)
1332 if err != nil {
1333 return fmt.Errorf("failed to create labeldef: %w", err)
1334 }
1335
1336 return nil
1337
1338 case jmodels.CommitOperationDelete:
1339 if err := db.DeleteLabelDefinition(
1340 ddb,
1341 orm.FilterEq("did", did),
1342 orm.FilterEq("rkey", rkey),
1343 ); err != nil {
1344 return fmt.Errorf("failed to delete labeldef record: %w", err)
1345 }
1346
1347 return nil
1348 }
1349
1350 return nil
1351}
1352
1353func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1354 did := e.Did
1355 rkey := e.Commit.RKey
1356
1357 var err error
1358
1359 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1360 l.Info("ingesting record")
1361
1362 ddb, ok := i.Db.Execer.(*db.DB)
1363 if !ok {
1364 return fmt.Errorf("failed to index label op, invalid db cast")
1365 }
1366
1367 switch e.Commit.Operation {
1368 case jmodels.CommitOperationCreate:
1369 raw := json.RawMessage(e.Commit.Record)
1370 record := tangled.LabelOp{}
1371 err = json.Unmarshal(raw, &record)
1372 if err != nil {
1373 return fmt.Errorf("invalid record: %w", err)
1374 }
1375
1376 subject := syntax.ATURI(record.Subject)
1377 collection := subject.Collection()
1378
1379 var repo *models.Repo
1380 switch collection {
1381 case tangled.RepoIssueNSID:
1382 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1383 if err != nil || len(i) != 1 {
1384 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1385 }
1386 repo = i[0].Repo
1387 default:
1388 return fmt.Errorf("unsupported label subject: %s", collection)
1389 }
1390
1391 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1392 if err != nil {
1393 return fmt.Errorf("failed to build label application ctx: %w", err)
1394 }
1395
1396 ops := models.LabelOpsFromRecord(did, rkey, record)
1397
1398 for _, o := range ops {
1399 def, ok := actx.Defs[o.OperandKey]
1400 if !ok {
1401 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1402 }
1403 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1404 return fmt.Errorf("failed to validate labelop: %w", err)
1405 }
1406 }
1407
1408 tx, err := ddb.Begin()
1409 if err != nil {
1410 return err
1411 }
1412 defer tx.Rollback()
1413
1414 for _, o := range ops {
1415 _, err = db.AddLabelOp(tx, &o)
1416 if err != nil {
1417 return fmt.Errorf("failed to add labelop: %w", err)
1418 }
1419 }
1420
1421 if err = tx.Commit(); err != nil {
1422 return err
1423 }
1424 }
1425
1426 return nil
1427}