Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/knotacl"
31 "tangled.org/core/appview/mentions"
32 "tangled.org/core/appview/models"
33 "tangled.org/core/appview/notify"
34 "tangled.org/core/appview/repoverify"
35 "tangled.org/core/appview/serververify"
36 "tangled.org/core/appview/validator"
37 "tangled.org/core/blobstore"
38 "tangled.org/core/idresolver"
39 "tangled.org/core/orm"
40 "tangled.org/core/rbac"
41)
42
// Ingester bundles the dependencies used by the per-collection ingest
// handlers that Ingest dispatches jetstream events to.
type Ingester struct {
	Ctx context.Context
	// Db is the database handle handed to the db package helpers.
	Db *db.DB
	// Enforcer answers spindle/knot invite-permission checks and is updated
	// when members are added or removed.
	Enforcer *rbac.Enforcer
	// Acl performs repo permission checks (see ingestArtifact).
	Acl *knotacl.Service
	// IdResolver resolves identities and invalidates cached ones.
	IdResolver *idresolver.Resolver
	BlobStore  blobstore.BlobStore
	// Cache may be nil; handlers check for nil before touching it.
	Cache  *cache.Cache
	Config *config.Config
	// Logger is the base logger; each event derives a scoped logger from it.
	Logger           *slog.Logger
	Validator        *validator.Validator
	MentionsResolver *mentions.Resolver
	Notifier         notify.Notifier
	Verifier         repoverify.Verifier
}
58
// processFunc handles a single jetstream event. Ingest returns a value of
// this type.
type processFunc func(ctx context.Context, e *jmodels.Event) error
60
61func (i *Ingester) Ingest() processFunc {
62 return func(ctx context.Context, e *jmodels.Event) error {
63 var err error
64
65 l := i.Logger.With("kind", e.Kind)
66 switch e.Kind {
67 case jmodels.EventKindAccount:
68 // TODO: sync account state to db
69 if e.Account.Active {
70 break
71 }
72 // TODO: revoke sessions by DID
73 if *e.Account.Status == "deactivated" {
74 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
75 }
76 case jmodels.EventKindIdentity:
77 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
78 case jmodels.EventKindCommit:
79 l = l.With(
80 "nsid", e.Commit.Collection,
81 "did", e.Did,
82 "rkey", e.Commit.RKey,
83 "cid", e.Commit.CID,
84 "op", e.Commit.Operation,
85 )
86 switch e.Commit.Collection {
87 case tangled.GraphFollowNSID:
88 err = i.ingestFollow(e, l)
89 case tangled.GraphVouchNSID:
90 err = i.ingestVouch(ctx, e, l)
91 case tangled.FeedStarNSID:
92 err = i.ingestStar(ctx, e, l)
93 case tangled.FeedReactionNSID:
94 err = i.ingestReaction(e, l)
95 case tangled.PublicKeyNSID:
96 err = i.ingestPublicKey(e, l)
97 case tangled.RepoArtifactNSID:
98 err = i.ingestArtifact(ctx, e, l)
99 case tangled.ActorProfileNSID:
100 err = i.ingestProfile(ctx, e, l)
101 case tangled.SpindleMemberNSID:
102 err = i.ingestSpindleMember(ctx, e, l)
103 case tangled.SpindleNSID:
104 err = i.ingestSpindle(ctx, e, l)
105 case tangled.KnotMemberNSID:
106 err = i.ingestKnotMember(ctx, e, l)
107 case tangled.KnotNSID:
108 err = i.ingestKnot(ctx, e, l)
109 case tangled.StringNSID:
110 err = i.ingestString(ctx, e, l)
111 case tangled.RepoIssueNSID:
112 err = i.ingestIssue(ctx, e, l)
113 case tangled.RepoPullNSID:
114 err = i.ingestPull(ctx, e, l)
115 case tangled.FeedCommentNSID:
116 err = i.ingestComment(e, l)
117 case tangled.RepoIssueCommentNSID:
118 err = i.ingestIssueComment(e, l)
119 case tangled.RepoPullCommentNSID:
120 err = i.ingestPullComment(e, l)
121 case tangled.LabelDefinitionNSID:
122 err = i.ingestLabelDefinition(e, l)
123 case tangled.LabelOpNSID:
124 err = i.ingestLabelOp(ctx, e, l)
125 case tangled.RepoNSID:
126 err = i.ingestRepo(ctx, e, l)
127 }
128 }
129
130 if err != nil {
131 l.Warn("failed to ingest record, skipping", "err", err)
132 }
133
134 lastTimeUs := e.TimeUS + 1
135 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
136 l.Error("failed to save cursor", "err", saveErr)
137 }
138
139 return nil
140 }
141}
142
143func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
144 if strings.HasPrefix(ref, "did:") {
145 return db.GetRepoByDid(i.Db, ref)
146 }
147 return db.GetRepoByAtUri(i.Db, ref)
148}
149
150func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
151 var legacy struct {
152 Subject *string `json:"subject"`
153 SubjectDid *string `json:"subjectDid"`
154 }
155 if err := json.Unmarshal(raw, &legacy); err != nil {
156 return false, err
157 }
158
159 switch {
160 case legacy.SubjectDid != nil:
161 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
162 if err != nil {
163 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
164 return false, nil
165 }
166 star.SubjectType = models.StarSubjectRepo
167 star.Subject = repo.RepoDid
168 return true, nil
169
170 case legacy.Subject != nil:
171 uri, err := syntax.ParseATURI(*legacy.Subject)
172 if err != nil {
173 return false, fmt.Errorf("invalid old-format star subject: %w", err)
174 }
175 switch uri.Collection().String() {
176 case tangled.RepoNSID:
177 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
178 if err != nil {
179 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
180 return false, nil
181 }
182 star.SubjectType = models.StarSubjectRepo
183 star.Subject = repo.RepoDid
184 return true, nil
185 default:
186 star.SubjectType = models.StarSubjectString
187 star.Subject = *legacy.Subject
188 return true, nil
189 }
190
191 default:
192 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
193 }
194}
195
196func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
197 var err error
198 did := e.Did
199
200 l = l.With("handler", "ingestStar")
201
202 switch e.Commit.Operation {
203 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
204 raw := json.RawMessage(e.Commit.Record)
205 record := tangled.FeedStar{}
206 unmarshalErr := json.Unmarshal(raw, &record)
207
208 star := &models.Star{
209 Did: did,
210 Rkey: e.Commit.RKey,
211 }
212
213 switch {
214 case unmarshalErr != nil:
215 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
216 if resolveErr != nil {
217 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
218 return unmarshalErr
219 }
220 if !resolved {
221 return nil
222 }
223
224 case record.Subject == nil:
225 return fmt.Errorf("star record has nil subject")
226
227 case record.Subject.FeedStar_Repo != nil:
228 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
229 if repoErr != nil {
230 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
231 return nil
232 }
233 star.SubjectType = models.StarSubjectRepo
234 star.Subject = repo.RepoDid
235
236 case record.Subject.FeedStar_String != nil:
237 star.SubjectType = models.StarSubjectString
238 star.Subject = record.Subject.FeedStar_String.Uri
239
240 default:
241 return fmt.Errorf("star record has empty subject union")
242 }
243
244 err = db.AddStar(i.Db, star)
245 case jmodels.CommitOperationDelete:
246 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
247 }
248
249 if err != nil {
250 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
251 }
252
253 l.Info("ingested record")
254 return nil
255}
256
257func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error {
258 var err error
259 did := e.Did
260
261 l = l.With("handler", "ingestFollow")
262
263 switch e.Commit.Operation {
264 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
265 raw := json.RawMessage(e.Commit.Record)
266 record := tangled.GraphFollow{}
267 err = json.Unmarshal(raw, &record)
268 if err != nil {
269 l.Error("invalid record", "err", err)
270 return err
271 }
272
273 err = db.AddFollow(i.Db, &models.Follow{
274 UserDid: did,
275 SubjectDid: record.Subject,
276 Rkey: e.Commit.RKey,
277 })
278 case jmodels.CommitOperationDelete:
279 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
280 }
281
282 if err != nil {
283 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
284 }
285
286 l.Info("ingested record")
287 return nil
288}
289
290func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
291 var err error
292 did := e.Did
293
294 l = l.With("handler", "ingestVouch")
295
296 switch e.Commit.Operation {
297 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
298 raw := json.RawMessage(e.Commit.Record)
299 record := tangled.GraphVouch{}
300 err = json.Unmarshal(raw, &record)
301 if err != nil {
302 l.Error("invalid record", "err", err)
303 return err
304 }
305
306 // rkey is the subject_did being vouched for/denounced
307 subjectDID := e.Commit.RKey
308
309 _, err = syntax.ParseDID(subjectDID)
310 if err != nil {
311 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
312 return fmt.Errorf("invalid subject_did: %w", err)
313 }
314
315 if did == subjectDID {
316 l.Warn("attempted self-vouch", "did", did)
317 return fmt.Errorf("cannot vouch for self")
318 }
319
320 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
321 if err != nil {
322 return err
323 }
324
325 if subjectId.Handle.IsInvalidHandle() {
326 return err
327 }
328
329 kind, err := models.ParseVouchKind(record.Kind)
330 if err != nil {
331 l.Error("invalid kind", "kind", kind)
332 return fmt.Errorf("invalid kind: %s", kind)
333 }
334
335 recordCid, err := cid.Parse(e.Commit.CID)
336 if err != nil {
337 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
338 return fmt.Errorf("invalid cid: %w", err)
339 }
340
341 var evidences []syntax.ATURI
342 for _, raw := range record.Evidences {
343 uri, parseErr := syntax.ParseATURI(raw)
344 if parseErr != nil {
345 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
346 continue
347 }
348 evidences = append(evidences, uri)
349 }
350
351 tx, txErr := i.Db.Begin()
352 if txErr != nil {
353 return fmt.Errorf("failed to start transaction: %w", txErr)
354 }
355
356 addErr := db.AddVouch(tx, &models.Vouch{
357 Did: syntax.DID(did),
358 SubjectDid: subjectId.DID,
359 Cid: recordCid,
360 Kind: kind,
361 Reason: record.Reason,
362 Evidences: evidences,
363 })
364 if addErr != nil {
365 tx.Rollback()
366 err = addErr
367 } else {
368 err = tx.Commit()
369 }
370
371 case jmodels.CommitOperationDelete:
372 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
373 }
374
375 if err != nil {
376 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
377 }
378
379 l.Info("ingested record")
380 return nil
381}
382
383func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error {
384 did := e.Did
385 var err error
386
387 l = l.With("handler", "ingestPublicKey")
388
389 switch e.Commit.Operation {
390 case jmodels.CommitOperationCreate:
391 l.Debug("processing add of pubkey")
392 raw := json.RawMessage(e.Commit.Record)
393 record := tangled.PublicKey{}
394 err = json.Unmarshal(raw, &record)
395 if err != nil {
396 l.Error("invalid record", "err", err)
397 return err
398 }
399
400 name := record.Name
401 key := record.Key
402 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
403 case jmodels.CommitOperationUpdate:
404 l.Debug("processing update of pubkey")
405 raw := json.RawMessage(e.Commit.Record)
406 record := tangled.PublicKey{}
407 err = json.Unmarshal(raw, &record)
408 if err != nil {
409 l.Error("invalid record", "err", err)
410 return err
411 }
412
413 name := record.Name
414 key := record.Key
415 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
416 case jmodels.CommitOperationDelete:
417 l.Debug("processing delete of pubkey")
418 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
419 }
420
421 if err != nil {
422 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
423 }
424
425 l.Info("ingested record")
426 return nil
427}
428
429func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
430 did := e.Did
431 var err error
432
433 l = l.With("handler", "ingestArtifact")
434
435 switch e.Commit.Operation {
436 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
437 raw := json.RawMessage(e.Commit.Record)
438 record := tangled.RepoArtifact{}
439 err = json.Unmarshal(raw, &record)
440 if err != nil {
441 l.Error("invalid record", "err", err)
442 return err
443 }
444
445 var repo *models.Repo
446 if record.RepoDid != nil && *record.RepoDid != "" {
447 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
448 if err != nil && !errors.Is(err, sql.ErrNoRows) {
449 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
450 }
451 }
452 if repo == nil && record.Repo != nil {
453 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
454 if parseErr != nil {
455 return parseErr
456 }
457 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
458 if err != nil {
459 return err
460 }
461 }
462 if repo == nil {
463 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
464 }
465
466 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push")
467 if permErr != nil {
468 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr)
469 } else if !allowed {
470 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier())
471 return nil
472 }
473
474 repoDid := repo.RepoDid
475 if repoDid == "" && record.RepoDid != nil {
476 repoDid = *record.RepoDid
477 }
478 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
479 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
480 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
481 }
482 }
483
484 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
485 if parseErr != nil {
486 createdAt = time.Now()
487 }
488
489 artifact := models.Artifact{
490 Did: did,
491 Rkey: e.Commit.RKey,
492 RepoDid: syntax.DID(repo.RepoDid),
493 Tag: plumbing.Hash(record.Tag),
494 CreatedAt: createdAt,
495 BlobCid: cid.Cid(record.Artifact.Ref),
496 Name: record.Name,
497 Size: uint64(record.Artifact.Size),
498 MimeType: record.Artifact.MimeType,
499 }
500
501 err = db.AddArtifact(i.Db, artifact)
502 case jmodels.CommitOperationDelete:
503 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
504 }
505
506 if err != nil {
507 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
508 }
509
510 l.Info("ingested record")
511 return nil
512}
513
514func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
515 did := e.Did
516 var err error
517
518 l = l.With("handler", "ingestProfile")
519
520 if e.Commit.RKey != "self" {
521 return fmt.Errorf("ingestProfile only ingests `self` record")
522 }
523
524 switch e.Commit.Operation {
525 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
526 raw := json.RawMessage(e.Commit.Record)
527 record := tangled.ActorProfile{}
528 err = json.Unmarshal(raw, &record)
529 if err != nil {
530 l.Error("invalid record", "err", err)
531 return err
532 }
533
534 avatar := ""
535 if record.Avatar != nil {
536 avatar = record.Avatar.Ref.String()
537 }
538
539 description := ""
540 if record.Description != nil {
541 description = *record.Description
542 }
543
544 includeBluesky := record.Bluesky
545
546 pronouns := ""
547 if record.Pronouns != nil {
548 pronouns = *record.Pronouns
549 }
550
551 location := ""
552 if record.Location != nil {
553 location = *record.Location
554 }
555
556 var links [5]string
557 for i, l := range record.Links {
558 if i < 5 {
559 links[i] = l
560 }
561 }
562
563 var stats [2]models.VanityStat
564 for i, s := range record.Stats {
565 if i < 2 {
566 stats[i].Kind = models.ParseVanityStatKind(s)
567 }
568 }
569
570 var pinned [6]string
571 for i, r := range record.PinnedRepositories {
572 if i < 6 {
573 pinned[i] = r
574 }
575 }
576
577 var preferredHandle syntax.Handle
578 if record.PreferredHandle != nil {
579 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
580 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
581 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
582 preferredHandle = h
583 }
584 }
585 }
586
587 profile := models.Profile{
588 Did: did,
589 Avatar: avatar,
590 Description: description,
591 IncludeBluesky: includeBluesky,
592 Location: location,
593 Links: links,
594 Stats: stats,
595 PinnedRepos: pinned,
596 Pronouns: pronouns,
597 PreferredHandle: preferredHandle,
598 }
599
600 tx, err := i.Db.Begin()
601 if err != nil {
602 return fmt.Errorf("failed to start transaction: %w", err)
603 }
604
605 err = db.ValidateProfile(tx, &profile)
606 if err != nil {
607 return fmt.Errorf("invalid profile record")
608 }
609
610 err = db.UpsertProfile(tx, &profile)
611 if err == nil && i.Cache != nil {
612 pipe := i.Cache.Pipeline()
613 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
614 if preferredHandle != "" {
615 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
616 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
617 } else {
618 pipe.Del(ctx, didKey)
619 }
620 if _, execErr := pipe.Exec(ctx); execErr != nil {
621 l.Warn("failed to update preferred handle cache", "err", execErr)
622 }
623 }
624 case jmodels.CommitOperationDelete:
625 tx, beginErr := i.Db.Begin()
626 if beginErr != nil {
627 return fmt.Errorf("failed to start transaction: %w", beginErr)
628 }
629
630 priorHandle, phErr := db.GetPreferredHandle(tx, did)
631 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
632 l.Warn("failed to read prior preferred handle", "err", phErr)
633 }
634
635 err = db.DeleteProfile(tx, did)
636 if err == nil && i.Cache != nil {
637 pipe := i.Cache.Pipeline()
638 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
639 if priorHandle != "" {
640 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
641 }
642 if _, execErr := pipe.Exec(ctx); execErr != nil {
643 l.Warn("failed to evict preferred handle cache", "err", execErr)
644 }
645 }
646 }
647
648 if err != nil {
649 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
650 }
651
652 l.Info("ingested record")
653 return nil
654}
655
656func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
657 did := e.Did
658 var err error
659
660 l = l.With("handler", "ingestSpindleMember")
661
662 switch e.Commit.Operation {
663 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
664 raw := json.RawMessage(e.Commit.Record)
665 record := tangled.SpindleMember{}
666 err = json.Unmarshal(raw, &record)
667 if err != nil {
668 l.Error("invalid record", "err", err)
669 return err
670 }
671
672 // only spindle owner can invite to spindles
673 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
674 if err != nil {
675 return fmt.Errorf("failed to check invite permission: %w", err)
676 }
677 if !ok {
678 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
679 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
680 }
681 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
682 if err != nil {
683 return fmt.Errorf("failed to re-check invite permission: %w", err)
684 }
685 if !ok {
686 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
687 }
688 }
689
690 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
691 if err != nil {
692 return err
693 }
694
695 if memberId.Handle.IsInvalidHandle() {
696 return fmt.Errorf("invalid handle for member %s", record.Subject)
697 }
698
699 existing, err := db.GetSpindleMembers(i.Db,
700 orm.FilterEq("did", did),
701 orm.FilterEq("rkey", e.Commit.RKey),
702 )
703 if err != nil {
704 return fmt.Errorf("failed to look up existing member: %w", err)
705 }
706 if len(existing) > 1 {
707 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
708 }
709
710 tx, err := i.Db.Begin()
711 if err != nil {
712 return fmt.Errorf("failed to start txn: %w", err)
713 }
714 committed := false
715 defer func() {
716 if committed {
717 return
718 }
719 tx.Rollback()
720 i.Enforcer.E.LoadPolicy()
721 }()
722
723 if len(existing) == 1 {
724 prev := existing[0]
725 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
726 if err = db.RemoveSpindleMember(tx,
727 orm.FilterEq("did", did),
728 orm.FilterEq("rkey", e.Commit.RKey),
729 ); err != nil {
730 return fmt.Errorf("failed to remove stale row: %w", err)
731 }
732 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
733 return fmt.Errorf("failed to remove stale ACL: %w", err)
734 }
735 }
736 }
737
738 if err = db.AddSpindleMember(tx, models.SpindleMember{
739 Did: syntax.DID(did),
740 Rkey: e.Commit.RKey,
741 Instance: record.Instance,
742 Subject: memberId.DID,
743 }); err != nil {
744 return fmt.Errorf("failed to add to db: %w", err)
745 }
746
747 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
748 return fmt.Errorf("failed to update ACLs: %w", err)
749 }
750
751 if err = tx.Commit(); err != nil {
752 return fmt.Errorf("failed to commit txn: %w", err)
753 }
754
755 if err = i.Enforcer.E.SavePolicy(); err != nil {
756 return fmt.Errorf("failed to save ACLs: %w", err)
757 }
758 committed = true
759
760 l.Info("upserted spindle member")
761 case jmodels.CommitOperationDelete:
762 rkey := e.Commit.RKey
763
764 // get record from db first
765 members, err := db.GetSpindleMembers(
766 i.Db,
767 orm.FilterEq("did", did),
768 orm.FilterEq("rkey", rkey),
769 )
770 if err != nil || len(members) != 1 {
771 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
772 }
773 member := members[0]
774
775 tx, err := i.Db.Begin()
776 if err != nil {
777 return fmt.Errorf("failed to start txn: %w", err)
778 }
779 committed := false
780 defer func() {
781 if committed {
782 return
783 }
784 tx.Rollback()
785 i.Enforcer.E.LoadPolicy()
786 }()
787
788 // remove record by rkey && update enforcer
789 if err = db.RemoveSpindleMember(
790 tx,
791 orm.FilterEq("did", did),
792 orm.FilterEq("rkey", rkey),
793 ); err != nil {
794 return fmt.Errorf("failed to remove from db: %w", err)
795 }
796
797 // update enforcer
798 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
799 if err != nil {
800 return fmt.Errorf("failed to update ACLs: %w", err)
801 }
802
803 if err = tx.Commit(); err != nil {
804 return fmt.Errorf("failed to commit txn: %w", err)
805 }
806
807 if err = i.Enforcer.E.SavePolicy(); err != nil {
808 return fmt.Errorf("failed to save ACLs: %w", err)
809 }
810 committed = true
811
812 l.Info("removed spindle member")
813 }
814
815 return nil
816}
817
818func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
819 did := e.Did
820 var err error
821
822 l = l.With("handler", "ingestSpindle")
823
824 switch e.Commit.Operation {
825 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
826 raw := json.RawMessage(e.Commit.Record)
827 record := tangled.Spindle{}
828 err = json.Unmarshal(raw, &record)
829 if err != nil {
830 l.Error("invalid record", "err", err)
831 return err
832 }
833
834 instance := e.Commit.RKey
835
836 err := db.AddSpindle(i.Db, models.Spindle{
837 Owner: syntax.DID(did),
838 Instance: instance,
839 })
840 if err != nil {
841 l.Error("failed to add spindle to db", "err", err, "instance", instance)
842 return err
843 }
844
845 if err := i.verifySpindle(ctx, instance, did); err != nil {
846 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
847 }
848
849 l.Info("ingested record", "instance", instance)
850 return nil
851
852 case jmodels.CommitOperationDelete:
853 instance := e.Commit.RKey
854
855 // get record from db first
856 spindles, err := db.GetSpindles(
857 ctx,
858 i.Db,
859 orm.FilterEq("owner", did),
860 orm.FilterEq("instance", instance),
861 )
862 if err != nil || len(spindles) != 1 {
863 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
864 }
865 spindle := spindles[0]
866
867 tx, err := i.Db.Begin()
868 if err != nil {
869 return fmt.Errorf("failed to start txn: %w", err)
870 }
871 defer func() {
872 tx.Rollback()
873 i.Enforcer.E.LoadPolicy()
874 }()
875
876 // remove spindle members first
877 err = db.RemoveSpindleMember(
878 tx,
879 orm.FilterEq("owner", did),
880 orm.FilterEq("instance", instance),
881 )
882 if err != nil {
883 return fmt.Errorf("failed to remove spindle members: %w", err)
884 }
885
886 err = db.DeleteSpindle(
887 tx,
888 orm.FilterEq("owner", did),
889 orm.FilterEq("instance", instance),
890 )
891 if err != nil {
892 return fmt.Errorf("failed to delete spindle: %w", err)
893 }
894
895 if spindle.Verified != nil {
896 err = i.Enforcer.RemoveSpindle(instance)
897 if err != nil {
898 return fmt.Errorf("failed to remove spindle from enforcer: %w", err)
899 }
900 }
901
902 err = tx.Commit()
903 if err != nil {
904 return fmt.Errorf("failed to commit txn: %w", err)
905 }
906
907 err = i.Enforcer.E.SavePolicy()
908 if err != nil {
909 return fmt.Errorf("failed to save ACLs: %w", err)
910 }
911
912 l.Info("ingested record", "instance", instance)
913 }
914
915 return nil
916}
917
918func (i *Ingester) ingestString(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
919 did := e.Did
920 rkey := e.Commit.RKey
921
922 var err error
923
924 l = l.With("handler", "ingestString")
925
926 switch e.Commit.Operation {
927 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
928 raw := json.RawMessage(e.Commit.Record)
929 record := tangled.String{}
930 err = json.Unmarshal(raw, &record)
931 if err != nil {
932 l.Error("invalid record", "err", err)
933 return err
934 }
935
936 str, err := models.StringFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(e.Commit.CID), record)
937 if err != nil {
938 return fmt.Errorf("failed to parse string record: %w", err)
939 }
940 if err = str.Validate(); err != nil {
941 l.Error("invalid record", "err", err)
942 return err
943 }
944
945 if err = db.AddString(i.Db, str); err != nil {
946 l.Error("failed to add string", "err", err)
947 return err
948 }
949
950 l.Info("ingested record")
951 return nil
952
953 case jmodels.CommitOperationDelete:
954 if err := db.DeleteString(
955 i.Db,
956 orm.FilterEq("did", did),
957 orm.FilterEq("rkey", rkey),
958 ); err != nil {
959 l.Error("failed to delete", "err", err)
960 return fmt.Errorf("failed to delete string record: %w", err)
961 }
962
963 l.Info("ingested record")
964 return nil
965 }
966
967 return nil
968}
969
970func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
971 did := e.Did
972 var err error
973
974 l = l.With("handler", "ingestKnotMember")
975
976 switch e.Commit.Operation {
977 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
978 raw := json.RawMessage(e.Commit.Record)
979 record := tangled.KnotMember{}
980 err = json.Unmarshal(raw, &record)
981 if err != nil {
982 l.Error("invalid record", "err", err)
983 return err
984 }
985
986 // only knot owner can invite to knots
987 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
988 if err != nil {
989 return fmt.Errorf("failed to check invite permission: %w", err)
990 }
991 if !ok {
992 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
993 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
994 }
995 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
996 if err != nil {
997 return fmt.Errorf("failed to re-check invite permission: %w", err)
998 }
999 if !ok {
1000 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
1001 }
1002 }
1003
1004 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
1005 if err != nil {
1006 return err
1007 }
1008
1009 if memberId.Handle.IsInvalidHandle() {
1010 return fmt.Errorf("invalid handle for member %s", record.Subject)
1011 }
1012
1013 existing, err := db.GetKnotMembers(i.Db,
1014 orm.FilterEq("did", did),
1015 orm.FilterEq("rkey", e.Commit.RKey),
1016 )
1017 if err != nil {
1018 return fmt.Errorf("failed to look up existing member: %w", err)
1019 }
1020 if len(existing) > 1 {
1021 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1022 }
1023
1024 tx, err := i.Db.Begin()
1025 if err != nil {
1026 return fmt.Errorf("failed to start txn: %w", err)
1027 }
1028 committed := false
1029 defer func() {
1030 if committed {
1031 return
1032 }
1033 tx.Rollback()
1034 i.Enforcer.E.LoadPolicy()
1035 }()
1036
1037 if len(existing) == 1 {
1038 prev := existing[0]
1039 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1040 if err = db.RemoveKnotMember(tx,
1041 orm.FilterEq("did", did),
1042 orm.FilterEq("rkey", e.Commit.RKey),
1043 ); err != nil {
1044 return fmt.Errorf("failed to remove stale row: %w", err)
1045 }
1046 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1047 return fmt.Errorf("failed to remove stale ACL: %w", err)
1048 }
1049 }
1050 }
1051
1052 if err = db.AddKnotMember(tx, models.KnotMember{
1053 Did: syntax.DID(did),
1054 Rkey: e.Commit.RKey,
1055 Domain: record.Domain,
1056 Subject: memberId.DID,
1057 }); err != nil {
1058 return fmt.Errorf("failed to add to db: %w", err)
1059 }
1060
1061 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1062 return fmt.Errorf("failed to update ACLs: %w", err)
1063 }
1064
1065 if err = tx.Commit(); err != nil {
1066 return fmt.Errorf("failed to commit txn: %w", err)
1067 }
1068
1069 if err = i.Enforcer.E.SavePolicy(); err != nil {
1070 return fmt.Errorf("failed to save ACLs: %w", err)
1071 }
1072 committed = true
1073
1074 l.Info("upserted knot member")
1075 case jmodels.CommitOperationDelete:
1076 rkey := e.Commit.RKey
1077
1078 members, err := db.GetKnotMembers(
1079 i.Db,
1080 orm.FilterEq("did", did),
1081 orm.FilterEq("rkey", rkey),
1082 )
1083 if err != nil {
1084 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1085 }
1086 if len(members) == 0 {
1087 l.Info("knot member already removed", "rkey", rkey)
1088 return nil
1089 }
1090 if len(members) > 1 {
1091 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1092 }
1093 member := members[0]
1094
1095 tx, err := i.Db.Begin()
1096 if err != nil {
1097 return fmt.Errorf("failed to start txn: %w", err)
1098 }
1099 committed := false
1100 defer func() {
1101 if committed {
1102 return
1103 }
1104 tx.Rollback()
1105 i.Enforcer.E.LoadPolicy()
1106 }()
1107
1108 if err = db.RemoveKnotMember(
1109 tx,
1110 orm.FilterEq("did", did),
1111 orm.FilterEq("rkey", rkey),
1112 ); err != nil {
1113 return fmt.Errorf("failed to remove from db: %w", err)
1114 }
1115
1116 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1117 return fmt.Errorf("failed to update ACLs: %w", err)
1118 }
1119
1120 if err = tx.Commit(); err != nil {
1121 return fmt.Errorf("failed to commit txn: %w", err)
1122 }
1123
1124 if err = i.Enforcer.E.SavePolicy(); err != nil {
1125 return fmt.Errorf("failed to save ACLs: %w", err)
1126 }
1127 committed = true
1128
1129 l.Info("removed knot member")
1130 }
1131
1132 return nil
1133}
1134
1135func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1136 did := e.Did
1137 var err error
1138
1139 l = l.With("handler", "ingestKnot")
1140
1141 switch e.Commit.Operation {
1142 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1143 raw := json.RawMessage(e.Commit.Record)
1144 record := tangled.Knot{}
1145 err = json.Unmarshal(raw, &record)
1146 if err != nil {
1147 l.Error("invalid record", "err", err)
1148 return err
1149 }
1150
1151 domain := e.Commit.RKey
1152
1153 err := db.AddKnot(i.Db, domain, did)
1154 if err != nil {
1155 l.Error("failed to add knot to db", "err", err, "domain", domain)
1156 return err
1157 }
1158
1159 if err := i.verifyKnot(ctx, domain, did); err != nil {
1160 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1161 }
1162
1163 l.Info("ingested record", "domain", domain)
1164 return nil
1165
1166 case jmodels.CommitOperationDelete:
1167 domain := e.Commit.RKey
1168
1169 // get record from db first
1170 registrations, err := db.GetRegistrations(
1171 i.Db,
1172 orm.FilterEq("domain", domain),
1173 orm.FilterEq("did", did),
1174 )
1175 if err != nil {
1176 return fmt.Errorf("failed to get registration: %w", err)
1177 }
1178 if len(registrations) != 1 {
1179 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1180 }
1181 registration := registrations[0]
1182
1183 tx, err := i.Db.Begin()
1184 if err != nil {
1185 return fmt.Errorf("failed to start txn: %w", err)
1186 }
1187 defer func() {
1188 tx.Rollback()
1189 i.Enforcer.E.LoadPolicy()
1190 }()
1191
1192 err = db.RemoveKnotMember(
1193 tx,
1194 orm.FilterEq("did", did),
1195 orm.FilterEq("domain", domain),
1196 )
1197 if err != nil {
1198 return fmt.Errorf("failed to remove knot members: %w", err)
1199 }
1200
1201 err = db.DeleteKnot(
1202 tx,
1203 orm.FilterEq("did", did),
1204 orm.FilterEq("domain", domain),
1205 )
1206 if err != nil {
1207 return fmt.Errorf("failed to delete knot: %w", err)
1208 }
1209
1210 err = db.RemoveReposByKnot(tx, domain)
1211 if err != nil {
1212 return fmt.Errorf("failed to remove repos by knot: %w", err)
1213 }
1214
1215 if registration.Registered != nil {
1216 err = i.Enforcer.RemoveKnot(domain)
1217 if err != nil {
1218 return fmt.Errorf("failed to remove knot from enforcer: %w", err)
1219 }
1220 }
1221
1222 err = tx.Commit()
1223 if err != nil {
1224 return fmt.Errorf("failed to commit txn: %w", err)
1225 }
1226
1227 err = i.Enforcer.E.SavePolicy()
1228 if err != nil {
1229 return fmt.Errorf("failed to save ACLs: %w", err)
1230 }
1231
1232 l.Info("ingested record", "domain", domain)
1233 }
1234
1235 return nil
1236}
1237
// Retry settings used by verifyKnot and verifySpindle around
// serververify.RunVerification.
const (
	verifyAttempts = 4               // passed to retry.Attempts
	verifyMinDelay = 1 * time.Second // passed to retry.Delay
	verifyMaxDelay = 5 * time.Second // passed to retry.MaxDelay
)
1243
1244func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1245 regs, err := db.GetRegistrations(i.Db,
1246 orm.FilterEq("domain", domain),
1247 orm.FilterEq("did", did),
1248 )
1249 if err != nil {
1250 return fmt.Errorf("look up registration: %w", err)
1251 }
1252 if len(regs) != 1 {
1253 return fmt.Errorf("no registration for %s by %s", domain, did)
1254 }
1255 if regs[0].Registered != nil {
1256 return nil
1257 }
1258
1259 err = retry.Do(
1260 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1261 retry.Context(ctx),
1262 retry.Attempts(verifyAttempts),
1263 retry.Delay(verifyMinDelay),
1264 retry.MaxDelay(verifyMaxDelay),
1265 retry.DelayType(retry.BackOffDelay),
1266 retry.LastErrorOnly(true),
1267 )
1268 if err != nil {
1269 return fmt.Errorf("verify: %w", err)
1270 }
1271 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1272}
1273
1274func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1275 spindles, err := db.GetSpindles(ctx, i.Db,
1276 orm.FilterEq("instance", instance),
1277 orm.FilterEq("owner", did),
1278 )
1279 if err != nil {
1280 return fmt.Errorf("look up spindle: %w", err)
1281 }
1282 if len(spindles) != 1 {
1283 return fmt.Errorf("no spindle for %s by %s", instance, did)
1284 }
1285 if spindles[0].Verified != nil {
1286 return nil
1287 }
1288
1289 err = retry.Do(
1290 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1291 retry.Context(ctx),
1292 retry.Attempts(verifyAttempts),
1293 retry.Delay(verifyMinDelay),
1294 retry.MaxDelay(verifyMaxDelay),
1295 retry.DelayType(retry.BackOffDelay),
1296 retry.LastErrorOnly(true),
1297 )
1298 if err != nil {
1299 return fmt.Errorf("verify: %w", err)
1300 }
1301 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1302 return err
1303}
1304
// sweepConcurrency is the errgroup limit SweepPendingVerifications sets for
// its verification goroutines.
const sweepConcurrency = 4
1306
1307func (i *Ingester) SweepPendingVerifications() {
1308 l := i.Logger.With("handler", "SweepPendingVerifications")
1309
1310 var g errgroup.Group
1311 g.SetLimit(sweepConcurrency)
1312
1313 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1314 if err != nil {
1315 l.Error("failed to list unverified knots", "err", err)
1316 } else {
1317 for _, reg := range regs {
1318 g.Go(func() error {
1319 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1320 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1321 }
1322 return nil
1323 })
1324 }
1325 }
1326
1327 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1328 if err != nil {
1329 l.Error("failed to list unverified spindles", "err", err)
1330 g.Wait()
1331 return
1332 }
1333 for _, s := range spindles {
1334 g.Go(func() error {
1335 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1336 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1337 }
1338 return nil
1339 })
1340 }
1341 g.Wait()
1342}
1343
1344func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1345 did := e.Did
1346 rkey := e.Commit.RKey
1347
1348 var err error
1349
1350 l = l.With("handler", "ingestIssue")
1351
1352 switch e.Commit.Operation {
1353 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1354 raw := json.RawMessage(e.Commit.Record)
1355 record := tangled.RepoIssue{}
1356 err = json.Unmarshal(raw, &record)
1357 if err != nil {
1358 l.Error("invalid record", "err", err)
1359 return err
1360 }
1361
1362 issue := models.IssueFromRecord(did, rkey, record)
1363
1364 if issue.RepoDid == "" {
1365 return fmt.Errorf("issue record has no repo field")
1366 }
1367 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1368 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1369 }
1370
1371 if err := i.Validator.ValidateIssue(&issue); err != nil {
1372 return fmt.Errorf("failed to validate issue: %w", err)
1373 }
1374
1375 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1376 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1377 if repoErr == nil && repo.RepoDid != "" {
1378 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1379 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1380 }
1381 }
1382 }
1383
1384 tx, err := i.Db.BeginTx(ctx, nil)
1385 if err != nil {
1386 l.Error("failed to begin transaction", "err", err)
1387 return err
1388 }
1389 defer tx.Rollback()
1390
1391 err = db.PutIssue(tx, &issue)
1392 if err != nil {
1393 l.Error("failed to create issue", "err", err)
1394 return err
1395 }
1396
1397 err = tx.Commit()
1398 if err != nil {
1399 l.Error("failed to commit txn", "err", err)
1400 return err
1401 }
1402
1403 l.Info("ingested record")
1404 return nil
1405
1406 case jmodels.CommitOperationDelete:
1407 tx, err := i.Db.BeginTx(ctx, nil)
1408 if err != nil {
1409 l.Error("failed to begin transaction", "err", err)
1410 return err
1411 }
1412 defer tx.Rollback()
1413
1414 if err := db.DeleteIssues(
1415 tx,
1416 did,
1417 rkey,
1418 ); err != nil {
1419 l.Error("failed to delete", "err", err)
1420 return fmt.Errorf("failed to delete issue record: %w", err)
1421 }
1422 if err := tx.Commit(); err != nil {
1423 l.Error("failed to commit txn", "err", err)
1424 return err
1425 }
1426
1427 l.Info("ingested record")
1428 return nil
1429 }
1430
1431 return nil
1432}
1433
1434func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1435 did := e.Did
1436 rkey := e.Commit.RKey
1437
1438 var err error
1439
1440 l = l.With("handler", "ingestPull")
1441
1442 switch e.Commit.Operation {
1443 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1444 raw := json.RawMessage(e.Commit.Record)
1445 record := tangled.RepoPull{}
1446 err = json.Unmarshal(raw, &record)
1447 if err != nil {
1448 l.Error("invalid record", "err", err)
1449 return err
1450 }
1451
1452 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1453 if err != nil {
1454 l.Error("failed to resolve did", "err", err)
1455 return err
1456 }
1457
1458 // go through and fetch all blobs in parallel
1459 readers := make([]*io.ReadCloser, len(record.Rounds))
1460 var mu sync.Mutex
1461
1462 g, gctx := errgroup.WithContext(ctx)
1463
1464 for idx, b := range record.Rounds {
1465 g.Go(func() error {
1466 // for some reason, a blob is empty
1467 if b.PatchBlob == nil {
1468 return fmt.Errorf("missing patchBlob in round %d", idx)
1469 }
1470
1471 ownerPds := ownerId.PDSEndpoint()
1472 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1473 q := url.Query()
1474 q.Set("cid", b.PatchBlob.Ref.String())
1475 q.Set("did", did)
1476 url.RawQuery = q.Encode()
1477
1478 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1479 if err != nil {
1480 l.Error("failed to create request")
1481 return err
1482 }
1483 req.Header.Set("Content-Type", "application/json")
1484
1485 resp, err := http.DefaultClient.Do(req)
1486 if err != nil {
1487 l.Error("failed to make request")
1488 return err
1489 }
1490
1491 mu.Lock()
1492 readers[idx] = &resp.Body
1493 mu.Unlock()
1494
1495 return nil
1496 })
1497 }
1498
1499 if err := g.Wait(); err != nil {
1500 for _, r := range readers {
1501 if r != nil && *r != nil {
1502 (*r).Close()
1503 }
1504 }
1505 return err
1506 }
1507
1508 defer func() {
1509 for _, r := range readers {
1510 if r != nil && *r != nil {
1511 (*r).Close()
1512 }
1513 }
1514 }()
1515
1516 pull, err := models.PullFromRecord(did, rkey, record, readers)
1517 if err != nil {
1518 return fmt.Errorf("failed to parse pull from record: %w", err)
1519 }
1520 if err := i.Validator.ValidatePull(pull); err != nil {
1521 return fmt.Errorf("failed to validate pull: %w", err)
1522 }
1523
1524 tx, err := i.Db.BeginTx(ctx, nil)
1525 if err != nil {
1526 l.Error("failed to begin transaction", "err", err)
1527 return err
1528 }
1529 defer tx.Rollback()
1530
1531 err = db.PutPull(tx, pull)
1532 if err != nil {
1533 l.Error("failed to create pull", "err", err)
1534 return err
1535 }
1536
1537 err = tx.Commit()
1538 if err != nil {
1539 l.Error("failed to commit txn", "err", err)
1540 return err
1541 }
1542
1543 l.Info("ingested record")
1544 return nil
1545
1546 case jmodels.CommitOperationDelete:
1547 tx, err := i.Db.BeginTx(ctx, nil)
1548 if err != nil {
1549 l.Error("failed to begin transaction", "err", err)
1550 return err
1551 }
1552 defer tx.Rollback()
1553
1554 if err := db.AbandonPulls(
1555 tx,
1556 orm.FilterEq("owner_did", did),
1557 orm.FilterEq("rkey", rkey),
1558 ); err != nil {
1559 l.Error("failed to abandon", "err", err)
1560 return fmt.Errorf("failed to abandon pull record: %w", err)
1561 }
1562 if err := tx.Commit(); err != nil {
1563 l.Error("failed to commit txn", "err", err)
1564 return err
1565 }
1566
1567 l.Info("ingested record")
1568 return nil
1569 }
1570
1571 return nil
1572}
1573
1574// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1575func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error {
1576 l = l.With("handler", "ingestIssueComment")
1577
1578 switch e.Commit.Operation {
1579 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1580 // no-op. sh.tangled.repo.issue.comment is deprecated
1581
1582 case jmodels.CommitOperationDelete:
1583 if err := db.PurgeComments(
1584 i.Db,
1585 orm.FilterEq("did", e.Did),
1586 orm.FilterEq("collection", e.Commit.Collection),
1587 orm.FilterEq("rkey", e.Commit.RKey),
1588 ); err != nil {
1589 return fmt.Errorf("failed to delete comment record: %w", err)
1590 }
1591 }
1592
1593 l.Info("ingested record")
1594 return nil
1595}
1596
1597// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1598func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error {
1599 l = l.With("handler", "ingestPullComment")
1600
1601 switch e.Commit.Operation {
1602 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1603 // no-op. sh.tangled.repo.pull.comment is deprecated
1604
1605 case jmodels.CommitOperationDelete:
1606 if err := db.PurgeComments(
1607 i.Db,
1608 orm.FilterEq("did", e.Did),
1609 orm.FilterEq("collection", e.Commit.Collection),
1610 orm.FilterEq("rkey", e.Commit.RKey),
1611 ); err != nil {
1612 return fmt.Errorf("failed to delete comment record: %w", err)
1613 }
1614 }
1615
1616 l.Info("ingested record")
1617 return nil
1618}
1619
1620func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error {
1621 did := e.Did
1622 rkey := e.Commit.RKey
1623 cid := e.Commit.CID
1624
1625 var err error
1626
1627 l = l.With("handler", "ingestComment")
1628
1629 ctx := context.Background()
1630
1631 switch e.Commit.Operation {
1632 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1633 raw := json.RawMessage(e.Commit.Record)
1634 record := tangled.FeedComment{}
1635 err = json.Unmarshal(raw, &record)
1636 if err != nil {
1637 return fmt.Errorf("invalid record: %w", err)
1638 }
1639
1640 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1641 if err != nil {
1642 return fmt.Errorf("failed to parse comment from record: %w", err)
1643 }
1644
1645 if err := comment.Validate(); err != nil {
1646 return fmt.Errorf("failed to validate comment: %w", err)
1647 }
1648
1649 var references []syntax.ATURI
1650 if comment.Body.Original != nil {
1651 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1652 }
1653
1654 tx, err := i.Db.Begin()
1655 if err != nil {
1656 return fmt.Errorf("failed to start transaction: %w", err)
1657 }
1658 defer tx.Rollback()
1659
1660 _, err = db.PutComment(tx, comment, references)
1661 if err != nil {
1662 return fmt.Errorf("failed to create comment: %w", err)
1663 }
1664
1665 if err := tx.Commit(); err != nil {
1666 return err
1667 }
1668
1669 case jmodels.CommitOperationDelete:
1670 if err := db.DeleteComments(
1671 i.Db,
1672 orm.FilterEq("did", did),
1673 orm.FilterEq("collection", e.Commit.Collection),
1674 orm.FilterEq("rkey", rkey),
1675 ); err != nil {
1676 return fmt.Errorf("failed to delete comment record: %w", err)
1677 }
1678 }
1679
1680 l.Info("ingested record")
1681 return nil
1682}
1683
1684func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error {
1685 did := e.Did
1686 rkey := e.Commit.RKey
1687
1688 l = l.With("handler", "ingestReaction")
1689
1690 switch e.Commit.Operation {
1691 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1692 raw := json.RawMessage(e.Commit.Record)
1693 record := tangled.FeedReaction{}
1694 if err := json.Unmarshal(raw, &record); err != nil {
1695 return fmt.Errorf("invalid record: %w", err)
1696 }
1697
1698 subjectUri, err := syntax.ParseATURI(record.Subject)
1699 if err != nil {
1700 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err)
1701 }
1702 subjectUri = models.NormalizeReactionSubject(subjectUri)
1703
1704 kind, ok := models.ParseReactionKind(record.Reaction)
1705 if !ok {
1706 return fmt.Errorf("invalid reaction kind: %q", record.Reaction)
1707 }
1708
1709 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
1710 if parseErr != nil {
1711 created = time.Now()
1712 }
1713
1714 tx, err := i.Db.Begin()
1715 if err != nil {
1716 return fmt.Errorf("failed to start transaction: %w", err)
1717 }
1718 defer tx.Rollback()
1719
1720 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil {
1721 return fmt.Errorf("failed to clear existing reaction: %w", err)
1722 }
1723 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil {
1724 return fmt.Errorf("failed to add reaction: %w", err)
1725 }
1726
1727 if err := tx.Commit(); err != nil {
1728 return err
1729 }
1730
1731 case jmodels.CommitOperationDelete:
1732 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil {
1733 return fmt.Errorf("failed to delete reaction record: %w", err)
1734 }
1735 }
1736
1737 l.Info("ingested record")
1738 return nil
1739}
1740
1741func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error {
1742 did := e.Did
1743 rkey := e.Commit.RKey
1744
1745 var err error
1746
1747 l = l.With("handler", "ingestLabelDefinition")
1748
1749 switch e.Commit.Operation {
1750 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1751 raw := json.RawMessage(e.Commit.Record)
1752 record := tangled.LabelDefinition{}
1753 err = json.Unmarshal(raw, &record)
1754 if err != nil {
1755 return fmt.Errorf("invalid record: %w", err)
1756 }
1757
1758 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1759 if err != nil {
1760 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1761 }
1762
1763 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1764 return fmt.Errorf("failed to validate labeldef: %w", err)
1765 }
1766
1767 _, err = db.AddLabelDefinition(i.Db, def)
1768 if err != nil {
1769 return fmt.Errorf("failed to create labeldef: %w", err)
1770 }
1771
1772 l.Info("ingested record")
1773 return nil
1774
1775 case jmodels.CommitOperationDelete:
1776 if err := db.DeleteLabelDefinition(
1777 i.Db,
1778 orm.FilterEq("did", did),
1779 orm.FilterEq("rkey", rkey),
1780 ); err != nil {
1781 return fmt.Errorf("failed to delete labeldef record: %w", err)
1782 }
1783
1784 l.Info("ingested record")
1785 return nil
1786 }
1787
1788 return nil
1789}
1790
1791func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1792 did := e.Did
1793 rkey := e.Commit.RKey
1794
1795 var err error
1796
1797 l = l.With("handler", "ingestLabelOp")
1798
1799 switch e.Commit.Operation {
1800 case jmodels.CommitOperationCreate:
1801 raw := json.RawMessage(e.Commit.Record)
1802 record := tangled.LabelOp{}
1803 err = json.Unmarshal(raw, &record)
1804 if err != nil {
1805 return fmt.Errorf("invalid record: %w", err)
1806 }
1807
1808 subject := syntax.ATURI(record.Subject)
1809 collection := subject.Collection()
1810
1811 var repo *models.Repo
1812 switch collection {
1813 case tangled.RepoIssueNSID:
1814 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1815 if err != nil || len(i) != 1 {
1816 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1817 }
1818 repo = i[0].Repo
1819 case tangled.RepoPullNSID:
1820 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
1821 if err != nil || len(p) != 1 {
1822 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p))
1823 }
1824 repo = p[0].Repo
1825 default:
1826 return fmt.Errorf("unsupported label subject: %s", collection)
1827 }
1828
1829 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1830 if err != nil {
1831 return fmt.Errorf("failed to build label application ctx: %w", err)
1832 }
1833
1834 ops := models.LabelOpsFromRecord(did, rkey, record)
1835
1836 for _, o := range ops {
1837 def, ok := actx.Defs[o.OperandKey]
1838 if !ok {
1839 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1840 }
1841 if err := i.Validator.ValidateLabelOp(ctx, def, repo, &o); err != nil {
1842 if !errors.Is(err, knotacl.ErrKnotUnreachable) {
1843 return fmt.Errorf("failed to validate labelop: %w", err)
1844 }
1845 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", err)
1846 }
1847 }
1848
1849 tx, err := i.Db.Begin()
1850 if err != nil {
1851 return err
1852 }
1853 defer tx.Rollback()
1854
1855 for _, o := range ops {
1856 _, err = db.AddLabelOp(tx, &o)
1857 if err != nil {
1858 return fmt.Errorf("failed to add labelop: %w", err)
1859 }
1860 }
1861
1862 if err = tx.Commit(); err != nil {
1863 return err
1864 }
1865
1866 l.Info("ingested record")
1867 }
1868
1869 return nil
1870}