Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/knotacl"
31 "tangled.org/core/appview/mentions"
32 "tangled.org/core/appview/models"
33 "tangled.org/core/appview/notify"
34 "tangled.org/core/appview/repoverify"
35 "tangled.org/core/appview/serververify"
36 "tangled.org/core/appview/validator"
37 "tangled.org/core/idresolver"
38 "tangled.org/core/orm"
39 "tangled.org/core/rbac"
40)
41
// Ingester turns jetstream events into appview state. The handlers in this
// file write through Db, keep the rbac Enforcer in sync for knot and spindle
// membership, and use IdResolver to resolve or invalidate identities.
type Ingester struct {
	Ctx context.Context
	// Db is the store every ingest handler reads from and writes to; it also
	// persists the jetstream cursor.
	Db *db.DB
	// Enforcer holds the knot/spindle ACL policy updated by the member handlers.
	Enforcer *rbac.Enforcer
	// Acl answers per-repo permission checks (used when ingesting artifacts).
	Acl *knotacl.Service
	// IdResolver resolves DIDs and invalidates cached identities.
	IdResolver *idresolver.Resolver
	// Cache may be nil; the profile handler skips cache updates in that case.
	Cache  *cache.Cache
	Config *config.Config
	// Logger is the base logger; each handler derives its own with extra fields.
	Logger *slog.Logger
	// Validator validates decoded records (used for string records).
	Validator        *validator.Validator
	MentionsResolver *mentions.Resolver
	Notifier         notify.Notifier
	Verifier         repoverify.Verifier
}
56
// processFunc handles a single jetstream event. Ingest returns a value of
// this type.
type processFunc func(ctx context.Context, e *jmodels.Event) error
58
59func (i *Ingester) Ingest() processFunc {
60 return func(ctx context.Context, e *jmodels.Event) error {
61 var err error
62
63 l := i.Logger.With("kind", e.Kind)
64 switch e.Kind {
65 case jmodels.EventKindAccount:
66 // TODO: sync account state to db
67 if e.Account.Active {
68 break
69 }
70 // TODO: revoke sessions by DID
71 if *e.Account.Status == "deactivated" {
72 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
73 }
74 case jmodels.EventKindIdentity:
75 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
76 case jmodels.EventKindCommit:
77 l = l.With(
78 "nsid", e.Commit.Collection,
79 "did", e.Did,
80 "rkey", e.Commit.RKey,
81 "op", e.Commit.Operation,
82 )
83 switch e.Commit.Collection {
84 case tangled.GraphFollowNSID:
85 err = i.ingestFollow(e, l)
86 case tangled.GraphVouchNSID:
87 err = i.ingestVouch(ctx, e, l)
88 case tangled.FeedStarNSID:
89 err = i.ingestStar(ctx, e, l)
90 case tangled.FeedReactionNSID:
91 err = i.ingestReaction(e, l)
92 case tangled.PublicKeyNSID:
93 err = i.ingestPublicKey(e, l)
94 case tangled.RepoArtifactNSID:
95 err = i.ingestArtifact(ctx, e, l)
96 case tangled.ActorProfileNSID:
97 err = i.ingestProfile(ctx, e, l)
98 case tangled.SpindleMemberNSID:
99 err = i.ingestSpindleMember(ctx, e, l)
100 case tangled.SpindleNSID:
101 err = i.ingestSpindle(ctx, e, l)
102 case tangled.KnotMemberNSID:
103 err = i.ingestKnotMember(ctx, e, l)
104 case tangled.KnotNSID:
105 err = i.ingestKnot(ctx, e, l)
106 case tangled.StringNSID:
107 err = i.ingestString(e, l)
108 case tangled.RepoIssueNSID:
109 err = i.ingestIssue(ctx, e, l)
110 case tangled.RepoPullNSID:
111 err = i.ingestPull(ctx, e, l)
112 case tangled.FeedCommentNSID:
113 err = i.ingestComment(e, l)
114 case tangled.RepoIssueCommentNSID:
115 err = i.ingestIssueComment(e, l)
116 case tangled.RepoPullCommentNSID:
117 err = i.ingestPullComment(e, l)
118 case tangled.LabelDefinitionNSID:
119 err = i.ingestLabelDefinition(e, l)
120 case tangled.LabelOpNSID:
121 err = i.ingestLabelOp(ctx, e, l)
122 case tangled.RepoNSID:
123 err = i.ingestRepo(ctx, e, l)
124 }
125 }
126
127 if err != nil {
128 l.Warn("failed to ingest record, skipping", "err", err)
129 }
130
131 lastTimeUs := e.TimeUS + 1
132 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
133 l.Error("failed to save cursor", "err", saveErr)
134 }
135
136 return nil
137 }
138}
139
140func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
141 if strings.HasPrefix(ref, "did:") {
142 return db.GetRepoByDid(i.Db, ref)
143 }
144 return db.GetRepoByAtUri(i.Db, ref)
145}
146
147func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
148 var legacy struct {
149 Subject *string `json:"subject"`
150 SubjectDid *string `json:"subjectDid"`
151 }
152 if err := json.Unmarshal(raw, &legacy); err != nil {
153 return false, err
154 }
155
156 switch {
157 case legacy.SubjectDid != nil:
158 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
159 if err != nil {
160 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
161 return false, nil
162 }
163 star.SubjectType = models.StarSubjectRepo
164 star.Subject = repo.RepoDid
165 return true, nil
166
167 case legacy.Subject != nil:
168 uri, err := syntax.ParseATURI(*legacy.Subject)
169 if err != nil {
170 return false, fmt.Errorf("invalid old-format star subject: %w", err)
171 }
172 switch uri.Collection().String() {
173 case tangled.RepoNSID:
174 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
175 if err != nil {
176 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
177 return false, nil
178 }
179 star.SubjectType = models.StarSubjectRepo
180 star.Subject = repo.RepoDid
181 return true, nil
182 default:
183 star.SubjectType = models.StarSubjectString
184 star.Subject = *legacy.Subject
185 return true, nil
186 }
187
188 default:
189 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
190 }
191}
192
193func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
194 var err error
195 did := e.Did
196
197 l = l.With("handler", "ingestStar")
198
199 switch e.Commit.Operation {
200 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
201 raw := json.RawMessage(e.Commit.Record)
202 record := tangled.FeedStar{}
203 unmarshalErr := json.Unmarshal(raw, &record)
204
205 star := &models.Star{
206 Did: did,
207 Rkey: e.Commit.RKey,
208 }
209
210 switch {
211 case unmarshalErr != nil:
212 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
213 if resolveErr != nil {
214 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
215 return unmarshalErr
216 }
217 if !resolved {
218 return nil
219 }
220
221 case record.Subject == nil:
222 return fmt.Errorf("star record has nil subject")
223
224 case record.Subject.FeedStar_Repo != nil:
225 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
226 if repoErr != nil {
227 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
228 return nil
229 }
230 star.SubjectType = models.StarSubjectRepo
231 star.Subject = repo.RepoDid
232
233 case record.Subject.FeedStar_String != nil:
234 star.SubjectType = models.StarSubjectString
235 star.Subject = record.Subject.FeedStar_String.Uri
236
237 default:
238 return fmt.Errorf("star record has empty subject union")
239 }
240
241 err = db.AddStar(i.Db, star)
242 case jmodels.CommitOperationDelete:
243 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
244 }
245
246 if err != nil {
247 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
248 }
249
250 l.Info("ingested record")
251 return nil
252}
253
254func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error {
255 var err error
256 did := e.Did
257
258 l = l.With("handler", "ingestFollow")
259
260 switch e.Commit.Operation {
261 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
262 raw := json.RawMessage(e.Commit.Record)
263 record := tangled.GraphFollow{}
264 err = json.Unmarshal(raw, &record)
265 if err != nil {
266 l.Error("invalid record", "err", err)
267 return err
268 }
269
270 err = db.AddFollow(i.Db, &models.Follow{
271 UserDid: did,
272 SubjectDid: record.Subject,
273 Rkey: e.Commit.RKey,
274 })
275 case jmodels.CommitOperationDelete:
276 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
277 }
278
279 if err != nil {
280 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
281 }
282
283 l.Info("ingested record")
284 return nil
285}
286
287func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
288 var err error
289 did := e.Did
290
291 l = l.With("handler", "ingestVouch")
292
293 switch e.Commit.Operation {
294 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
295 raw := json.RawMessage(e.Commit.Record)
296 record := tangled.GraphVouch{}
297 err = json.Unmarshal(raw, &record)
298 if err != nil {
299 l.Error("invalid record", "err", err)
300 return err
301 }
302
303 // rkey is the subject_did being vouched for/denounced
304 subjectDID := e.Commit.RKey
305
306 _, err = syntax.ParseDID(subjectDID)
307 if err != nil {
308 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
309 return fmt.Errorf("invalid subject_did: %w", err)
310 }
311
312 if did == subjectDID {
313 l.Warn("attempted self-vouch", "did", did)
314 return fmt.Errorf("cannot vouch for self")
315 }
316
317 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
318 if err != nil {
319 return err
320 }
321
322 if subjectId.Handle.IsInvalidHandle() {
323 return err
324 }
325
326 kind, err := models.ParseVouchKind(record.Kind)
327 if err != nil {
328 l.Error("invalid kind", "kind", kind)
329 return fmt.Errorf("invalid kind: %s", kind)
330 }
331
332 recordCid, err := cid.Parse(e.Commit.CID)
333 if err != nil {
334 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
335 return fmt.Errorf("invalid cid: %w", err)
336 }
337
338 var evidences []syntax.ATURI
339 for _, raw := range record.Evidences {
340 uri, parseErr := syntax.ParseATURI(raw)
341 if parseErr != nil {
342 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
343 continue
344 }
345 evidences = append(evidences, uri)
346 }
347
348 tx, txErr := i.Db.Begin()
349 if txErr != nil {
350 return fmt.Errorf("failed to start transaction: %w", txErr)
351 }
352
353 addErr := db.AddVouch(tx, &models.Vouch{
354 Did: syntax.DID(did),
355 SubjectDid: subjectId.DID,
356 Cid: recordCid,
357 Kind: kind,
358 Reason: record.Reason,
359 Evidences: evidences,
360 })
361 if addErr != nil {
362 tx.Rollback()
363 err = addErr
364 } else {
365 err = tx.Commit()
366 }
367
368 case jmodels.CommitOperationDelete:
369 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
370 }
371
372 if err != nil {
373 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
374 }
375
376 l.Info("ingested record")
377 return nil
378}
379
380func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error {
381 did := e.Did
382 var err error
383
384 l = l.With("handler", "ingestPublicKey")
385
386 switch e.Commit.Operation {
387 case jmodels.CommitOperationCreate:
388 l.Debug("processing add of pubkey")
389 raw := json.RawMessage(e.Commit.Record)
390 record := tangled.PublicKey{}
391 err = json.Unmarshal(raw, &record)
392 if err != nil {
393 l.Error("invalid record", "err", err)
394 return err
395 }
396
397 name := record.Name
398 key := record.Key
399 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
400 case jmodels.CommitOperationUpdate:
401 l.Debug("processing update of pubkey")
402 raw := json.RawMessage(e.Commit.Record)
403 record := tangled.PublicKey{}
404 err = json.Unmarshal(raw, &record)
405 if err != nil {
406 l.Error("invalid record", "err", err)
407 return err
408 }
409
410 name := record.Name
411 key := record.Key
412 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
413 case jmodels.CommitOperationDelete:
414 l.Debug("processing delete of pubkey")
415 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
416 }
417
418 if err != nil {
419 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
420 }
421
422 l.Info("ingested record")
423 return nil
424}
425
426func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
427 did := e.Did
428 var err error
429
430 l = l.With("handler", "ingestArtifact")
431
432 switch e.Commit.Operation {
433 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
434 raw := json.RawMessage(e.Commit.Record)
435 record := tangled.RepoArtifact{}
436 err = json.Unmarshal(raw, &record)
437 if err != nil {
438 l.Error("invalid record", "err", err)
439 return err
440 }
441
442 var repo *models.Repo
443 if record.RepoDid != nil && *record.RepoDid != "" {
444 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
445 if err != nil && !errors.Is(err, sql.ErrNoRows) {
446 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
447 }
448 }
449 if repo == nil && record.Repo != nil {
450 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
451 if parseErr != nil {
452 return parseErr
453 }
454 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
455 if err != nil {
456 return err
457 }
458 }
459 if repo == nil {
460 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
461 }
462
463 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push")
464 if permErr != nil {
465 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr)
466 } else if !allowed {
467 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier())
468 return nil
469 }
470
471 repoDid := repo.RepoDid
472 if repoDid == "" && record.RepoDid != nil {
473 repoDid = *record.RepoDid
474 }
475 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
476 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
477 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
478 }
479 }
480
481 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
482 if parseErr != nil {
483 createdAt = time.Now()
484 }
485
486 artifact := models.Artifact{
487 Did: did,
488 Rkey: e.Commit.RKey,
489 RepoDid: syntax.DID(repo.RepoDid),
490 Tag: plumbing.Hash(record.Tag),
491 CreatedAt: createdAt,
492 BlobCid: cid.Cid(record.Artifact.Ref),
493 Name: record.Name,
494 Size: uint64(record.Artifact.Size),
495 MimeType: record.Artifact.MimeType,
496 }
497
498 err = db.AddArtifact(i.Db, artifact)
499 case jmodels.CommitOperationDelete:
500 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
501 }
502
503 if err != nil {
504 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
505 }
506
507 l.Info("ingested record")
508 return nil
509}
510
511func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
512 did := e.Did
513 var err error
514
515 l = l.With("handler", "ingestProfile")
516
517 if e.Commit.RKey != "self" {
518 return fmt.Errorf("ingestProfile only ingests `self` record")
519 }
520
521 switch e.Commit.Operation {
522 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
523 raw := json.RawMessage(e.Commit.Record)
524 record := tangled.ActorProfile{}
525 err = json.Unmarshal(raw, &record)
526 if err != nil {
527 l.Error("invalid record", "err", err)
528 return err
529 }
530
531 avatar := ""
532 if record.Avatar != nil {
533 avatar = record.Avatar.Ref.String()
534 }
535
536 description := ""
537 if record.Description != nil {
538 description = *record.Description
539 }
540
541 includeBluesky := record.Bluesky
542
543 pronouns := ""
544 if record.Pronouns != nil {
545 pronouns = *record.Pronouns
546 }
547
548 location := ""
549 if record.Location != nil {
550 location = *record.Location
551 }
552
553 var links [5]string
554 for i, l := range record.Links {
555 if i < 5 {
556 links[i] = l
557 }
558 }
559
560 var stats [2]models.VanityStat
561 for i, s := range record.Stats {
562 if i < 2 {
563 stats[i].Kind = models.ParseVanityStatKind(s)
564 }
565 }
566
567 var pinned [6]string
568 for i, r := range record.PinnedRepositories {
569 if i < 6 {
570 pinned[i] = r
571 }
572 }
573
574 var preferredHandle syntax.Handle
575 if record.PreferredHandle != nil {
576 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
577 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
578 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
579 preferredHandle = h
580 }
581 }
582 }
583
584 profile := models.Profile{
585 Did: did,
586 Avatar: avatar,
587 Description: description,
588 IncludeBluesky: includeBluesky,
589 Location: location,
590 Links: links,
591 Stats: stats,
592 PinnedRepos: pinned,
593 Pronouns: pronouns,
594 PreferredHandle: preferredHandle,
595 }
596
597 tx, err := i.Db.Begin()
598 if err != nil {
599 return fmt.Errorf("failed to start transaction: %w", err)
600 }
601
602 err = db.ValidateProfile(tx, &profile)
603 if err != nil {
604 return fmt.Errorf("invalid profile record")
605 }
606
607 err = db.UpsertProfile(tx, &profile)
608 if err == nil && i.Cache != nil {
609 pipe := i.Cache.Pipeline()
610 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
611 if preferredHandle != "" {
612 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
613 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
614 } else {
615 pipe.Del(ctx, didKey)
616 }
617 if _, execErr := pipe.Exec(ctx); execErr != nil {
618 l.Warn("failed to update preferred handle cache", "err", execErr)
619 }
620 }
621 case jmodels.CommitOperationDelete:
622 tx, beginErr := i.Db.Begin()
623 if beginErr != nil {
624 return fmt.Errorf("failed to start transaction: %w", beginErr)
625 }
626
627 priorHandle, phErr := db.GetPreferredHandle(tx, did)
628 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
629 l.Warn("failed to read prior preferred handle", "err", phErr)
630 }
631
632 err = db.DeleteProfile(tx, did)
633 if err == nil && i.Cache != nil {
634 pipe := i.Cache.Pipeline()
635 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
636 if priorHandle != "" {
637 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
638 }
639 if _, execErr := pipe.Exec(ctx); execErr != nil {
640 l.Warn("failed to evict preferred handle cache", "err", execErr)
641 }
642 }
643 }
644
645 if err != nil {
646 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
647 }
648
649 l.Info("ingested record")
650 return nil
651}
652
653func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
654 did := e.Did
655 var err error
656
657 l = l.With("handler", "ingestSpindleMember")
658
659 switch e.Commit.Operation {
660 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
661 raw := json.RawMessage(e.Commit.Record)
662 record := tangled.SpindleMember{}
663 err = json.Unmarshal(raw, &record)
664 if err != nil {
665 l.Error("invalid record", "err", err)
666 return err
667 }
668
669 // only spindle owner can invite to spindles
670 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
671 if err != nil {
672 return fmt.Errorf("failed to check invite permission: %w", err)
673 }
674 if !ok {
675 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
676 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
677 }
678 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
679 if err != nil {
680 return fmt.Errorf("failed to re-check invite permission: %w", err)
681 }
682 if !ok {
683 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
684 }
685 }
686
687 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
688 if err != nil {
689 return err
690 }
691
692 if memberId.Handle.IsInvalidHandle() {
693 return fmt.Errorf("invalid handle for member %s", record.Subject)
694 }
695
696 existing, err := db.GetSpindleMembers(i.Db,
697 orm.FilterEq("did", did),
698 orm.FilterEq("rkey", e.Commit.RKey),
699 )
700 if err != nil {
701 return fmt.Errorf("failed to look up existing member: %w", err)
702 }
703 if len(existing) > 1 {
704 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
705 }
706
707 tx, err := i.Db.Begin()
708 if err != nil {
709 return fmt.Errorf("failed to start txn: %w", err)
710 }
711 committed := false
712 defer func() {
713 if committed {
714 return
715 }
716 tx.Rollback()
717 i.Enforcer.E.LoadPolicy()
718 }()
719
720 if len(existing) == 1 {
721 prev := existing[0]
722 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
723 if err = db.RemoveSpindleMember(tx,
724 orm.FilterEq("did", did),
725 orm.FilterEq("rkey", e.Commit.RKey),
726 ); err != nil {
727 return fmt.Errorf("failed to remove stale row: %w", err)
728 }
729 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
730 return fmt.Errorf("failed to remove stale ACL: %w", err)
731 }
732 }
733 }
734
735 if err = db.AddSpindleMember(tx, models.SpindleMember{
736 Did: syntax.DID(did),
737 Rkey: e.Commit.RKey,
738 Instance: record.Instance,
739 Subject: memberId.DID,
740 }); err != nil {
741 return fmt.Errorf("failed to add to db: %w", err)
742 }
743
744 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
745 return fmt.Errorf("failed to update ACLs: %w", err)
746 }
747
748 if err = tx.Commit(); err != nil {
749 return fmt.Errorf("failed to commit txn: %w", err)
750 }
751
752 if err = i.Enforcer.E.SavePolicy(); err != nil {
753 return fmt.Errorf("failed to save ACLs: %w", err)
754 }
755 committed = true
756
757 l.Info("upserted spindle member")
758 case jmodels.CommitOperationDelete:
759 rkey := e.Commit.RKey
760
761 // get record from db first
762 members, err := db.GetSpindleMembers(
763 i.Db,
764 orm.FilterEq("did", did),
765 orm.FilterEq("rkey", rkey),
766 )
767 if err != nil || len(members) != 1 {
768 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
769 }
770 member := members[0]
771
772 tx, err := i.Db.Begin()
773 if err != nil {
774 return fmt.Errorf("failed to start txn: %w", err)
775 }
776 committed := false
777 defer func() {
778 if committed {
779 return
780 }
781 tx.Rollback()
782 i.Enforcer.E.LoadPolicy()
783 }()
784
785 // remove record by rkey && update enforcer
786 if err = db.RemoveSpindleMember(
787 tx,
788 orm.FilterEq("did", did),
789 orm.FilterEq("rkey", rkey),
790 ); err != nil {
791 return fmt.Errorf("failed to remove from db: %w", err)
792 }
793
794 // update enforcer
795 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
796 if err != nil {
797 return fmt.Errorf("failed to update ACLs: %w", err)
798 }
799
800 if err = tx.Commit(); err != nil {
801 return fmt.Errorf("failed to commit txn: %w", err)
802 }
803
804 if err = i.Enforcer.E.SavePolicy(); err != nil {
805 return fmt.Errorf("failed to save ACLs: %w", err)
806 }
807 committed = true
808
809 l.Info("removed spindle member")
810 }
811
812 return nil
813}
814
815func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
816 did := e.Did
817 var err error
818
819 l = l.With("handler", "ingestSpindle")
820
821 switch e.Commit.Operation {
822 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
823 raw := json.RawMessage(e.Commit.Record)
824 record := tangled.Spindle{}
825 err = json.Unmarshal(raw, &record)
826 if err != nil {
827 l.Error("invalid record", "err", err)
828 return err
829 }
830
831 instance := e.Commit.RKey
832
833 err := db.AddSpindle(i.Db, models.Spindle{
834 Owner: syntax.DID(did),
835 Instance: instance,
836 })
837 if err != nil {
838 l.Error("failed to add spindle to db", "err", err, "instance", instance)
839 return err
840 }
841
842 if err := i.verifySpindle(ctx, instance, did); err != nil {
843 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
844 }
845
846 l.Info("ingested record", "instance", instance)
847 return nil
848
849 case jmodels.CommitOperationDelete:
850 instance := e.Commit.RKey
851
852 // get record from db first
853 spindles, err := db.GetSpindles(
854 ctx,
855 i.Db,
856 orm.FilterEq("owner", did),
857 orm.FilterEq("instance", instance),
858 )
859 if err != nil || len(spindles) != 1 {
860 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
861 }
862 spindle := spindles[0]
863
864 tx, err := i.Db.Begin()
865 if err != nil {
866 return fmt.Errorf("failed to start txn: %w", err)
867 }
868 defer func() {
869 tx.Rollback()
870 i.Enforcer.E.LoadPolicy()
871 }()
872
873 // remove spindle members first
874 err = db.RemoveSpindleMember(
875 tx,
876 orm.FilterEq("owner", did),
877 orm.FilterEq("instance", instance),
878 )
879 if err != nil {
880 return fmt.Errorf("failed to remove spindle members: %w", err)
881 }
882
883 err = db.DeleteSpindle(
884 tx,
885 orm.FilterEq("owner", did),
886 orm.FilterEq("instance", instance),
887 )
888 if err != nil {
889 return fmt.Errorf("failed to delete spindle: %w", err)
890 }
891
892 if spindle.Verified != nil {
893 err = i.Enforcer.RemoveSpindle(instance)
894 if err != nil {
895 return fmt.Errorf("failed to remove spindle from enforcer: %w", err)
896 }
897 }
898
899 err = tx.Commit()
900 if err != nil {
901 return fmt.Errorf("failed to commit txn: %w", err)
902 }
903
904 err = i.Enforcer.E.SavePolicy()
905 if err != nil {
906 return fmt.Errorf("failed to save ACLs: %w", err)
907 }
908
909 l.Info("ingested record", "instance", instance)
910 }
911
912 return nil
913}
914
915func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error {
916 did := e.Did
917 rkey := e.Commit.RKey
918
919 var err error
920
921 l = l.With("handler", "ingestString")
922
923 switch e.Commit.Operation {
924 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
925 raw := json.RawMessage(e.Commit.Record)
926 record := tangled.String{}
927 err = json.Unmarshal(raw, &record)
928 if err != nil {
929 l.Error("invalid record", "err", err)
930 return err
931 }
932
933 string := models.StringFromRecord(did, rkey, record)
934
935 if err = i.Validator.ValidateString(&string); err != nil {
936 l.Error("invalid record", "err", err)
937 return err
938 }
939
940 if err = db.AddString(i.Db, string); err != nil {
941 l.Error("failed to add string", "err", err)
942 return err
943 }
944
945 l.Info("ingested record")
946 return nil
947
948 case jmodels.CommitOperationDelete:
949 if err := db.DeleteString(
950 i.Db,
951 orm.FilterEq("did", did),
952 orm.FilterEq("rkey", rkey),
953 ); err != nil {
954 l.Error("failed to delete", "err", err)
955 return fmt.Errorf("failed to delete string record: %w", err)
956 }
957
958 l.Info("ingested record")
959 return nil
960 }
961
962 return nil
963}
964
965func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
966 did := e.Did
967 var err error
968
969 l = l.With("handler", "ingestKnotMember")
970
971 switch e.Commit.Operation {
972 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
973 raw := json.RawMessage(e.Commit.Record)
974 record := tangled.KnotMember{}
975 err = json.Unmarshal(raw, &record)
976 if err != nil {
977 l.Error("invalid record", "err", err)
978 return err
979 }
980
981 // only knot owner can invite to knots
982 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
983 if err != nil {
984 return fmt.Errorf("failed to check invite permission: %w", err)
985 }
986 if !ok {
987 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
988 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
989 }
990 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
991 if err != nil {
992 return fmt.Errorf("failed to re-check invite permission: %w", err)
993 }
994 if !ok {
995 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
996 }
997 }
998
999 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
1000 if err != nil {
1001 return err
1002 }
1003
1004 if memberId.Handle.IsInvalidHandle() {
1005 return fmt.Errorf("invalid handle for member %s", record.Subject)
1006 }
1007
1008 existing, err := db.GetKnotMembers(i.Db,
1009 orm.FilterEq("did", did),
1010 orm.FilterEq("rkey", e.Commit.RKey),
1011 )
1012 if err != nil {
1013 return fmt.Errorf("failed to look up existing member: %w", err)
1014 }
1015 if len(existing) > 1 {
1016 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1017 }
1018
1019 tx, err := i.Db.Begin()
1020 if err != nil {
1021 return fmt.Errorf("failed to start txn: %w", err)
1022 }
1023 committed := false
1024 defer func() {
1025 if committed {
1026 return
1027 }
1028 tx.Rollback()
1029 i.Enforcer.E.LoadPolicy()
1030 }()
1031
1032 if len(existing) == 1 {
1033 prev := existing[0]
1034 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1035 if err = db.RemoveKnotMember(tx,
1036 orm.FilterEq("did", did),
1037 orm.FilterEq("rkey", e.Commit.RKey),
1038 ); err != nil {
1039 return fmt.Errorf("failed to remove stale row: %w", err)
1040 }
1041 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1042 return fmt.Errorf("failed to remove stale ACL: %w", err)
1043 }
1044 }
1045 }
1046
1047 if err = db.AddKnotMember(tx, models.KnotMember{
1048 Did: syntax.DID(did),
1049 Rkey: e.Commit.RKey,
1050 Domain: record.Domain,
1051 Subject: memberId.DID,
1052 }); err != nil {
1053 return fmt.Errorf("failed to add to db: %w", err)
1054 }
1055
1056 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1057 return fmt.Errorf("failed to update ACLs: %w", err)
1058 }
1059
1060 if err = tx.Commit(); err != nil {
1061 return fmt.Errorf("failed to commit txn: %w", err)
1062 }
1063
1064 if err = i.Enforcer.E.SavePolicy(); err != nil {
1065 return fmt.Errorf("failed to save ACLs: %w", err)
1066 }
1067 committed = true
1068
1069 l.Info("upserted knot member")
1070 case jmodels.CommitOperationDelete:
1071 rkey := e.Commit.RKey
1072
1073 members, err := db.GetKnotMembers(
1074 i.Db,
1075 orm.FilterEq("did", did),
1076 orm.FilterEq("rkey", rkey),
1077 )
1078 if err != nil {
1079 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1080 }
1081 if len(members) == 0 {
1082 l.Info("knot member already removed", "rkey", rkey)
1083 return nil
1084 }
1085 if len(members) > 1 {
1086 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1087 }
1088 member := members[0]
1089
1090 tx, err := i.Db.Begin()
1091 if err != nil {
1092 return fmt.Errorf("failed to start txn: %w", err)
1093 }
1094 committed := false
1095 defer func() {
1096 if committed {
1097 return
1098 }
1099 tx.Rollback()
1100 i.Enforcer.E.LoadPolicy()
1101 }()
1102
1103 if err = db.RemoveKnotMember(
1104 tx,
1105 orm.FilterEq("did", did),
1106 orm.FilterEq("rkey", rkey),
1107 ); err != nil {
1108 return fmt.Errorf("failed to remove from db: %w", err)
1109 }
1110
1111 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1112 return fmt.Errorf("failed to update ACLs: %w", err)
1113 }
1114
1115 if err = tx.Commit(); err != nil {
1116 return fmt.Errorf("failed to commit txn: %w", err)
1117 }
1118
1119 if err = i.Enforcer.E.SavePolicy(); err != nil {
1120 return fmt.Errorf("failed to save ACLs: %w", err)
1121 }
1122 committed = true
1123
1124 l.Info("removed knot member")
1125 }
1126
1127 return nil
1128}
1129
1130func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1131 did := e.Did
1132 var err error
1133
1134 l = l.With("handler", "ingestKnot")
1135
1136 switch e.Commit.Operation {
1137 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1138 raw := json.RawMessage(e.Commit.Record)
1139 record := tangled.Knot{}
1140 err = json.Unmarshal(raw, &record)
1141 if err != nil {
1142 l.Error("invalid record", "err", err)
1143 return err
1144 }
1145
1146 domain := e.Commit.RKey
1147
1148 err := db.AddKnot(i.Db, domain, did)
1149 if err != nil {
1150 l.Error("failed to add knot to db", "err", err, "domain", domain)
1151 return err
1152 }
1153
1154 if err := i.verifyKnot(ctx, domain, did); err != nil {
1155 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1156 }
1157
1158 l.Info("ingested record", "domain", domain)
1159 return nil
1160
1161 case jmodels.CommitOperationDelete:
1162 domain := e.Commit.RKey
1163
1164 // get record from db first
1165 registrations, err := db.GetRegistrations(
1166 i.Db,
1167 orm.FilterEq("domain", domain),
1168 orm.FilterEq("did", did),
1169 )
1170 if err != nil {
1171 return fmt.Errorf("failed to get registration: %w", err)
1172 }
1173 if len(registrations) != 1 {
1174 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1175 }
1176 registration := registrations[0]
1177
1178 tx, err := i.Db.Begin()
1179 if err != nil {
1180 return fmt.Errorf("failed to start txn: %w", err)
1181 }
1182 defer func() {
1183 tx.Rollback()
1184 i.Enforcer.E.LoadPolicy()
1185 }()
1186
1187 err = db.RemoveKnotMember(
1188 tx,
1189 orm.FilterEq("did", did),
1190 orm.FilterEq("domain", domain),
1191 )
1192 if err != nil {
1193 return fmt.Errorf("failed to remove knot members: %w", err)
1194 }
1195
1196 err = db.DeleteKnot(
1197 tx,
1198 orm.FilterEq("did", did),
1199 orm.FilterEq("domain", domain),
1200 )
1201 if err != nil {
1202 return fmt.Errorf("failed to delete knot: %w", err)
1203 }
1204
1205 err = db.RemoveReposByKnot(tx, domain)
1206 if err != nil {
1207 return fmt.Errorf("failed to remove repos by knot: %w", err)
1208 }
1209
1210 if registration.Registered != nil {
1211 err = i.Enforcer.RemoveKnot(domain)
1212 if err != nil {
1213 return fmt.Errorf("failed to remove knot from enforcer: %w", err)
1214 }
1215 }
1216
1217 err = tx.Commit()
1218 if err != nil {
1219 return fmt.Errorf("failed to commit txn: %w", err)
1220 }
1221
1222 err = i.Enforcer.E.SavePolicy()
1223 if err != nil {
1224 return fmt.Errorf("failed to save ACLs: %w", err)
1225 }
1226
1227 l.Info("ingested record", "domain", domain)
1228 }
1229
1230 return nil
1231}
1232
// Retry settings passed to retry.Do by verifyKnot and verifySpindle.
const (
	// verifyAttempts is handed to retry.Attempts.
	verifyAttempts = 4
	// verifyMinDelay is handed to retry.Delay.
	verifyMinDelay = time.Second
	// verifyMaxDelay is handed to retry.MaxDelay.
	verifyMaxDelay = 5 * time.Second
)
1238
1239func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1240 regs, err := db.GetRegistrations(i.Db,
1241 orm.FilterEq("domain", domain),
1242 orm.FilterEq("did", did),
1243 )
1244 if err != nil {
1245 return fmt.Errorf("look up registration: %w", err)
1246 }
1247 if len(regs) != 1 {
1248 return fmt.Errorf("no registration for %s by %s", domain, did)
1249 }
1250 if regs[0].Registered != nil {
1251 return nil
1252 }
1253
1254 err = retry.Do(
1255 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1256 retry.Context(ctx),
1257 retry.Attempts(verifyAttempts),
1258 retry.Delay(verifyMinDelay),
1259 retry.MaxDelay(verifyMaxDelay),
1260 retry.DelayType(retry.BackOffDelay),
1261 retry.LastErrorOnly(true),
1262 )
1263 if err != nil {
1264 return fmt.Errorf("verify: %w", err)
1265 }
1266 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1267}
1268
1269func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1270 spindles, err := db.GetSpindles(ctx, i.Db,
1271 orm.FilterEq("instance", instance),
1272 orm.FilterEq("owner", did),
1273 )
1274 if err != nil {
1275 return fmt.Errorf("look up spindle: %w", err)
1276 }
1277 if len(spindles) != 1 {
1278 return fmt.Errorf("no spindle for %s by %s", instance, did)
1279 }
1280 if spindles[0].Verified != nil {
1281 return nil
1282 }
1283
1284 err = retry.Do(
1285 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1286 retry.Context(ctx),
1287 retry.Attempts(verifyAttempts),
1288 retry.Delay(verifyMinDelay),
1289 retry.MaxDelay(verifyMaxDelay),
1290 retry.DelayType(retry.BackOffDelay),
1291 retry.LastErrorOnly(true),
1292 )
1293 if err != nil {
1294 return fmt.Errorf("verify: %w", err)
1295 }
1296 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1297 return err
1298}
1299
1300const sweepConcurrency = 4
1301
1302func (i *Ingester) SweepPendingVerifications() {
1303 l := i.Logger.With("handler", "SweepPendingVerifications")
1304
1305 var g errgroup.Group
1306 g.SetLimit(sweepConcurrency)
1307
1308 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1309 if err != nil {
1310 l.Error("failed to list unverified knots", "err", err)
1311 } else {
1312 for _, reg := range regs {
1313 g.Go(func() error {
1314 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1315 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1316 }
1317 return nil
1318 })
1319 }
1320 }
1321
1322 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1323 if err != nil {
1324 l.Error("failed to list unverified spindles", "err", err)
1325 g.Wait()
1326 return
1327 }
1328 for _, s := range spindles {
1329 g.Go(func() error {
1330 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1331 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1332 }
1333 return nil
1334 })
1335 }
1336 g.Wait()
1337}
1338
1339func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1340 did := e.Did
1341 rkey := e.Commit.RKey
1342
1343 var err error
1344
1345 l = l.With("handler", "ingestIssue")
1346
1347 switch e.Commit.Operation {
1348 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1349 raw := json.RawMessage(e.Commit.Record)
1350 record := tangled.RepoIssue{}
1351 err = json.Unmarshal(raw, &record)
1352 if err != nil {
1353 l.Error("invalid record", "err", err)
1354 return err
1355 }
1356
1357 issue := models.IssueFromRecord(did, rkey, record)
1358
1359 if issue.RepoDid == "" {
1360 return fmt.Errorf("issue record has no repo field")
1361 }
1362 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1363 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1364 }
1365
1366 if err := i.Validator.ValidateIssue(&issue); err != nil {
1367 return fmt.Errorf("failed to validate issue: %w", err)
1368 }
1369
1370 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1371 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1372 if repoErr == nil && repo.RepoDid != "" {
1373 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1374 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1375 }
1376 }
1377 }
1378
1379 tx, err := i.Db.BeginTx(ctx, nil)
1380 if err != nil {
1381 l.Error("failed to begin transaction", "err", err)
1382 return err
1383 }
1384 defer tx.Rollback()
1385
1386 err = db.PutIssue(tx, &issue)
1387 if err != nil {
1388 l.Error("failed to create issue", "err", err)
1389 return err
1390 }
1391
1392 err = tx.Commit()
1393 if err != nil {
1394 l.Error("failed to commit txn", "err", err)
1395 return err
1396 }
1397
1398 l.Info("ingested record")
1399 return nil
1400
1401 case jmodels.CommitOperationDelete:
1402 tx, err := i.Db.BeginTx(ctx, nil)
1403 if err != nil {
1404 l.Error("failed to begin transaction", "err", err)
1405 return err
1406 }
1407 defer tx.Rollback()
1408
1409 if err := db.DeleteIssues(
1410 tx,
1411 did,
1412 rkey,
1413 ); err != nil {
1414 l.Error("failed to delete", "err", err)
1415 return fmt.Errorf("failed to delete issue record: %w", err)
1416 }
1417 if err := tx.Commit(); err != nil {
1418 l.Error("failed to commit txn", "err", err)
1419 return err
1420 }
1421
1422 l.Info("ingested record")
1423 return nil
1424 }
1425
1426 return nil
1427}
1428
1429func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1430 did := e.Did
1431 rkey := e.Commit.RKey
1432
1433 var err error
1434
1435 l = l.With("handler", "ingestPull")
1436
1437 switch e.Commit.Operation {
1438 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1439 raw := json.RawMessage(e.Commit.Record)
1440 record := tangled.RepoPull{}
1441 err = json.Unmarshal(raw, &record)
1442 if err != nil {
1443 l.Error("invalid record", "err", err)
1444 return err
1445 }
1446
1447 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1448 if err != nil {
1449 l.Error("failed to resolve did", "err", err)
1450 return err
1451 }
1452
1453 // go through and fetch all blobs in parallel
1454 readers := make([]*io.ReadCloser, len(record.Rounds))
1455 var mu sync.Mutex
1456
1457 g, gctx := errgroup.WithContext(ctx)
1458
1459 for idx, b := range record.Rounds {
1460 g.Go(func() error {
1461 // for some reason, a blob is empty
1462 if b.PatchBlob == nil {
1463 return fmt.Errorf("missing patchBlob in round %d", idx)
1464 }
1465
1466 ownerPds := ownerId.PDSEndpoint()
1467 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1468 q := url.Query()
1469 q.Set("cid", b.PatchBlob.Ref.String())
1470 q.Set("did", did)
1471 url.RawQuery = q.Encode()
1472
1473 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1474 if err != nil {
1475 l.Error("failed to create request")
1476 return err
1477 }
1478 req.Header.Set("Content-Type", "application/json")
1479
1480 resp, err := http.DefaultClient.Do(req)
1481 if err != nil {
1482 l.Error("failed to make request")
1483 return err
1484 }
1485
1486 mu.Lock()
1487 readers[idx] = &resp.Body
1488 mu.Unlock()
1489
1490 return nil
1491 })
1492 }
1493
1494 if err := g.Wait(); err != nil {
1495 for _, r := range readers {
1496 if r != nil && *r != nil {
1497 (*r).Close()
1498 }
1499 }
1500 return err
1501 }
1502
1503 defer func() {
1504 for _, r := range readers {
1505 if r != nil && *r != nil {
1506 (*r).Close()
1507 }
1508 }
1509 }()
1510
1511 pull, err := models.PullFromRecord(did, rkey, record, readers)
1512 if err != nil {
1513 return fmt.Errorf("failed to parse pull from record: %w", err)
1514 }
1515 if err := i.Validator.ValidatePull(pull); err != nil {
1516 return fmt.Errorf("failed to validate pull: %w", err)
1517 }
1518
1519 tx, err := i.Db.BeginTx(ctx, nil)
1520 if err != nil {
1521 l.Error("failed to begin transaction", "err", err)
1522 return err
1523 }
1524 defer tx.Rollback()
1525
1526 err = db.PutPull(tx, pull)
1527 if err != nil {
1528 l.Error("failed to create pull", "err", err)
1529 return err
1530 }
1531
1532 err = tx.Commit()
1533 if err != nil {
1534 l.Error("failed to commit txn", "err", err)
1535 return err
1536 }
1537
1538 l.Info("ingested record")
1539 return nil
1540
1541 case jmodels.CommitOperationDelete:
1542 tx, err := i.Db.BeginTx(ctx, nil)
1543 if err != nil {
1544 l.Error("failed to begin transaction", "err", err)
1545 return err
1546 }
1547 defer tx.Rollback()
1548
1549 if err := db.AbandonPulls(
1550 tx,
1551 orm.FilterEq("owner_did", did),
1552 orm.FilterEq("rkey", rkey),
1553 ); err != nil {
1554 l.Error("failed to abandon", "err", err)
1555 return fmt.Errorf("failed to abandon pull record: %w", err)
1556 }
1557 if err := tx.Commit(); err != nil {
1558 l.Error("failed to commit txn", "err", err)
1559 return err
1560 }
1561
1562 l.Info("ingested record")
1563 return nil
1564 }
1565
1566 return nil
1567}
1568
1569// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1570func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error {
1571 l = l.With("handler", "ingestIssueComment")
1572
1573 switch e.Commit.Operation {
1574 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1575 // no-op. sh.tangled.repo.issue.comment is deprecated
1576
1577 case jmodels.CommitOperationDelete:
1578 if err := db.PurgeComments(
1579 i.Db,
1580 orm.FilterEq("did", e.Did),
1581 orm.FilterEq("collection", e.Commit.Collection),
1582 orm.FilterEq("rkey", e.Commit.RKey),
1583 ); err != nil {
1584 return fmt.Errorf("failed to delete comment record: %w", err)
1585 }
1586 }
1587
1588 l.Info("ingested record")
1589 return nil
1590}
1591
1592// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1593func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error {
1594 l = l.With("handler", "ingestPullComment")
1595
1596 switch e.Commit.Operation {
1597 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1598 // no-op. sh.tangled.repo.pull.comment is deprecated
1599
1600 case jmodels.CommitOperationDelete:
1601 if err := db.PurgeComments(
1602 i.Db,
1603 orm.FilterEq("did", e.Did),
1604 orm.FilterEq("collection", e.Commit.Collection),
1605 orm.FilterEq("rkey", e.Commit.RKey),
1606 ); err != nil {
1607 return fmt.Errorf("failed to delete comment record: %w", err)
1608 }
1609 }
1610
1611 l.Info("ingested record")
1612 return nil
1613}
1614
1615func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error {
1616 did := e.Did
1617 rkey := e.Commit.RKey
1618 cid := e.Commit.CID
1619
1620 var err error
1621
1622 l = l.With("handler", "ingestComment")
1623
1624 ctx := context.Background()
1625
1626 switch e.Commit.Operation {
1627 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1628 raw := json.RawMessage(e.Commit.Record)
1629 record := tangled.FeedComment{}
1630 err = json.Unmarshal(raw, &record)
1631 if err != nil {
1632 return fmt.Errorf("invalid record: %w", err)
1633 }
1634
1635 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1636 if err != nil {
1637 return fmt.Errorf("failed to parse comment from record: %w", err)
1638 }
1639
1640 if err := comment.Validate(); err != nil {
1641 return fmt.Errorf("failed to validate comment: %w", err)
1642 }
1643
1644 var references []syntax.ATURI
1645 if comment.Body.Original != nil {
1646 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1647 }
1648
1649 tx, err := i.Db.Begin()
1650 if err != nil {
1651 return fmt.Errorf("failed to start transaction: %w", err)
1652 }
1653 defer tx.Rollback()
1654
1655 _, err = db.PutComment(tx, comment, references)
1656 if err != nil {
1657 return fmt.Errorf("failed to create comment: %w", err)
1658 }
1659
1660 if err := tx.Commit(); err != nil {
1661 return err
1662 }
1663
1664 case jmodels.CommitOperationDelete:
1665 if err := db.DeleteComments(
1666 i.Db,
1667 orm.FilterEq("did", did),
1668 orm.FilterEq("collection", e.Commit.Collection),
1669 orm.FilterEq("rkey", rkey),
1670 ); err != nil {
1671 return fmt.Errorf("failed to delete comment record: %w", err)
1672 }
1673 }
1674
1675 l.Info("ingested record")
1676 return nil
1677}
1678
1679func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error {
1680 did := e.Did
1681 rkey := e.Commit.RKey
1682
1683 l = l.With("handler", "ingestReaction")
1684
1685 switch e.Commit.Operation {
1686 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1687 raw := json.RawMessage(e.Commit.Record)
1688 record := tangled.FeedReaction{}
1689 if err := json.Unmarshal(raw, &record); err != nil {
1690 return fmt.Errorf("invalid record: %w", err)
1691 }
1692
1693 subjectUri, err := syntax.ParseATURI(record.Subject)
1694 if err != nil {
1695 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err)
1696 }
1697 subjectUri = models.NormalizeReactionSubject(subjectUri)
1698
1699 kind, ok := models.ParseReactionKind(record.Reaction)
1700 if !ok {
1701 return fmt.Errorf("invalid reaction kind: %q", record.Reaction)
1702 }
1703
1704 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
1705 if parseErr != nil {
1706 created = time.Now()
1707 }
1708
1709 tx, err := i.Db.Begin()
1710 if err != nil {
1711 return fmt.Errorf("failed to start transaction: %w", err)
1712 }
1713 defer tx.Rollback()
1714
1715 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil {
1716 return fmt.Errorf("failed to clear existing reaction: %w", err)
1717 }
1718 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil {
1719 return fmt.Errorf("failed to add reaction: %w", err)
1720 }
1721
1722 if err := tx.Commit(); err != nil {
1723 return err
1724 }
1725
1726 case jmodels.CommitOperationDelete:
1727 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil {
1728 return fmt.Errorf("failed to delete reaction record: %w", err)
1729 }
1730 }
1731
1732 l.Info("ingested record")
1733 return nil
1734}
1735
1736func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error {
1737 did := e.Did
1738 rkey := e.Commit.RKey
1739
1740 var err error
1741
1742 l = l.With("handler", "ingestLabelDefinition")
1743
1744 switch e.Commit.Operation {
1745 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1746 raw := json.RawMessage(e.Commit.Record)
1747 record := tangled.LabelDefinition{}
1748 err = json.Unmarshal(raw, &record)
1749 if err != nil {
1750 return fmt.Errorf("invalid record: %w", err)
1751 }
1752
1753 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1754 if err != nil {
1755 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1756 }
1757
1758 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1759 return fmt.Errorf("failed to validate labeldef: %w", err)
1760 }
1761
1762 _, err = db.AddLabelDefinition(i.Db, def)
1763 if err != nil {
1764 return fmt.Errorf("failed to create labeldef: %w", err)
1765 }
1766
1767 l.Info("ingested record")
1768 return nil
1769
1770 case jmodels.CommitOperationDelete:
1771 if err := db.DeleteLabelDefinition(
1772 i.Db,
1773 orm.FilterEq("did", did),
1774 orm.FilterEq("rkey", rkey),
1775 ); err != nil {
1776 return fmt.Errorf("failed to delete labeldef record: %w", err)
1777 }
1778
1779 l.Info("ingested record")
1780 return nil
1781 }
1782
1783 return nil
1784}
1785
1786func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1787 did := e.Did
1788 rkey := e.Commit.RKey
1789
1790 var err error
1791
1792 l = l.With("handler", "ingestLabelOp")
1793
1794 switch e.Commit.Operation {
1795 case jmodels.CommitOperationCreate:
1796 raw := json.RawMessage(e.Commit.Record)
1797 record := tangled.LabelOp{}
1798 err = json.Unmarshal(raw, &record)
1799 if err != nil {
1800 return fmt.Errorf("invalid record: %w", err)
1801 }
1802
1803 subject := syntax.ATURI(record.Subject)
1804 collection := subject.Collection()
1805
1806 var repo *models.Repo
1807 switch collection {
1808 case tangled.RepoIssueNSID:
1809 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1810 if err != nil || len(i) != 1 {
1811 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1812 }
1813 repo = i[0].Repo
1814 case tangled.RepoPullNSID:
1815 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
1816 if err != nil || len(p) != 1 {
1817 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p))
1818 }
1819 repo = p[0].Repo
1820 default:
1821 return fmt.Errorf("unsupported label subject: %s", collection)
1822 }
1823
1824 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1825 if err != nil {
1826 return fmt.Errorf("failed to build label application ctx: %w", err)
1827 }
1828
1829 ops := models.LabelOpsFromRecord(did, rkey, record)
1830
1831 for _, o := range ops {
1832 def, ok := actx.Defs[o.OperandKey]
1833 if !ok {
1834 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1835 }
1836 if err := i.Validator.ValidateLabelOp(ctx, def, repo, &o); err != nil {
1837 if !errors.Is(err, knotacl.ErrKnotUnreachable) {
1838 return fmt.Errorf("failed to validate labelop: %w", err)
1839 }
1840 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", err)
1841 }
1842 }
1843
1844 tx, err := i.Db.Begin()
1845 if err != nil {
1846 return err
1847 }
1848 defer tx.Rollback()
1849
1850 for _, o := range ops {
1851 _, err = db.AddLabelOp(tx, &o)
1852 if err != nil {
1853 return fmt.Errorf("failed to add labelop: %w", err)
1854 }
1855 }
1856
1857 if err = tx.Commit(); err != nil {
1858 return err
1859 }
1860
1861 l.Info("ingested record")
1862 }
1863
1864 return nil
1865}