Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "compress/gzip"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "maps"
13 "net/http"
14 "net/url"
15 "slices"
16 "strings"
17 "sync"
18
19 "time"
20
21 "github.com/avast/retry-go/v4"
22 "github.com/bluesky-social/indigo/atproto/syntax"
23 jmodels "github.com/bluesky-social/jetstream/pkg/models"
24 "github.com/go-git/go-git/v5/plumbing"
25 "github.com/ipfs/go-cid"
26 "golang.org/x/sync/errgroup"
27 "tangled.org/core/api/tangled"
28 "tangled.org/core/appview/cache"
29 "tangled.org/core/appview/config"
30 "tangled.org/core/appview/db"
31 "tangled.org/core/appview/knotacl"
32 "tangled.org/core/appview/mentions"
33 "tangled.org/core/appview/models"
34 "tangled.org/core/appview/notify"
35 "tangled.org/core/appview/repoverify"
36 "tangled.org/core/appview/serververify"
37 "tangled.org/core/appview/validator"
38 "tangled.org/core/blobstore"
39 "tangled.org/core/idresolver"
40 "tangled.org/core/orm"
41 "tangled.org/core/rbac"
42)
43
44type Ingester struct {
45 Ctx context.Context
46 Db *db.DB
47 Enforcer *rbac.Enforcer
48 Acl *knotacl.Service
49 IdResolver *idresolver.Resolver
50 BlobStore blobstore.BlobStore
51 Cache *cache.Cache
52 Config *config.Config
53 Logger *slog.Logger
54 Validator *validator.Validator
55 MentionsResolver *mentions.Resolver
56 Notifier notify.Notifier
57 Verifier repoverify.Verifier
58}
59
60type processFunc func(ctx context.Context, e *jmodels.Event) error
61
62func (i *Ingester) Ingest() processFunc {
63 return func(ctx context.Context, e *jmodels.Event) error {
64 var err error
65
66 l := i.Logger.With("kind", e.Kind)
67 switch e.Kind {
68 case jmodels.EventKindAccount:
69 // TODO: sync account state to db
70 if e.Account.Active {
71 break
72 }
73 // TODO: revoke sessions by DID
74 if *e.Account.Status == "deactivated" {
75 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
76 }
77 case jmodels.EventKindIdentity:
78 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
79 case jmodels.EventKindCommit:
80 l = l.With(
81 "nsid", e.Commit.Collection,
82 "did", e.Did,
83 "rkey", e.Commit.RKey,
84 "cid", e.Commit.CID,
85 "op", e.Commit.Operation,
86 )
87 switch e.Commit.Collection {
88 case tangled.GraphFollowNSID:
89 err = i.ingestFollow(e, l)
90 case tangled.GraphVouchNSID:
91 err = i.ingestVouch(ctx, e, l)
92 case tangled.FeedStarNSID:
93 err = i.ingestStar(ctx, e, l)
94 case tangled.FeedReactionNSID:
95 err = i.ingestReaction(e, l)
96 case tangled.PublicKeyNSID:
97 err = i.ingestPublicKey(e, l)
98 case tangled.RepoArtifactNSID:
99 err = i.ingestArtifact(ctx, e, l)
100 case tangled.ActorProfileNSID:
101 err = i.ingestProfile(ctx, e, l)
102 case tangled.SpindleMemberNSID:
103 err = i.ingestSpindleMember(ctx, e, l)
104 case tangled.SpindleNSID:
105 err = i.ingestSpindle(ctx, e, l)
106 case tangled.KnotMemberNSID:
107 err = i.ingestKnotMember(ctx, e, l)
108 case tangled.KnotNSID:
109 err = i.ingestKnot(ctx, e, l)
110 case tangled.StringNSID:
111 err = i.ingestString(ctx, e, l)
112 case tangled.RepoIssueNSID:
113 err = i.ingestIssue(ctx, e, l)
114 case tangled.RepoPullNSID:
115 err = i.ingestPull(ctx, e, l)
116 case tangled.FeedCommentNSID:
117 err = i.ingestComment(e, l)
118 case tangled.RepoIssueCommentNSID:
119 err = i.ingestIssueComment(e, l)
120 case tangled.RepoPullCommentNSID:
121 err = i.ingestPullComment(e, l)
122 case tangled.LabelDefinitionNSID:
123 err = i.ingestLabelDefinition(e, l)
124 case tangled.LabelOpNSID:
125 err = i.ingestLabelOp(ctx, e, l)
126 case tangled.RepoNSID:
127 err = i.ingestRepo(ctx, e, l)
128 }
129 }
130
131 if err != nil {
132 l.Warn("failed to ingest record, skipping", "err", err)
133 }
134
135 lastTimeUs := e.TimeUS + 1
136 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
137 l.Error("failed to save cursor", "err", saveErr)
138 }
139
140 return nil
141 }
142}
143
144func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
145 if strings.HasPrefix(ref, "did:") {
146 return db.GetRepoByDid(i.Db, ref)
147 }
148 return db.GetRepoByAtUri(i.Db, ref)
149}
150
151func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
152 var legacy struct {
153 Subject *string `json:"subject"`
154 SubjectDid *string `json:"subjectDid"`
155 }
156 if err := json.Unmarshal(raw, &legacy); err != nil {
157 return false, err
158 }
159
160 switch {
161 case legacy.SubjectDid != nil:
162 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
163 if err != nil {
164 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
165 return false, nil
166 }
167 star.SubjectType = models.StarSubjectRepo
168 star.Subject = repo.RepoDid
169 return true, nil
170
171 case legacy.Subject != nil:
172 uri, err := syntax.ParseATURI(*legacy.Subject)
173 if err != nil {
174 return false, fmt.Errorf("invalid old-format star subject: %w", err)
175 }
176 switch uri.Collection().String() {
177 case tangled.RepoNSID:
178 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
179 if err != nil {
180 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
181 return false, nil
182 }
183 star.SubjectType = models.StarSubjectRepo
184 star.Subject = repo.RepoDid
185 return true, nil
186 default:
187 star.SubjectType = models.StarSubjectString
188 star.Subject = *legacy.Subject
189 return true, nil
190 }
191
192 default:
193 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
194 }
195}
196
197func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
198 var err error
199 did := e.Did
200
201 l = l.With("handler", "ingestStar")
202
203 switch e.Commit.Operation {
204 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
205 raw := json.RawMessage(e.Commit.Record)
206 record := tangled.FeedStar{}
207 unmarshalErr := json.Unmarshal(raw, &record)
208
209 star := &models.Star{
210 Did: did,
211 Rkey: e.Commit.RKey,
212 }
213
214 switch {
215 case unmarshalErr != nil:
216 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
217 if resolveErr != nil {
218 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
219 return unmarshalErr
220 }
221 if !resolved {
222 return nil
223 }
224
225 case record.Subject == nil:
226 return fmt.Errorf("star record has nil subject")
227
228 case record.Subject.FeedStar_Repo != nil:
229 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
230 if repoErr != nil {
231 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
232 return nil
233 }
234 star.SubjectType = models.StarSubjectRepo
235 star.Subject = repo.RepoDid
236
237 case record.Subject.FeedStar_String != nil:
238 star.SubjectType = models.StarSubjectString
239 star.Subject = record.Subject.FeedStar_String.Uri
240
241 default:
242 return fmt.Errorf("star record has empty subject union")
243 }
244
245 err = db.AddStar(i.Db, star)
246 case jmodels.CommitOperationDelete:
247 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
248 }
249
250 if err != nil {
251 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
252 }
253
254 l.Info("ingested record")
255 return nil
256}
257
258func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error {
259 var err error
260 did := e.Did
261
262 l = l.With("handler", "ingestFollow")
263
264 switch e.Commit.Operation {
265 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
266 raw := json.RawMessage(e.Commit.Record)
267 record := tangled.GraphFollow{}
268 err = json.Unmarshal(raw, &record)
269 if err != nil {
270 l.Error("invalid record", "err", err)
271 return err
272 }
273
274 err = db.AddFollow(i.Db, &models.Follow{
275 UserDid: did,
276 SubjectDid: record.Subject,
277 Rkey: e.Commit.RKey,
278 })
279 case jmodels.CommitOperationDelete:
280 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
281 }
282
283 if err != nil {
284 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
285 }
286
287 l.Info("ingested record")
288 return nil
289}
290
291func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
292 var err error
293 did := e.Did
294
295 l = l.With("handler", "ingestVouch")
296
297 switch e.Commit.Operation {
298 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
299 raw := json.RawMessage(e.Commit.Record)
300 record := tangled.GraphVouch{}
301 err = json.Unmarshal(raw, &record)
302 if err != nil {
303 l.Error("invalid record", "err", err)
304 return err
305 }
306
307 // rkey is the subject_did being vouched for/denounced
308 subjectDID := e.Commit.RKey
309
310 _, err = syntax.ParseDID(subjectDID)
311 if err != nil {
312 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
313 return fmt.Errorf("invalid subject_did: %w", err)
314 }
315
316 if did == subjectDID {
317 l.Warn("attempted self-vouch", "did", did)
318 return fmt.Errorf("cannot vouch for self")
319 }
320
321 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
322 if err != nil {
323 return err
324 }
325
326 if subjectId.Handle.IsInvalidHandle() {
327 return err
328 }
329
330 kind, err := models.ParseVouchKind(record.Kind)
331 if err != nil {
332 l.Error("invalid kind", "kind", kind)
333 return fmt.Errorf("invalid kind: %s", kind)
334 }
335
336 recordCid, err := cid.Parse(e.Commit.CID)
337 if err != nil {
338 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
339 return fmt.Errorf("invalid cid: %w", err)
340 }
341
342 var evidences []syntax.ATURI
343 for _, raw := range record.Evidences {
344 uri, parseErr := syntax.ParseATURI(raw)
345 if parseErr != nil {
346 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
347 continue
348 }
349 evidences = append(evidences, uri)
350 }
351
352 tx, txErr := i.Db.Begin()
353 if txErr != nil {
354 return fmt.Errorf("failed to start transaction: %w", txErr)
355 }
356
357 addErr := db.AddVouch(tx, &models.Vouch{
358 Did: syntax.DID(did),
359 SubjectDid: subjectId.DID,
360 Cid: recordCid,
361 Kind: kind,
362 Reason: record.Reason,
363 Evidences: evidences,
364 })
365 if addErr != nil {
366 tx.Rollback()
367 err = addErr
368 } else {
369 err = tx.Commit()
370 }
371
372 case jmodels.CommitOperationDelete:
373 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
374 }
375
376 if err != nil {
377 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
378 }
379
380 l.Info("ingested record")
381 return nil
382}
383
384func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error {
385 did := e.Did
386 var err error
387
388 l = l.With("handler", "ingestPublicKey")
389
390 switch e.Commit.Operation {
391 case jmodels.CommitOperationCreate:
392 l.Debug("processing add of pubkey")
393 raw := json.RawMessage(e.Commit.Record)
394 record := tangled.PublicKey{}
395 err = json.Unmarshal(raw, &record)
396 if err != nil {
397 l.Error("invalid record", "err", err)
398 return err
399 }
400
401 name := record.Name
402 key := record.Key
403 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
404 case jmodels.CommitOperationUpdate:
405 l.Debug("processing update of pubkey")
406 raw := json.RawMessage(e.Commit.Record)
407 record := tangled.PublicKey{}
408 err = json.Unmarshal(raw, &record)
409 if err != nil {
410 l.Error("invalid record", "err", err)
411 return err
412 }
413
414 name := record.Name
415 key := record.Key
416 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
417 case jmodels.CommitOperationDelete:
418 l.Debug("processing delete of pubkey")
419 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
420 }
421
422 if err != nil {
423 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
424 }
425
426 l.Info("ingested record")
427 return nil
428}
429
430func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
431 did := e.Did
432 var err error
433
434 l = l.With("handler", "ingestArtifact")
435
436 switch e.Commit.Operation {
437 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
438 raw := json.RawMessage(e.Commit.Record)
439 record := tangled.RepoArtifact{}
440 err = json.Unmarshal(raw, &record)
441 if err != nil {
442 l.Error("invalid record", "err", err)
443 return err
444 }
445
446 var repo *models.Repo
447 if record.RepoDid != nil && *record.RepoDid != "" {
448 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
449 if err != nil && !errors.Is(err, sql.ErrNoRows) {
450 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
451 }
452 }
453 if repo == nil && record.Repo != nil {
454 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
455 if parseErr != nil {
456 return parseErr
457 }
458 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
459 if err != nil {
460 return err
461 }
462 }
463 if repo == nil {
464 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
465 }
466
467 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push")
468 if permErr != nil {
469 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr)
470 } else if !allowed {
471 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier())
472 return nil
473 }
474
475 repoDid := repo.RepoDid
476 if repoDid == "" && record.RepoDid != nil {
477 repoDid = *record.RepoDid
478 }
479 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
480 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
481 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
482 }
483 }
484
485 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
486 if parseErr != nil {
487 createdAt = time.Now()
488 }
489
490 artifact := models.Artifact{
491 Did: did,
492 Rkey: e.Commit.RKey,
493 RepoDid: syntax.DID(repo.RepoDid),
494 Tag: plumbing.Hash(record.Tag),
495 CreatedAt: createdAt,
496 BlobCid: cid.Cid(record.Artifact.Ref),
497 Name: record.Name,
498 Size: uint64(record.Artifact.Size),
499 MimeType: record.Artifact.MimeType,
500 }
501
502 err = db.AddArtifact(i.Db, artifact)
503 case jmodels.CommitOperationDelete:
504 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
505 }
506
507 if err != nil {
508 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
509 }
510
511 l.Info("ingested record")
512 return nil
513}
514
515func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
516 did := e.Did
517 var err error
518
519 l = l.With("handler", "ingestProfile")
520
521 if e.Commit.RKey != "self" {
522 return fmt.Errorf("ingestProfile only ingests `self` record")
523 }
524
525 switch e.Commit.Operation {
526 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
527 raw := json.RawMessage(e.Commit.Record)
528 record := tangled.ActorProfile{}
529 err = json.Unmarshal(raw, &record)
530 if err != nil {
531 l.Error("invalid record", "err", err)
532 return err
533 }
534
535 avatar := ""
536 if record.Avatar != nil {
537 avatar = record.Avatar.Ref.String()
538 }
539
540 description := ""
541 if record.Description != nil {
542 description = *record.Description
543 }
544
545 includeBluesky := record.Bluesky
546
547 pronouns := ""
548 if record.Pronouns != nil {
549 pronouns = *record.Pronouns
550 }
551
552 location := ""
553 if record.Location != nil {
554 location = *record.Location
555 }
556
557 var links [5]string
558 for i, l := range record.Links {
559 if i < 5 {
560 links[i] = l
561 }
562 }
563
564 var stats [2]models.VanityStat
565 for i, s := range record.Stats {
566 if i < 2 {
567 stats[i].Kind = models.ParseVanityStatKind(s)
568 }
569 }
570
571 var pinned [6]string
572 for i, r := range record.PinnedRepositories {
573 if i < 6 {
574 pinned[i] = r
575 }
576 }
577
578 var preferredHandle syntax.Handle
579 if record.PreferredHandle != nil {
580 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
581 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
582 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
583 preferredHandle = h
584 }
585 }
586 }
587
588 profile := models.Profile{
589 Did: did,
590 Avatar: avatar,
591 Description: description,
592 IncludeBluesky: includeBluesky,
593 Location: location,
594 Links: links,
595 Stats: stats,
596 PinnedRepos: pinned,
597 Pronouns: pronouns,
598 PreferredHandle: preferredHandle,
599 }
600
601 tx, err := i.Db.Begin()
602 if err != nil {
603 return fmt.Errorf("failed to start transaction: %w", err)
604 }
605
606 err = db.ValidateProfile(tx, &profile)
607 if err != nil {
608 return fmt.Errorf("invalid profile record")
609 }
610
611 err = db.UpsertProfile(tx, &profile)
612 if err == nil && i.Cache != nil {
613 pipe := i.Cache.Pipeline()
614 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
615 if preferredHandle != "" {
616 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
617 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
618 } else {
619 pipe.Del(ctx, didKey)
620 }
621 if _, execErr := pipe.Exec(ctx); execErr != nil {
622 l.Warn("failed to update preferred handle cache", "err", execErr)
623 }
624 }
625 case jmodels.CommitOperationDelete:
626 tx, beginErr := i.Db.Begin()
627 if beginErr != nil {
628 return fmt.Errorf("failed to start transaction: %w", beginErr)
629 }
630
631 priorHandle, phErr := db.GetPreferredHandle(tx, did)
632 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
633 l.Warn("failed to read prior preferred handle", "err", phErr)
634 }
635
636 err = db.DeleteProfile(tx, did)
637 if err == nil && i.Cache != nil {
638 pipe := i.Cache.Pipeline()
639 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
640 if priorHandle != "" {
641 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
642 }
643 if _, execErr := pipe.Exec(ctx); execErr != nil {
644 l.Warn("failed to evict preferred handle cache", "err", execErr)
645 }
646 }
647 }
648
649 if err != nil {
650 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
651 }
652
653 l.Info("ingested record")
654 return nil
655}
656
657func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
658 did := e.Did
659 var err error
660
661 l = l.With("handler", "ingestSpindleMember")
662
663 switch e.Commit.Operation {
664 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
665 raw := json.RawMessage(e.Commit.Record)
666 record := tangled.SpindleMember{}
667 err = json.Unmarshal(raw, &record)
668 if err != nil {
669 l.Error("invalid record", "err", err)
670 return err
671 }
672
673 // only spindle owner can invite to spindles
674 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
675 if err != nil {
676 return fmt.Errorf("failed to check invite permission: %w", err)
677 }
678 if !ok {
679 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
680 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
681 }
682 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
683 if err != nil {
684 return fmt.Errorf("failed to re-check invite permission: %w", err)
685 }
686 if !ok {
687 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
688 }
689 }
690
691 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
692 if err != nil {
693 return err
694 }
695
696 if memberId.Handle.IsInvalidHandle() {
697 return fmt.Errorf("invalid handle for member %s", record.Subject)
698 }
699
700 existing, err := db.GetSpindleMembers(i.Db,
701 orm.FilterEq("did", did),
702 orm.FilterEq("rkey", e.Commit.RKey),
703 )
704 if err != nil {
705 return fmt.Errorf("failed to look up existing member: %w", err)
706 }
707 if len(existing) > 1 {
708 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
709 }
710
711 tx, err := i.Db.Begin()
712 if err != nil {
713 return fmt.Errorf("failed to start txn: %w", err)
714 }
715 committed := false
716 defer func() {
717 if committed {
718 return
719 }
720 tx.Rollback()
721 i.Enforcer.E.LoadPolicy()
722 }()
723
724 if len(existing) == 1 {
725 prev := existing[0]
726 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
727 if err = db.RemoveSpindleMember(tx,
728 orm.FilterEq("did", did),
729 orm.FilterEq("rkey", e.Commit.RKey),
730 ); err != nil {
731 return fmt.Errorf("failed to remove stale row: %w", err)
732 }
733 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
734 return fmt.Errorf("failed to remove stale ACL: %w", err)
735 }
736 }
737 }
738
739 if err = db.AddSpindleMember(tx, models.SpindleMember{
740 Did: syntax.DID(did),
741 Rkey: e.Commit.RKey,
742 Instance: record.Instance,
743 Subject: memberId.DID,
744 }); err != nil {
745 return fmt.Errorf("failed to add to db: %w", err)
746 }
747
748 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
749 return fmt.Errorf("failed to update ACLs: %w", err)
750 }
751
752 if err = tx.Commit(); err != nil {
753 return fmt.Errorf("failed to commit txn: %w", err)
754 }
755
756 if err = i.Enforcer.E.SavePolicy(); err != nil {
757 return fmt.Errorf("failed to save ACLs: %w", err)
758 }
759 committed = true
760
761 l.Info("upserted spindle member")
762 case jmodels.CommitOperationDelete:
763 rkey := e.Commit.RKey
764
765 // get record from db first
766 members, err := db.GetSpindleMembers(
767 i.Db,
768 orm.FilterEq("did", did),
769 orm.FilterEq("rkey", rkey),
770 )
771 if err != nil || len(members) != 1 {
772 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
773 }
774 member := members[0]
775
776 tx, err := i.Db.Begin()
777 if err != nil {
778 return fmt.Errorf("failed to start txn: %w", err)
779 }
780 committed := false
781 defer func() {
782 if committed {
783 return
784 }
785 tx.Rollback()
786 i.Enforcer.E.LoadPolicy()
787 }()
788
789 // remove record by rkey && update enforcer
790 if err = db.RemoveSpindleMember(
791 tx,
792 orm.FilterEq("did", did),
793 orm.FilterEq("rkey", rkey),
794 ); err != nil {
795 return fmt.Errorf("failed to remove from db: %w", err)
796 }
797
798 // update enforcer
799 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
800 if err != nil {
801 return fmt.Errorf("failed to update ACLs: %w", err)
802 }
803
804 if err = tx.Commit(); err != nil {
805 return fmt.Errorf("failed to commit txn: %w", err)
806 }
807
808 if err = i.Enforcer.E.SavePolicy(); err != nil {
809 return fmt.Errorf("failed to save ACLs: %w", err)
810 }
811 committed = true
812
813 l.Info("removed spindle member")
814 }
815
816 return nil
817}
818
819func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
820 did := e.Did
821 var err error
822
823 l = l.With("handler", "ingestSpindle")
824
825 switch e.Commit.Operation {
826 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
827 raw := json.RawMessage(e.Commit.Record)
828 record := tangled.Spindle{}
829 err = json.Unmarshal(raw, &record)
830 if err != nil {
831 l.Error("invalid record", "err", err)
832 return err
833 }
834
835 instance := e.Commit.RKey
836
837 err := db.AddSpindle(i.Db, models.Spindle{
838 Owner: syntax.DID(did),
839 Instance: instance,
840 })
841 if err != nil {
842 l.Error("failed to add spindle to db", "err", err, "instance", instance)
843 return err
844 }
845
846 if err := i.verifySpindle(ctx, instance, did); err != nil {
847 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
848 }
849
850 l.Info("ingested record", "instance", instance)
851 return nil
852
853 case jmodels.CommitOperationDelete:
854 instance := e.Commit.RKey
855
856 // get record from db first
857 spindles, err := db.GetSpindles(
858 ctx,
859 i.Db,
860 orm.FilterEq("owner", did),
861 orm.FilterEq("instance", instance),
862 )
863 if err != nil || len(spindles) != 1 {
864 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
865 }
866 spindle := spindles[0]
867
868 tx, err := i.Db.Begin()
869 if err != nil {
870 return fmt.Errorf("failed to start txn: %w", err)
871 }
872 defer func() {
873 tx.Rollback()
874 i.Enforcer.E.LoadPolicy()
875 }()
876
877 // remove spindle members first
878 err = db.RemoveSpindleMember(
879 tx,
880 orm.FilterEq("owner", did),
881 orm.FilterEq("instance", instance),
882 )
883 if err != nil {
884 return fmt.Errorf("failed to remove spindle members: %w", err)
885 }
886
887 err = db.DeleteSpindle(
888 tx,
889 orm.FilterEq("owner", did),
890 orm.FilterEq("instance", instance),
891 )
892 if err != nil {
893 return fmt.Errorf("failed to delete spindle: %w", err)
894 }
895
896 if spindle.Verified != nil {
897 err = i.Enforcer.RemoveSpindle(instance)
898 if err != nil {
899 return fmt.Errorf("failed to remove spindle from enforcer: %w", err)
900 }
901 }
902
903 err = tx.Commit()
904 if err != nil {
905 return fmt.Errorf("failed to commit txn: %w", err)
906 }
907
908 err = i.Enforcer.E.SavePolicy()
909 if err != nil {
910 return fmt.Errorf("failed to save ACLs: %w", err)
911 }
912
913 l.Info("ingested record", "instance", instance)
914 }
915
916 return nil
917}
918
919func (i *Ingester) ingestString(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
920 did := e.Did
921 rkey := e.Commit.RKey
922
923 var err error
924
925 l = l.With("handler", "ingestString")
926
927 switch e.Commit.Operation {
928 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
929 raw := json.RawMessage(e.Commit.Record)
930 record := tangled.String{}
931 err = json.Unmarshal(raw, &record)
932 if err != nil {
933 l.Error("invalid record", "err", err)
934 return err
935 }
936
937 str, err := models.StringFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(e.Commit.CID), record)
938 if err != nil {
939 return fmt.Errorf("failed to parse string record: %w", err)
940 }
941 if err = str.Validate(); err != nil {
942 l.Error("invalid record", "err", err)
943 return err
944 }
945
946 g, gctx := errgroup.WithContext(ctx)
947 for idx, file := range str.Files {
948 if file.Gzip == nil {
949 continue
950 }
951 g.Go(func() error {
952 blob, err := i.BlobStore.GetBlob(gctx, str.Did, cid.Cid(file.Content.Ref))
953 if err != nil {
954 return fmt.Errorf("files[%d]: failed to fetch blob: %w", idx, err)
955 }
956 defer blob.Close()
957 gzr, err := gzip.NewReader(blob)
958 if err != nil {
959 return fmt.Errorf("files[%d]: invalid gzip stream: %w", idx, err)
960 }
961 gzr.Close()
962
963 content, err := io.ReadAll(gzr)
964 if err != nil {
965 return fmt.Errorf("files[%d]: failed to read blob: %w", idx, err)
966 }
967 file.Gzip.Content = string(content)
968 str.Files[idx] = file
969 return nil
970 })
971 }
972 if err := g.Wait(); err != nil {
973 return err
974 }
975
976 if err = db.AddString(i.Db, str); err != nil {
977 l.Error("failed to add string", "err", err)
978 return err
979 }
980
981 l.Info("ingested record")
982 return nil
983
984 case jmodels.CommitOperationDelete:
985 if err := db.DeleteString(
986 i.Db,
987 orm.FilterEq("did", did),
988 orm.FilterEq("rkey", rkey),
989 ); err != nil {
990 l.Error("failed to delete", "err", err)
991 return fmt.Errorf("failed to delete string record: %w", err)
992 }
993
994 l.Info("ingested record")
995 return nil
996 }
997
998 return nil
999}
1000
1001func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1002 did := e.Did
1003 var err error
1004
1005 l = l.With("handler", "ingestKnotMember")
1006
1007 switch e.Commit.Operation {
1008 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1009 raw := json.RawMessage(e.Commit.Record)
1010 record := tangled.KnotMember{}
1011 err = json.Unmarshal(raw, &record)
1012 if err != nil {
1013 l.Error("invalid record", "err", err)
1014 return err
1015 }
1016
1017 // only knot owner can invite to knots
1018 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
1019 if err != nil {
1020 return fmt.Errorf("failed to check invite permission: %w", err)
1021 }
1022 if !ok {
1023 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
1024 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
1025 }
1026 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
1027 if err != nil {
1028 return fmt.Errorf("failed to re-check invite permission: %w", err)
1029 }
1030 if !ok {
1031 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
1032 }
1033 }
1034
1035 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
1036 if err != nil {
1037 return err
1038 }
1039
1040 if memberId.Handle.IsInvalidHandle() {
1041 return fmt.Errorf("invalid handle for member %s", record.Subject)
1042 }
1043
1044 existing, err := db.GetKnotMembers(i.Db,
1045 orm.FilterEq("did", did),
1046 orm.FilterEq("rkey", e.Commit.RKey),
1047 )
1048 if err != nil {
1049 return fmt.Errorf("failed to look up existing member: %w", err)
1050 }
1051 if len(existing) > 1 {
1052 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1053 }
1054
1055 tx, err := i.Db.Begin()
1056 if err != nil {
1057 return fmt.Errorf("failed to start txn: %w", err)
1058 }
1059 committed := false
1060 defer func() {
1061 if committed {
1062 return
1063 }
1064 tx.Rollback()
1065 i.Enforcer.E.LoadPolicy()
1066 }()
1067
1068 if len(existing) == 1 {
1069 prev := existing[0]
1070 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1071 if err = db.RemoveKnotMember(tx,
1072 orm.FilterEq("did", did),
1073 orm.FilterEq("rkey", e.Commit.RKey),
1074 ); err != nil {
1075 return fmt.Errorf("failed to remove stale row: %w", err)
1076 }
1077 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1078 return fmt.Errorf("failed to remove stale ACL: %w", err)
1079 }
1080 }
1081 }
1082
1083 if err = db.AddKnotMember(tx, models.KnotMember{
1084 Did: syntax.DID(did),
1085 Rkey: e.Commit.RKey,
1086 Domain: record.Domain,
1087 Subject: memberId.DID,
1088 }); err != nil {
1089 return fmt.Errorf("failed to add to db: %w", err)
1090 }
1091
1092 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1093 return fmt.Errorf("failed to update ACLs: %w", err)
1094 }
1095
1096 if err = tx.Commit(); err != nil {
1097 return fmt.Errorf("failed to commit txn: %w", err)
1098 }
1099
1100 if err = i.Enforcer.E.SavePolicy(); err != nil {
1101 return fmt.Errorf("failed to save ACLs: %w", err)
1102 }
1103 committed = true
1104
1105 l.Info("upserted knot member")
1106 case jmodels.CommitOperationDelete:
1107 rkey := e.Commit.RKey
1108
1109 members, err := db.GetKnotMembers(
1110 i.Db,
1111 orm.FilterEq("did", did),
1112 orm.FilterEq("rkey", rkey),
1113 )
1114 if err != nil {
1115 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1116 }
1117 if len(members) == 0 {
1118 l.Info("knot member already removed", "rkey", rkey)
1119 return nil
1120 }
1121 if len(members) > 1 {
1122 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1123 }
1124 member := members[0]
1125
1126 tx, err := i.Db.Begin()
1127 if err != nil {
1128 return fmt.Errorf("failed to start txn: %w", err)
1129 }
1130 committed := false
1131 defer func() {
1132 if committed {
1133 return
1134 }
1135 tx.Rollback()
1136 i.Enforcer.E.LoadPolicy()
1137 }()
1138
1139 if err = db.RemoveKnotMember(
1140 tx,
1141 orm.FilterEq("did", did),
1142 orm.FilterEq("rkey", rkey),
1143 ); err != nil {
1144 return fmt.Errorf("failed to remove from db: %w", err)
1145 }
1146
1147 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1148 return fmt.Errorf("failed to update ACLs: %w", err)
1149 }
1150
1151 if err = tx.Commit(); err != nil {
1152 return fmt.Errorf("failed to commit txn: %w", err)
1153 }
1154
1155 if err = i.Enforcer.E.SavePolicy(); err != nil {
1156 return fmt.Errorf("failed to save ACLs: %w", err)
1157 }
1158 committed = true
1159
1160 l.Info("removed knot member")
1161 }
1162
1163 return nil
1164}
1165
1166func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1167 did := e.Did
1168 var err error
1169
1170 l = l.With("handler", "ingestKnot")
1171
1172 switch e.Commit.Operation {
1173 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1174 raw := json.RawMessage(e.Commit.Record)
1175 record := tangled.Knot{}
1176 err = json.Unmarshal(raw, &record)
1177 if err != nil {
1178 l.Error("invalid record", "err", err)
1179 return err
1180 }
1181
1182 domain := e.Commit.RKey
1183
1184 err := db.AddKnot(i.Db, domain, did)
1185 if err != nil {
1186 l.Error("failed to add knot to db", "err", err, "domain", domain)
1187 return err
1188 }
1189
1190 if err := i.verifyKnot(ctx, domain, did); err != nil {
1191 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1192 }
1193
1194 l.Info("ingested record", "domain", domain)
1195 return nil
1196
1197 case jmodels.CommitOperationDelete:
1198 domain := e.Commit.RKey
1199
1200 // get record from db first
1201 registrations, err := db.GetRegistrations(
1202 i.Db,
1203 orm.FilterEq("domain", domain),
1204 orm.FilterEq("did", did),
1205 )
1206 if err != nil {
1207 return fmt.Errorf("failed to get registration: %w", err)
1208 }
1209 if len(registrations) != 1 {
1210 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1211 }
1212 registration := registrations[0]
1213
1214 tx, err := i.Db.Begin()
1215 if err != nil {
1216 return fmt.Errorf("failed to start txn: %w", err)
1217 }
1218 defer func() {
1219 tx.Rollback()
1220 i.Enforcer.E.LoadPolicy()
1221 }()
1222
1223 err = db.RemoveKnotMember(
1224 tx,
1225 orm.FilterEq("did", did),
1226 orm.FilterEq("domain", domain),
1227 )
1228 if err != nil {
1229 return fmt.Errorf("failed to remove knot members: %w", err)
1230 }
1231
1232 err = db.DeleteKnot(
1233 tx,
1234 orm.FilterEq("did", did),
1235 orm.FilterEq("domain", domain),
1236 )
1237 if err != nil {
1238 return fmt.Errorf("failed to delete knot: %w", err)
1239 }
1240
1241 err = db.RemoveReposByKnot(tx, domain)
1242 if err != nil {
1243 return fmt.Errorf("failed to remove repos by knot: %w", err)
1244 }
1245
1246 if registration.Registered != nil {
1247 err = i.Enforcer.RemoveKnot(domain)
1248 if err != nil {
1249 return fmt.Errorf("failed to remove knot from enforcer: %w", err)
1250 }
1251 }
1252
1253 err = tx.Commit()
1254 if err != nil {
1255 return fmt.Errorf("failed to commit txn: %w", err)
1256 }
1257
1258 err = i.Enforcer.E.SavePolicy()
1259 if err != nil {
1260 return fmt.Errorf("failed to save ACLs: %w", err)
1261 }
1262
1263 l.Info("ingested record", "domain", domain)
1264 }
1265
1266 return nil
1267}
1268
1269const (
1270 verifyAttempts = 4
1271 verifyMinDelay = 1 * time.Second
1272 verifyMaxDelay = 5 * time.Second
1273)
1274
1275func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1276 regs, err := db.GetRegistrations(i.Db,
1277 orm.FilterEq("domain", domain),
1278 orm.FilterEq("did", did),
1279 )
1280 if err != nil {
1281 return fmt.Errorf("look up registration: %w", err)
1282 }
1283 if len(regs) != 1 {
1284 return fmt.Errorf("no registration for %s by %s", domain, did)
1285 }
1286 if regs[0].Registered != nil {
1287 return nil
1288 }
1289
1290 err = retry.Do(
1291 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1292 retry.Context(ctx),
1293 retry.Attempts(verifyAttempts),
1294 retry.Delay(verifyMinDelay),
1295 retry.MaxDelay(verifyMaxDelay),
1296 retry.DelayType(retry.BackOffDelay),
1297 retry.LastErrorOnly(true),
1298 )
1299 if err != nil {
1300 return fmt.Errorf("verify: %w", err)
1301 }
1302 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1303}
1304
1305func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1306 spindles, err := db.GetSpindles(ctx, i.Db,
1307 orm.FilterEq("instance", instance),
1308 orm.FilterEq("owner", did),
1309 )
1310 if err != nil {
1311 return fmt.Errorf("look up spindle: %w", err)
1312 }
1313 if len(spindles) != 1 {
1314 return fmt.Errorf("no spindle for %s by %s", instance, did)
1315 }
1316 if spindles[0].Verified != nil {
1317 return nil
1318 }
1319
1320 err = retry.Do(
1321 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1322 retry.Context(ctx),
1323 retry.Attempts(verifyAttempts),
1324 retry.Delay(verifyMinDelay),
1325 retry.MaxDelay(verifyMaxDelay),
1326 retry.DelayType(retry.BackOffDelay),
1327 retry.LastErrorOnly(true),
1328 )
1329 if err != nil {
1330 return fmt.Errorf("verify: %w", err)
1331 }
1332 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1333 return err
1334}
1335
1336const sweepConcurrency = 4
1337
1338func (i *Ingester) SweepPendingVerifications() {
1339 l := i.Logger.With("handler", "SweepPendingVerifications")
1340
1341 var g errgroup.Group
1342 g.SetLimit(sweepConcurrency)
1343
1344 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1345 if err != nil {
1346 l.Error("failed to list unverified knots", "err", err)
1347 } else {
1348 for _, reg := range regs {
1349 g.Go(func() error {
1350 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1351 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1352 }
1353 return nil
1354 })
1355 }
1356 }
1357
1358 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1359 if err != nil {
1360 l.Error("failed to list unverified spindles", "err", err)
1361 g.Wait()
1362 return
1363 }
1364 for _, s := range spindles {
1365 g.Go(func() error {
1366 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1367 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1368 }
1369 return nil
1370 })
1371 }
1372 g.Wait()
1373}
1374
1375func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1376 did := e.Did
1377 rkey := e.Commit.RKey
1378
1379 var err error
1380
1381 l = l.With("handler", "ingestIssue")
1382
1383 switch e.Commit.Operation {
1384 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1385 raw := json.RawMessage(e.Commit.Record)
1386 record := tangled.RepoIssue{}
1387 err = json.Unmarshal(raw, &record)
1388 if err != nil {
1389 l.Error("invalid record", "err", err)
1390 return err
1391 }
1392
1393 issue := models.IssueFromRecord(did, rkey, record)
1394
1395 if issue.RepoDid == "" {
1396 return fmt.Errorf("issue record has no repo field")
1397 }
1398 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1399 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1400 }
1401
1402 if err := i.Validator.ValidateIssue(&issue); err != nil {
1403 return fmt.Errorf("failed to validate issue: %w", err)
1404 }
1405
1406 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1407 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1408 if repoErr == nil && repo.RepoDid != "" {
1409 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1410 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1411 }
1412 }
1413 }
1414
1415 tx, err := i.Db.BeginTx(ctx, nil)
1416 if err != nil {
1417 l.Error("failed to begin transaction", "err", err)
1418 return err
1419 }
1420 defer tx.Rollback()
1421
1422 err = db.PutIssue(tx, &issue)
1423 if err != nil {
1424 l.Error("failed to create issue", "err", err)
1425 return err
1426 }
1427
1428 err = tx.Commit()
1429 if err != nil {
1430 l.Error("failed to commit txn", "err", err)
1431 return err
1432 }
1433
1434 l.Info("ingested record")
1435 return nil
1436
1437 case jmodels.CommitOperationDelete:
1438 tx, err := i.Db.BeginTx(ctx, nil)
1439 if err != nil {
1440 l.Error("failed to begin transaction", "err", err)
1441 return err
1442 }
1443 defer tx.Rollback()
1444
1445 if err := db.DeleteIssues(
1446 tx,
1447 did,
1448 rkey,
1449 ); err != nil {
1450 l.Error("failed to delete", "err", err)
1451 return fmt.Errorf("failed to delete issue record: %w", err)
1452 }
1453 if err := tx.Commit(); err != nil {
1454 l.Error("failed to commit txn", "err", err)
1455 return err
1456 }
1457
1458 l.Info("ingested record")
1459 return nil
1460 }
1461
1462 return nil
1463}
1464
1465func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1466 did := e.Did
1467 rkey := e.Commit.RKey
1468
1469 var err error
1470
1471 l = l.With("handler", "ingestPull")
1472
1473 switch e.Commit.Operation {
1474 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1475 raw := json.RawMessage(e.Commit.Record)
1476 record := tangled.RepoPull{}
1477 err = json.Unmarshal(raw, &record)
1478 if err != nil {
1479 l.Error("invalid record", "err", err)
1480 return err
1481 }
1482
1483 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1484 if err != nil {
1485 l.Error("failed to resolve did", "err", err)
1486 return err
1487 }
1488
1489 // go through and fetch all blobs in parallel
1490 readers := make([]*io.ReadCloser, len(record.Rounds))
1491 var mu sync.Mutex
1492
1493 g, gctx := errgroup.WithContext(ctx)
1494
1495 for idx, b := range record.Rounds {
1496 g.Go(func() error {
1497 // for some reason, a blob is empty
1498 if b.PatchBlob == nil {
1499 return fmt.Errorf("missing patchBlob in round %d", idx)
1500 }
1501
1502 ownerPds := ownerId.PDSEndpoint()
1503 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1504 q := url.Query()
1505 q.Set("cid", b.PatchBlob.Ref.String())
1506 q.Set("did", did)
1507 url.RawQuery = q.Encode()
1508
1509 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1510 if err != nil {
1511 l.Error("failed to create request")
1512 return err
1513 }
1514 req.Header.Set("Content-Type", "application/json")
1515
1516 resp, err := http.DefaultClient.Do(req)
1517 if err != nil {
1518 l.Error("failed to make request")
1519 return err
1520 }
1521
1522 mu.Lock()
1523 readers[idx] = &resp.Body
1524 mu.Unlock()
1525
1526 return nil
1527 })
1528 }
1529
1530 if err := g.Wait(); err != nil {
1531 for _, r := range readers {
1532 if r != nil && *r != nil {
1533 (*r).Close()
1534 }
1535 }
1536 return err
1537 }
1538
1539 defer func() {
1540 for _, r := range readers {
1541 if r != nil && *r != nil {
1542 (*r).Close()
1543 }
1544 }
1545 }()
1546
1547 pull, err := models.PullFromRecord(did, rkey, record, readers)
1548 if err != nil {
1549 return fmt.Errorf("failed to parse pull from record: %w", err)
1550 }
1551 if err := i.Validator.ValidatePull(pull); err != nil {
1552 return fmt.Errorf("failed to validate pull: %w", err)
1553 }
1554
1555 tx, err := i.Db.BeginTx(ctx, nil)
1556 if err != nil {
1557 l.Error("failed to begin transaction", "err", err)
1558 return err
1559 }
1560 defer tx.Rollback()
1561
1562 err = db.PutPull(tx, pull)
1563 if err != nil {
1564 l.Error("failed to create pull", "err", err)
1565 return err
1566 }
1567
1568 err = tx.Commit()
1569 if err != nil {
1570 l.Error("failed to commit txn", "err", err)
1571 return err
1572 }
1573
1574 l.Info("ingested record")
1575 return nil
1576
1577 case jmodels.CommitOperationDelete:
1578 tx, err := i.Db.BeginTx(ctx, nil)
1579 if err != nil {
1580 l.Error("failed to begin transaction", "err", err)
1581 return err
1582 }
1583 defer tx.Rollback()
1584
1585 if err := db.AbandonPulls(
1586 tx,
1587 orm.FilterEq("owner_did", did),
1588 orm.FilterEq("rkey", rkey),
1589 ); err != nil {
1590 l.Error("failed to abandon", "err", err)
1591 return fmt.Errorf("failed to abandon pull record: %w", err)
1592 }
1593 if err := tx.Commit(); err != nil {
1594 l.Error("failed to commit txn", "err", err)
1595 return err
1596 }
1597
1598 l.Info("ingested record")
1599 return nil
1600 }
1601
1602 return nil
1603}
1604
1605// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1606func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error {
1607 l = l.With("handler", "ingestIssueComment")
1608
1609 switch e.Commit.Operation {
1610 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1611 // no-op. sh.tangled.repo.issue.comment is deprecated
1612
1613 case jmodels.CommitOperationDelete:
1614 if err := db.PurgeComments(
1615 i.Db,
1616 orm.FilterEq("did", e.Did),
1617 orm.FilterEq("collection", e.Commit.Collection),
1618 orm.FilterEq("rkey", e.Commit.RKey),
1619 ); err != nil {
1620 return fmt.Errorf("failed to delete comment record: %w", err)
1621 }
1622 }
1623
1624 l.Info("ingested record")
1625 return nil
1626}
1627
1628// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1629func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error {
1630 l = l.With("handler", "ingestPullComment")
1631
1632 switch e.Commit.Operation {
1633 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1634 // no-op. sh.tangled.repo.pull.comment is deprecated
1635
1636 case jmodels.CommitOperationDelete:
1637 if err := db.PurgeComments(
1638 i.Db,
1639 orm.FilterEq("did", e.Did),
1640 orm.FilterEq("collection", e.Commit.Collection),
1641 orm.FilterEq("rkey", e.Commit.RKey),
1642 ); err != nil {
1643 return fmt.Errorf("failed to delete comment record: %w", err)
1644 }
1645 }
1646
1647 l.Info("ingested record")
1648 return nil
1649}
1650
1651func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error {
1652 did := e.Did
1653 rkey := e.Commit.RKey
1654 cid := e.Commit.CID
1655
1656 var err error
1657
1658 l = l.With("handler", "ingestComment")
1659
1660 ctx := context.Background()
1661
1662 switch e.Commit.Operation {
1663 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1664 raw := json.RawMessage(e.Commit.Record)
1665 record := tangled.FeedComment{}
1666 err = json.Unmarshal(raw, &record)
1667 if err != nil {
1668 return fmt.Errorf("invalid record: %w", err)
1669 }
1670
1671 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1672 if err != nil {
1673 return fmt.Errorf("failed to parse comment from record: %w", err)
1674 }
1675
1676 if err := comment.Validate(); err != nil {
1677 return fmt.Errorf("failed to validate comment: %w", err)
1678 }
1679
1680 var references []syntax.ATURI
1681 if comment.Body.Original != nil {
1682 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1683 }
1684
1685 tx, err := i.Db.Begin()
1686 if err != nil {
1687 return fmt.Errorf("failed to start transaction: %w", err)
1688 }
1689 defer tx.Rollback()
1690
1691 _, err = db.PutComment(tx, comment, references)
1692 if err != nil {
1693 return fmt.Errorf("failed to create comment: %w", err)
1694 }
1695
1696 if err := tx.Commit(); err != nil {
1697 return err
1698 }
1699
1700 case jmodels.CommitOperationDelete:
1701 if err := db.DeleteComments(
1702 i.Db,
1703 orm.FilterEq("did", did),
1704 orm.FilterEq("collection", e.Commit.Collection),
1705 orm.FilterEq("rkey", rkey),
1706 ); err != nil {
1707 return fmt.Errorf("failed to delete comment record: %w", err)
1708 }
1709 }
1710
1711 l.Info("ingested record")
1712 return nil
1713}
1714
1715func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error {
1716 did := e.Did
1717 rkey := e.Commit.RKey
1718
1719 l = l.With("handler", "ingestReaction")
1720
1721 switch e.Commit.Operation {
1722 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1723 raw := json.RawMessage(e.Commit.Record)
1724 record := tangled.FeedReaction{}
1725 if err := json.Unmarshal(raw, &record); err != nil {
1726 return fmt.Errorf("invalid record: %w", err)
1727 }
1728
1729 subjectUri, err := syntax.ParseATURI(record.Subject)
1730 if err != nil {
1731 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err)
1732 }
1733 subjectUri = models.NormalizeReactionSubject(subjectUri)
1734
1735 kind, ok := models.ParseReactionKind(record.Reaction)
1736 if !ok {
1737 return fmt.Errorf("invalid reaction kind: %q", record.Reaction)
1738 }
1739
1740 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
1741 if parseErr != nil {
1742 created = time.Now()
1743 }
1744
1745 tx, err := i.Db.Begin()
1746 if err != nil {
1747 return fmt.Errorf("failed to start transaction: %w", err)
1748 }
1749 defer tx.Rollback()
1750
1751 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil {
1752 return fmt.Errorf("failed to clear existing reaction: %w", err)
1753 }
1754 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil {
1755 return fmt.Errorf("failed to add reaction: %w", err)
1756 }
1757
1758 if err := tx.Commit(); err != nil {
1759 return err
1760 }
1761
1762 case jmodels.CommitOperationDelete:
1763 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil {
1764 return fmt.Errorf("failed to delete reaction record: %w", err)
1765 }
1766 }
1767
1768 l.Info("ingested record")
1769 return nil
1770}
1771
1772func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error {
1773 did := e.Did
1774 rkey := e.Commit.RKey
1775
1776 var err error
1777
1778 l = l.With("handler", "ingestLabelDefinition")
1779
1780 switch e.Commit.Operation {
1781 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1782 raw := json.RawMessage(e.Commit.Record)
1783 record := tangled.LabelDefinition{}
1784 err = json.Unmarshal(raw, &record)
1785 if err != nil {
1786 return fmt.Errorf("invalid record: %w", err)
1787 }
1788
1789 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1790 if err != nil {
1791 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1792 }
1793
1794 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1795 return fmt.Errorf("failed to validate labeldef: %w", err)
1796 }
1797
1798 _, err = db.AddLabelDefinition(i.Db, def)
1799 if err != nil {
1800 return fmt.Errorf("failed to create labeldef: %w", err)
1801 }
1802
1803 l.Info("ingested record")
1804 return nil
1805
1806 case jmodels.CommitOperationDelete:
1807 if err := db.DeleteLabelDefinition(
1808 i.Db,
1809 orm.FilterEq("did", did),
1810 orm.FilterEq("rkey", rkey),
1811 ); err != nil {
1812 return fmt.Errorf("failed to delete labeldef record: %w", err)
1813 }
1814
1815 l.Info("ingested record")
1816 return nil
1817 }
1818
1819 return nil
1820}
1821
1822func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1823 did := e.Did
1824 rkey := e.Commit.RKey
1825
1826 var err error
1827
1828 l = l.With("handler", "ingestLabelOp")
1829
1830 switch e.Commit.Operation {
1831 case jmodels.CommitOperationCreate:
1832 raw := json.RawMessage(e.Commit.Record)
1833 record := tangled.LabelOp{}
1834 err = json.Unmarshal(raw, &record)
1835 if err != nil {
1836 return fmt.Errorf("invalid record: %w", err)
1837 }
1838
1839 subject := syntax.ATURI(record.Subject)
1840 collection := subject.Collection()
1841
1842 var repo *models.Repo
1843 switch collection {
1844 case tangled.RepoIssueNSID:
1845 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1846 if err != nil || len(i) != 1 {
1847 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1848 }
1849 repo = i[0].Repo
1850 case tangled.RepoPullNSID:
1851 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
1852 if err != nil || len(p) != 1 {
1853 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p))
1854 }
1855 repo = p[0].Repo
1856 default:
1857 return fmt.Errorf("unsupported label subject: %s", collection)
1858 }
1859
1860 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1861 if err != nil {
1862 return fmt.Errorf("failed to build label application ctx: %w", err)
1863 }
1864
1865 ops := models.LabelOpsFromRecord(did, rkey, record)
1866
1867 for _, o := range ops {
1868 def, ok := actx.Defs[o.OperandKey]
1869 if !ok {
1870 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1871 }
1872 if err := i.Validator.ValidateLabelOp(ctx, def, repo, &o); err != nil {
1873 if !errors.Is(err, knotacl.ErrKnotUnreachable) {
1874 return fmt.Errorf("failed to validate labelop: %w", err)
1875 }
1876 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", err)
1877 }
1878 }
1879
1880 tx, err := i.Db.Begin()
1881 if err != nil {
1882 return err
1883 }
1884 defer tx.Rollback()
1885
1886 for _, o := range ops {
1887 _, err = db.AddLabelOp(tx, &o)
1888 if err != nil {
1889 return fmt.Errorf("failed to add labelop: %w", err)
1890 }
1891 }
1892
1893 if err = tx.Commit(); err != nil {
1894 return err
1895 }
1896
1897 l.Info("ingested record")
1898 }
1899
1900 return nil
1901}