Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/knotacl"
31 "tangled.org/core/appview/mentions"
32 "tangled.org/core/appview/models"
33 "tangled.org/core/appview/notify"
34 "tangled.org/core/appview/repoverify"
35 "tangled.org/core/appview/serververify"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/orm"
38 "tangled.org/core/rbac"
39)
40
41type Ingester struct {
42 Ctx context.Context
43 Db *db.DB
44 Enforcer *rbac.Enforcer
45 Acl *knotacl.Service
46 IdResolver *idresolver.Resolver
47 Cache *cache.Cache
48 Config *config.Config
49 Logger *slog.Logger
50 MentionsResolver *mentions.Resolver
51 Notifier notify.Notifier
52 Verifier repoverify.Verifier
53}
54
55type processFunc func(ctx context.Context, e *jmodels.Event) error
56
57func (i *Ingester) Ingest() processFunc {
58 return func(ctx context.Context, e *jmodels.Event) error {
59 var err error
60
61 l := i.Logger.With("kind", e.Kind)
62 switch e.Kind {
63 case jmodels.EventKindAccount:
64 // TODO: sync account state to db
65 if e.Account.Active {
66 break
67 }
68 // TODO: revoke sessions by DID
69 if *e.Account.Status == "deactivated" {
70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
71 }
72 case jmodels.EventKindIdentity:
73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
74 case jmodels.EventKindCommit:
75 l = l.With(
76 "nsid", e.Commit.Collection,
77 "did", e.Did,
78 "rkey", e.Commit.RKey,
79 "op", e.Commit.Operation,
80 )
81 switch e.Commit.Collection {
82 case tangled.GraphFollowNSID:
83 err = i.ingestFollow(e, l)
84 case tangled.GraphVouchNSID:
85 err = i.ingestVouch(ctx, e, l)
86 case tangled.FeedStarNSID:
87 err = i.ingestStar(ctx, e, l)
88 case tangled.FeedReactionNSID:
89 err = i.ingestReaction(e, l)
90 case tangled.PublicKeyNSID:
91 err = i.ingestPublicKey(e, l)
92 case tangled.RepoArtifactNSID:
93 err = i.ingestArtifact(ctx, e, l)
94 case tangled.ActorProfileNSID:
95 err = i.ingestProfile(ctx, e, l)
96 case tangled.SpindleMemberNSID:
97 err = i.ingestSpindleMember(ctx, e, l)
98 case tangled.SpindleNSID:
99 err = i.ingestSpindle(ctx, e, l)
100 case tangled.KnotMemberNSID:
101 err = i.ingestKnotMember(ctx, e, l)
102 case tangled.KnotNSID:
103 err = i.ingestKnot(ctx, e, l)
104 case tangled.StringNSID:
105 err = i.ingestString(e, l)
106 case tangled.RepoIssueNSID:
107 err = i.ingestIssue(ctx, e, l)
108 case tangled.RepoPullNSID:
109 err = i.ingestPull(ctx, e, l)
110 case tangled.FeedCommentNSID:
111 err = i.ingestComment(e, l)
112 case tangled.RepoIssueCommentNSID:
113 err = i.ingestIssueComment(e, l)
114 case tangled.RepoPullCommentNSID:
115 err = i.ingestPullComment(e, l)
116 case tangled.LabelDefinitionNSID:
117 err = i.ingestLabelDefinition(e, l)
118 case tangled.LabelOpNSID:
119 err = i.ingestLabelOp(ctx, e, l)
120 case tangled.RepoNSID:
121 err = i.ingestRepo(ctx, e, l)
122 }
123 }
124
125 if err != nil {
126 l.Warn("failed to ingest record, skipping", "err", err)
127 }
128
129 lastTimeUs := e.TimeUS + 1
130 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
131 l.Error("failed to save cursor", "err", saveErr)
132 }
133
134 return nil
135 }
136}
137
138func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
139 if strings.HasPrefix(ref, "did:") {
140 return db.GetRepoByDid(i.Db, ref)
141 }
142 return db.GetRepoByAtUri(i.Db, ref)
143}
144
145func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
146 var legacy struct {
147 Subject *string `json:"subject"`
148 SubjectDid *string `json:"subjectDid"`
149 }
150 if err := json.Unmarshal(raw, &legacy); err != nil {
151 return false, err
152 }
153
154 switch {
155 case legacy.SubjectDid != nil:
156 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
157 if err != nil {
158 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
159 return false, nil
160 }
161 star.SubjectType = models.StarSubjectRepo
162 star.Subject = repo.RepoDid
163 return true, nil
164
165 case legacy.Subject != nil:
166 uri, err := syntax.ParseATURI(*legacy.Subject)
167 if err != nil {
168 return false, fmt.Errorf("invalid old-format star subject: %w", err)
169 }
170 switch uri.Collection().String() {
171 case tangled.RepoNSID:
172 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
173 if err != nil {
174 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
175 return false, nil
176 }
177 star.SubjectType = models.StarSubjectRepo
178 star.Subject = repo.RepoDid
179 return true, nil
180 default:
181 star.SubjectType = models.StarSubjectString
182 star.Subject = *legacy.Subject
183 return true, nil
184 }
185
186 default:
187 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
188 }
189}
190
191func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
192 var err error
193 did := e.Did
194
195 l = l.With("handler", "ingestStar")
196
197 switch e.Commit.Operation {
198 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
199 raw := json.RawMessage(e.Commit.Record)
200 record := tangled.FeedStar{}
201 unmarshalErr := json.Unmarshal(raw, &record)
202
203 star := models.Star{
204 Did: did,
205 Rkey: e.Commit.RKey,
206 }
207
208 switch {
209 case unmarshalErr != nil:
210 resolved, resolveErr := i.resolveOldFormatStar(raw, &star, l)
211 if resolveErr != nil {
212 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
213 return unmarshalErr
214 }
215 if !resolved {
216 return nil
217 }
218
219 case record.Subject == nil:
220 return fmt.Errorf("star record has nil subject")
221
222 case record.Subject.FeedStar_Repo != nil:
223 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
224 if repoErr != nil {
225 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
226 return nil
227 }
228 star.SubjectType = models.StarSubjectRepo
229 star.Subject = repo.RepoDid
230
231 case record.Subject.FeedStar_String != nil:
232 star.SubjectType = models.StarSubjectString
233 star.Subject = record.Subject.FeedStar_String.Uri
234
235 default:
236 return fmt.Errorf("star record has empty subject union")
237 }
238
239 err = db.UpsertStar(i.Db, star)
240 case jmodels.CommitOperationDelete:
241 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
242 }
243
244 if err != nil {
245 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
246 }
247 l.Info("processed star", "operation", e.Commit.Operation, "rkey", e.Commit.RKey)
248
249 l.Info("ingested record")
250 return nil
251}
252
253func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error {
254 var err error
255 did := e.Did
256
257 l = l.With("handler", "ingestFollow")
258
259 switch e.Commit.Operation {
260 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
261 raw := json.RawMessage(e.Commit.Record)
262 record := tangled.GraphFollow{}
263 err = json.Unmarshal(raw, &record)
264 if err != nil {
265 l.Error("invalid record", "err", err)
266 return err
267 }
268
269 err = db.UpsertFollow(i.Db, models.Follow{
270 UserDid: did,
271 SubjectDid: record.Subject,
272 Rkey: e.Commit.RKey,
273 })
274 case jmodels.CommitOperationDelete:
275 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
276 }
277
278 if err != nil {
279 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
280 }
281 l.Info("processed follow", "operation", e.Commit.Operation, "rkey", e.Commit.RKey)
282
283 l.Info("ingested record")
284 return nil
285}
286
287func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
288 var err error
289 did := e.Did
290
291 l = l.With("handler", "ingestVouch")
292
293 switch e.Commit.Operation {
294 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
295 raw := json.RawMessage(e.Commit.Record)
296 record := tangled.GraphVouch{}
297 err = json.Unmarshal(raw, &record)
298 if err != nil {
299 l.Error("invalid record", "err", err)
300 return err
301 }
302
303 // rkey is the subject_did being vouched for/denounced
304 subjectDID := e.Commit.RKey
305
306 _, err = syntax.ParseDID(subjectDID)
307 if err != nil {
308 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
309 return fmt.Errorf("invalid subject_did: %w", err)
310 }
311
312 if did == subjectDID {
313 l.Warn("attempted self-vouch", "did", did)
314 return fmt.Errorf("cannot vouch for self")
315 }
316
317 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
318 if err != nil {
319 return err
320 }
321
322 if subjectId.Handle.IsInvalidHandle() {
323 return err
324 }
325
326 kind, err := models.ParseVouchKind(record.Kind)
327 if err != nil {
328 l.Error("invalid kind", "kind", kind)
329 return fmt.Errorf("invalid kind: %s", kind)
330 }
331
332 recordCid, err := cid.Parse(e.Commit.CID)
333 if err != nil {
334 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
335 return fmt.Errorf("invalid cid: %w", err)
336 }
337
338 var evidences []syntax.ATURI
339 for _, raw := range record.Evidences {
340 uri, parseErr := syntax.ParseATURI(raw)
341 if parseErr != nil {
342 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
343 continue
344 }
345 evidences = append(evidences, uri)
346 }
347
348 tx, txErr := i.Db.Begin()
349 if txErr != nil {
350 return fmt.Errorf("failed to start transaction: %w", txErr)
351 }
352
353 addErr := db.AddVouch(tx, &models.Vouch{
354 Did: syntax.DID(did),
355 SubjectDid: subjectId.DID,
356 Cid: recordCid,
357 Kind: kind,
358 Reason: record.Reason,
359 Evidences: evidences,
360 })
361 if addErr != nil {
362 tx.Rollback()
363 err = addErr
364 } else {
365 err = tx.Commit()
366 }
367
368 case jmodels.CommitOperationDelete:
369 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
370 }
371
372 if err != nil {
373 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
374 }
375
376 l.Info("ingested record")
377 return nil
378}
379
380func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error {
381 did := e.Did
382 var err error
383
384 l = l.With("handler", "ingestPublicKey")
385
386 switch e.Commit.Operation {
387 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
388 l.Debug("processing add of pubkey")
389 raw := json.RawMessage(e.Commit.Record)
390 record := tangled.PublicKey{}
391 err = json.Unmarshal(raw, &record)
392 if err != nil {
393 l.Error("invalid record", "err", err)
394 return err
395 }
396 pubKey, err := models.PublicKeyFromRecord(syntax.DID(did), syntax.RecordKey(e.Commit.RKey), record)
397 if err != nil {
398 l.Error("invalid record", "err", err)
399 return err
400 }
401 if err := pubKey.Validate(); err != nil {
402 l.Error("invalid record", "err", err)
403 return err
404 }
405
406 err = db.UpsertPublicKey(i.Db, pubKey)
407 case jmodels.CommitOperationDelete:
408 l.Debug("processing delete of pubkey")
409 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
410 }
411
412 if err != nil {
413 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
414 }
415 l.Info("processed pubkey", "operation", e.Commit.Operation, "rkey", e.Commit.RKey)
416
417 l.Info("ingested record")
418 return nil
419}
420
421func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
422 did := e.Did
423 var err error
424
425 l = l.With("handler", "ingestArtifact")
426
427 switch e.Commit.Operation {
428 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
429 raw := json.RawMessage(e.Commit.Record)
430 record := tangled.RepoArtifact{}
431 err = json.Unmarshal(raw, &record)
432 if err != nil {
433 l.Error("invalid record", "err", err)
434 return err
435 }
436
437 var repo *models.Repo
438 if record.RepoDid != nil && *record.RepoDid != "" {
439 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
440 if err != nil && !errors.Is(err, sql.ErrNoRows) {
441 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
442 }
443 }
444 if repo == nil && record.Repo != nil {
445 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
446 if parseErr != nil {
447 return parseErr
448 }
449 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
450 if err != nil {
451 return err
452 }
453 }
454 if repo == nil {
455 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
456 }
457
458 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push")
459 if permErr != nil {
460 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr)
461 } else if !allowed {
462 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier())
463 return nil
464 }
465
466 repoDid := repo.RepoDid
467 if repoDid == "" && record.RepoDid != nil {
468 repoDid = *record.RepoDid
469 }
470 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
471 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
472 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
473 }
474 }
475
476 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
477 if parseErr != nil {
478 createdAt = time.Now()
479 }
480
481 artifact := models.Artifact{
482 Did: did,
483 Rkey: e.Commit.RKey,
484 RepoDid: syntax.DID(repo.RepoDid),
485 Tag: plumbing.Hash(record.Tag),
486 CreatedAt: createdAt,
487 BlobCid: cid.Cid(record.Artifact.Ref),
488 Name: record.Name,
489 Size: uint64(record.Artifact.Size),
490 MimeType: record.Artifact.MimeType,
491 }
492
493 err = db.AddArtifact(i.Db, artifact)
494 case jmodels.CommitOperationDelete:
495 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
496 }
497
498 if err != nil {
499 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
500 }
501
502 l.Info("ingested record")
503 return nil
504}
505
506func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
507 did := e.Did
508 var err error
509
510 l = l.With("handler", "ingestProfile")
511
512 if e.Commit.RKey != "self" {
513 return fmt.Errorf("ingestProfile only ingests `self` record")
514 }
515
516 switch e.Commit.Operation {
517 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
518 raw := json.RawMessage(e.Commit.Record)
519 record := tangled.ActorProfile{}
520 err = json.Unmarshal(raw, &record)
521 if err != nil {
522 l.Error("invalid record", "err", err)
523 return err
524 }
525
526 avatar := ""
527 if record.Avatar != nil {
528 avatar = record.Avatar.Ref.String()
529 }
530
531 description := ""
532 if record.Description != nil {
533 description = *record.Description
534 }
535
536 includeBluesky := record.Bluesky
537
538 pronouns := ""
539 if record.Pronouns != nil {
540 pronouns = *record.Pronouns
541 }
542
543 location := ""
544 if record.Location != nil {
545 location = *record.Location
546 }
547
548 var links [5]string
549 for i, l := range record.Links {
550 if i < 5 {
551 links[i] = l
552 }
553 }
554
555 var stats [2]models.VanityStat
556 for i, s := range record.Stats {
557 if i < 2 {
558 stats[i].Kind = models.ParseVanityStatKind(s)
559 }
560 }
561
562 var pinned [6]string
563 for i, r := range record.PinnedRepositories {
564 if i < 6 {
565 pinned[i] = r
566 }
567 }
568
569 var preferredHandle syntax.Handle
570 if record.PreferredHandle != nil {
571 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
572 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
573 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
574 preferredHandle = h
575 }
576 }
577 }
578
579 profile := models.Profile{
580 Did: did,
581 Avatar: avatar,
582 Description: description,
583 IncludeBluesky: includeBluesky,
584 Location: location,
585 Links: links,
586 Stats: stats,
587 PinnedRepos: pinned,
588 Pronouns: pronouns,
589 PreferredHandle: preferredHandle,
590 }
591
592 tx, err := i.Db.Begin()
593 if err != nil {
594 return fmt.Errorf("failed to start transaction: %w", err)
595 }
596 defer tx.Rollback()
597
598 err = db.ValidateProfile(tx, &profile)
599 if err != nil {
600 return fmt.Errorf("invalid profile record")
601 }
602
603 err = db.UpsertProfile(tx, &profile)
604 if err != nil {
605 return fmt.Errorf("upserting profile: %w", err)
606 }
607
608 err = tx.Commit()
609 if err != nil {
610 return fmt.Errorf("tx.Commit: %w", err)
611 }
612 if i.Cache != nil {
613 pipe := i.Cache.Pipeline()
614 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
615 if preferredHandle != "" {
616 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
617 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
618 } else {
619 pipe.Del(ctx, didKey)
620 }
621 if _, execErr := pipe.Exec(ctx); execErr != nil {
622 l.Warn("failed to update preferred handle cache", "err", execErr)
623 }
624 }
625 case jmodels.CommitOperationDelete:
626 tx, beginErr := i.Db.Begin()
627 if beginErr != nil {
628 return fmt.Errorf("failed to start transaction: %w", beginErr)
629 }
630
631 priorHandle, phErr := db.GetPreferredHandle(tx, did)
632 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
633 l.Warn("failed to read prior preferred handle", "err", phErr)
634 }
635
636 err = db.DeleteProfile(tx, did)
637 if err == nil && i.Cache != nil {
638 pipe := i.Cache.Pipeline()
639 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
640 if priorHandle != "" {
641 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
642 }
643 if _, execErr := pipe.Exec(ctx); execErr != nil {
644 l.Warn("failed to evict preferred handle cache", "err", execErr)
645 }
646 }
647 }
648
649 if err != nil {
650 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
651 }
652
653 l.Info("ingested record")
654 return nil
655}
656
657func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
658 did := e.Did
659 var err error
660
661 l = l.With("handler", "ingestSpindleMember")
662
663 switch e.Commit.Operation {
664 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
665 raw := json.RawMessage(e.Commit.Record)
666 record := tangled.SpindleMember{}
667 err = json.Unmarshal(raw, &record)
668 if err != nil {
669 l.Error("invalid record", "err", err)
670 return err
671 }
672
673 // only spindle owner can invite to spindles
674 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
675 if err != nil {
676 return fmt.Errorf("failed to check invite permission: %w", err)
677 }
678 if !ok {
679 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
680 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
681 }
682 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
683 if err != nil {
684 return fmt.Errorf("failed to re-check invite permission: %w", err)
685 }
686 if !ok {
687 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
688 }
689 }
690
691 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
692 if err != nil {
693 return err
694 }
695
696 if memberId.Handle.IsInvalidHandle() {
697 return fmt.Errorf("invalid handle for member %s", record.Subject)
698 }
699
700 existing, err := db.GetSpindleMembers(i.Db,
701 orm.FilterEq("did", did),
702 orm.FilterEq("rkey", e.Commit.RKey),
703 )
704 if err != nil {
705 return fmt.Errorf("failed to look up existing member: %w", err)
706 }
707 if len(existing) > 1 {
708 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
709 }
710
711 tx, err := i.Db.Begin()
712 if err != nil {
713 return fmt.Errorf("failed to start txn: %w", err)
714 }
715 committed := false
716 defer func() {
717 if committed {
718 return
719 }
720 tx.Rollback()
721 i.Enforcer.E.LoadPolicy()
722 }()
723
724 if len(existing) == 1 {
725 prev := existing[0]
726 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
727 if err = db.RemoveSpindleMember(tx,
728 orm.FilterEq("did", did),
729 orm.FilterEq("rkey", e.Commit.RKey),
730 ); err != nil {
731 return fmt.Errorf("failed to remove stale row: %w", err)
732 }
733 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
734 return fmt.Errorf("failed to remove stale ACL: %w", err)
735 }
736 }
737 }
738
739 if err = db.AddSpindleMember(tx, models.SpindleMember{
740 Did: syntax.DID(did),
741 Rkey: e.Commit.RKey,
742 Instance: record.Instance,
743 Subject: memberId.DID,
744 }); err != nil {
745 return fmt.Errorf("failed to add to db: %w", err)
746 }
747
748 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
749 return fmt.Errorf("failed to update ACLs: %w", err)
750 }
751
752 if err = tx.Commit(); err != nil {
753 return fmt.Errorf("failed to commit txn: %w", err)
754 }
755
756 if err = i.Enforcer.E.SavePolicy(); err != nil {
757 return fmt.Errorf("failed to save ACLs: %w", err)
758 }
759 committed = true
760
761 l.Info("upserted spindle member")
762 case jmodels.CommitOperationDelete:
763 rkey := e.Commit.RKey
764
765 // get record from db first
766 members, err := db.GetSpindleMembers(
767 i.Db,
768 orm.FilterEq("did", did),
769 orm.FilterEq("rkey", rkey),
770 )
771 if err != nil || len(members) != 1 {
772 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
773 }
774 member := members[0]
775
776 tx, err := i.Db.Begin()
777 if err != nil {
778 return fmt.Errorf("failed to start txn: %w", err)
779 }
780 committed := false
781 defer func() {
782 if committed {
783 return
784 }
785 tx.Rollback()
786 i.Enforcer.E.LoadPolicy()
787 }()
788
789 // remove record by rkey && update enforcer
790 if err = db.RemoveSpindleMember(
791 tx,
792 orm.FilterEq("did", did),
793 orm.FilterEq("rkey", rkey),
794 ); err != nil {
795 return fmt.Errorf("failed to remove from db: %w", err)
796 }
797
798 // update enforcer
799 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
800 if err != nil {
801 return fmt.Errorf("failed to update ACLs: %w", err)
802 }
803
804 if err = tx.Commit(); err != nil {
805 return fmt.Errorf("failed to commit txn: %w", err)
806 }
807
808 if err = i.Enforcer.E.SavePolicy(); err != nil {
809 return fmt.Errorf("failed to save ACLs: %w", err)
810 }
811 committed = true
812
813 l.Info("removed spindle member")
814 }
815
816 return nil
817}
818
819func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
820 did := e.Did
821 var err error
822
823 l = l.With("handler", "ingestSpindle")
824
825 switch e.Commit.Operation {
826 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
827 raw := json.RawMessage(e.Commit.Record)
828 record := tangled.Spindle{}
829 err = json.Unmarshal(raw, &record)
830 if err != nil {
831 l.Error("invalid record", "err", err)
832 return err
833 }
834
835 instance := e.Commit.RKey
836
837 err := db.AddSpindle(i.Db, models.Spindle{
838 Owner: syntax.DID(did),
839 Instance: instance,
840 })
841 if err != nil {
842 l.Error("failed to add spindle to db", "err", err, "instance", instance)
843 return err
844 }
845
846 if err := i.verifySpindle(ctx, instance, did); err != nil {
847 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
848 }
849
850 l.Info("ingested record", "instance", instance)
851 return nil
852
853 case jmodels.CommitOperationDelete:
854 instance := e.Commit.RKey
855
856 // get record from db first
857 spindles, err := db.GetSpindles(
858 ctx,
859 i.Db,
860 orm.FilterEq("owner", did),
861 orm.FilterEq("instance", instance),
862 )
863 if err != nil || len(spindles) != 1 {
864 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
865 }
866 spindle := spindles[0]
867
868 tx, err := i.Db.Begin()
869 if err != nil {
870 return fmt.Errorf("failed to start txn: %w", err)
871 }
872 defer func() {
873 tx.Rollback()
874 i.Enforcer.E.LoadPolicy()
875 }()
876
877 // remove spindle members first
878 err = db.RemoveSpindleMember(
879 tx,
880 orm.FilterEq("owner", did),
881 orm.FilterEq("instance", instance),
882 )
883 if err != nil {
884 return fmt.Errorf("failed to remove spindle members: %w", err)
885 }
886
887 err = db.DeleteSpindle(
888 tx,
889 orm.FilterEq("owner", did),
890 orm.FilterEq("instance", instance),
891 )
892 if err != nil {
893 return fmt.Errorf("failed to delete spindle: %w", err)
894 }
895
896 if spindle.Verified != nil {
897 err = i.Enforcer.RemoveSpindle(instance)
898 if err != nil {
899 return fmt.Errorf("failed to remove spindle from enforcer: %w", err)
900 }
901 }
902
903 err = tx.Commit()
904 if err != nil {
905 return fmt.Errorf("failed to commit txn: %w", err)
906 }
907
908 err = i.Enforcer.E.SavePolicy()
909 if err != nil {
910 return fmt.Errorf("failed to save ACLs: %w", err)
911 }
912
913 l.Info("ingested record", "instance", instance)
914 }
915
916 return nil
917}
918
919func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error {
920 did := e.Did
921 rkey := e.Commit.RKey
922
923 var err error
924
925 l = l.With("handler", "ingestString")
926
927 switch e.Commit.Operation {
928 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
929 raw := json.RawMessage(e.Commit.Record)
930 record := tangled.String{}
931 err = json.Unmarshal(raw, &record)
932 if err != nil {
933 l.Error("invalid record", "err", err)
934 return err
935 }
936
937 string := models.StringFromRecord(did, rkey, record)
938
939 if err = string.Validate(); err != nil {
940 l.Error("invalid record", "err", err)
941 return err
942 }
943
944 if err = db.AddString(i.Db, string); err != nil {
945 l.Error("failed to add string", "err", err)
946 return err
947 }
948
949 l.Info("ingested record")
950 return nil
951
952 case jmodels.CommitOperationDelete:
953 if err := db.DeleteString(
954 i.Db,
955 orm.FilterEq("did", did),
956 orm.FilterEq("rkey", rkey),
957 ); err != nil {
958 l.Error("failed to delete", "err", err)
959 return fmt.Errorf("failed to delete string record: %w", err)
960 }
961
962 l.Info("ingested record")
963 return nil
964 }
965
966 return nil
967}
968
969func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
970 did := e.Did
971 var err error
972
973 l = l.With("handler", "ingestKnotMember")
974
975 switch e.Commit.Operation {
976 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
977 raw := json.RawMessage(e.Commit.Record)
978 record := tangled.KnotMember{}
979 err = json.Unmarshal(raw, &record)
980 if err != nil {
981 l.Error("invalid record", "err", err)
982 return err
983 }
984
985 // only knot owner can invite to knots
986 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
987 if err != nil {
988 return fmt.Errorf("failed to check invite permission: %w", err)
989 }
990 if !ok {
991 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
992 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
993 }
994 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
995 if err != nil {
996 return fmt.Errorf("failed to re-check invite permission: %w", err)
997 }
998 if !ok {
999 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
1000 }
1001 }
1002
1003 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
1004 if err != nil {
1005 return err
1006 }
1007
1008 if memberId.Handle.IsInvalidHandle() {
1009 return fmt.Errorf("invalid handle for member %s", record.Subject)
1010 }
1011
1012 existing, err := db.GetKnotMembers(i.Db,
1013 orm.FilterEq("did", did),
1014 orm.FilterEq("rkey", e.Commit.RKey),
1015 )
1016 if err != nil {
1017 return fmt.Errorf("failed to look up existing member: %w", err)
1018 }
1019 if len(existing) > 1 {
1020 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1021 }
1022
1023 tx, err := i.Db.Begin()
1024 if err != nil {
1025 return fmt.Errorf("failed to start txn: %w", err)
1026 }
1027 committed := false
1028 defer func() {
1029 if committed {
1030 return
1031 }
1032 tx.Rollback()
1033 i.Enforcer.E.LoadPolicy()
1034 }()
1035
1036 if len(existing) == 1 {
1037 prev := existing[0]
1038 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1039 if err = db.RemoveKnotMember(tx,
1040 orm.FilterEq("did", did),
1041 orm.FilterEq("rkey", e.Commit.RKey),
1042 ); err != nil {
1043 return fmt.Errorf("failed to remove stale row: %w", err)
1044 }
1045 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1046 return fmt.Errorf("failed to remove stale ACL: %w", err)
1047 }
1048 }
1049 }
1050
1051 if err = db.AddKnotMember(tx, models.KnotMember{
1052 Did: syntax.DID(did),
1053 Rkey: e.Commit.RKey,
1054 Domain: record.Domain,
1055 Subject: memberId.DID,
1056 }); err != nil {
1057 return fmt.Errorf("failed to add to db: %w", err)
1058 }
1059
1060 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1061 return fmt.Errorf("failed to update ACLs: %w", err)
1062 }
1063
1064 if err = tx.Commit(); err != nil {
1065 return fmt.Errorf("failed to commit txn: %w", err)
1066 }
1067
1068 if err = i.Enforcer.E.SavePolicy(); err != nil {
1069 return fmt.Errorf("failed to save ACLs: %w", err)
1070 }
1071 committed = true
1072
1073 l.Info("upserted knot member")
1074 case jmodels.CommitOperationDelete:
1075 rkey := e.Commit.RKey
1076
1077 members, err := db.GetKnotMembers(
1078 i.Db,
1079 orm.FilterEq("did", did),
1080 orm.FilterEq("rkey", rkey),
1081 )
1082 if err != nil {
1083 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1084 }
1085 if len(members) == 0 {
1086 l.Info("knot member already removed", "rkey", rkey)
1087 return nil
1088 }
1089 if len(members) > 1 {
1090 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1091 }
1092 member := members[0]
1093
1094 tx, err := i.Db.Begin()
1095 if err != nil {
1096 return fmt.Errorf("failed to start txn: %w", err)
1097 }
1098 committed := false
1099 defer func() {
1100 if committed {
1101 return
1102 }
1103 tx.Rollback()
1104 i.Enforcer.E.LoadPolicy()
1105 }()
1106
1107 if err = db.RemoveKnotMember(
1108 tx,
1109 orm.FilterEq("did", did),
1110 orm.FilterEq("rkey", rkey),
1111 ); err != nil {
1112 return fmt.Errorf("failed to remove from db: %w", err)
1113 }
1114
1115 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1116 return fmt.Errorf("failed to update ACLs: %w", err)
1117 }
1118
1119 if err = tx.Commit(); err != nil {
1120 return fmt.Errorf("failed to commit txn: %w", err)
1121 }
1122
1123 if err = i.Enforcer.E.SavePolicy(); err != nil {
1124 return fmt.Errorf("failed to save ACLs: %w", err)
1125 }
1126 committed = true
1127
1128 l.Info("removed knot member")
1129 }
1130
1131 return nil
1132}
1133
1134func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1135 did := e.Did
1136 var err error
1137
1138 l = l.With("handler", "ingestKnot")
1139
1140 switch e.Commit.Operation {
1141 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1142 raw := json.RawMessage(e.Commit.Record)
1143 record := tangled.Knot{}
1144 err = json.Unmarshal(raw, &record)
1145 if err != nil {
1146 l.Error("invalid record", "err", err)
1147 return err
1148 }
1149
1150 domain := e.Commit.RKey
1151
1152 err := db.AddKnot(i.Db, domain, did)
1153 if err != nil {
1154 l.Error("failed to add knot to db", "err", err, "domain", domain)
1155 return err
1156 }
1157
1158 if err := i.verifyKnot(ctx, domain, did); err != nil {
1159 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1160 }
1161
1162 l.Info("ingested record", "domain", domain)
1163 return nil
1164
1165 case jmodels.CommitOperationDelete:
1166 domain := e.Commit.RKey
1167
1168 // get record from db first
1169 registrations, err := db.GetRegistrations(
1170 i.Db,
1171 orm.FilterEq("domain", domain),
1172 orm.FilterEq("did", did),
1173 )
1174 if err != nil {
1175 return fmt.Errorf("failed to get registration: %w", err)
1176 }
1177 if len(registrations) != 1 {
1178 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1179 }
1180 registration := registrations[0]
1181
1182 tx, err := i.Db.Begin()
1183 if err != nil {
1184 return fmt.Errorf("failed to start txn: %w", err)
1185 }
1186 defer func() {
1187 tx.Rollback()
1188 i.Enforcer.E.LoadPolicy()
1189 }()
1190
1191 err = db.RemoveKnotMember(
1192 tx,
1193 orm.FilterEq("did", did),
1194 orm.FilterEq("domain", domain),
1195 )
1196 if err != nil {
1197 return fmt.Errorf("failed to remove knot members: %w", err)
1198 }
1199
1200 err = db.DeleteKnot(
1201 tx,
1202 orm.FilterEq("did", did),
1203 orm.FilterEq("domain", domain),
1204 )
1205 if err != nil {
1206 return fmt.Errorf("failed to delete knot: %w", err)
1207 }
1208
1209 err = db.RemoveReposByKnot(tx, domain)
1210 if err != nil {
1211 return fmt.Errorf("failed to remove repos by knot: %w", err)
1212 }
1213
1214 if registration.Registered != nil {
1215 err = i.Enforcer.RemoveKnot(domain)
1216 if err != nil {
1217 return fmt.Errorf("failed to remove knot from enforcer: %w", err)
1218 }
1219 }
1220
1221 err = tx.Commit()
1222 if err != nil {
1223 return fmt.Errorf("failed to commit txn: %w", err)
1224 }
1225
1226 err = i.Enforcer.E.SavePolicy()
1227 if err != nil {
1228 return fmt.Errorf("failed to save ACLs: %w", err)
1229 }
1230
1231 l.Info("ingested record", "domain", domain)
1232 }
1233
1234 return nil
1235}
1236
1237const (
1238 verifyAttempts = 4
1239 verifyMinDelay = 1 * time.Second
1240 verifyMaxDelay = 5 * time.Second
1241)
1242
1243func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1244 regs, err := db.GetRegistrations(i.Db,
1245 orm.FilterEq("domain", domain),
1246 orm.FilterEq("did", did),
1247 )
1248 if err != nil {
1249 return fmt.Errorf("look up registration: %w", err)
1250 }
1251 if len(regs) != 1 {
1252 return fmt.Errorf("no registration for %s by %s", domain, did)
1253 }
1254 if regs[0].Registered != nil {
1255 return nil
1256 }
1257
1258 err = retry.Do(
1259 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1260 retry.Context(ctx),
1261 retry.Attempts(verifyAttempts),
1262 retry.Delay(verifyMinDelay),
1263 retry.MaxDelay(verifyMaxDelay),
1264 retry.DelayType(retry.BackOffDelay),
1265 retry.LastErrorOnly(true),
1266 )
1267 if err != nil {
1268 return fmt.Errorf("verify: %w", err)
1269 }
1270 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1271}
1272
1273func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1274 spindles, err := db.GetSpindles(ctx, i.Db,
1275 orm.FilterEq("instance", instance),
1276 orm.FilterEq("owner", did),
1277 )
1278 if err != nil {
1279 return fmt.Errorf("look up spindle: %w", err)
1280 }
1281 if len(spindles) != 1 {
1282 return fmt.Errorf("no spindle for %s by %s", instance, did)
1283 }
1284 if spindles[0].Verified != nil {
1285 return nil
1286 }
1287
1288 err = retry.Do(
1289 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1290 retry.Context(ctx),
1291 retry.Attempts(verifyAttempts),
1292 retry.Delay(verifyMinDelay),
1293 retry.MaxDelay(verifyMaxDelay),
1294 retry.DelayType(retry.BackOffDelay),
1295 retry.LastErrorOnly(true),
1296 )
1297 if err != nil {
1298 return fmt.Errorf("verify: %w", err)
1299 }
1300 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1301 return err
1302}
1303
1304const sweepConcurrency = 4
1305
1306func (i *Ingester) SweepPendingVerifications() {
1307 l := i.Logger.With("handler", "SweepPendingVerifications")
1308
1309 var g errgroup.Group
1310 g.SetLimit(sweepConcurrency)
1311
1312 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1313 if err != nil {
1314 l.Error("failed to list unverified knots", "err", err)
1315 } else {
1316 for _, reg := range regs {
1317 g.Go(func() error {
1318 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1319 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1320 }
1321 return nil
1322 })
1323 }
1324 }
1325
1326 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1327 if err != nil {
1328 l.Error("failed to list unverified spindles", "err", err)
1329 g.Wait()
1330 return
1331 }
1332 for _, s := range spindles {
1333 g.Go(func() error {
1334 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1335 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1336 }
1337 return nil
1338 })
1339 }
1340 g.Wait()
1341}
1342
1343func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1344 did := e.Did
1345 rkey := e.Commit.RKey
1346
1347 var err error
1348
1349 l = l.With("handler", "ingestIssue")
1350
1351 switch e.Commit.Operation {
1352 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1353 raw := json.RawMessage(e.Commit.Record)
1354 record := tangled.RepoIssue{}
1355 err = json.Unmarshal(raw, &record)
1356 if err != nil {
1357 l.Error("invalid record", "err", err)
1358 return err
1359 }
1360
1361 issue := models.IssueFromRecord(did, rkey, record)
1362
1363 if issue.RepoDid == "" {
1364 return fmt.Errorf("issue record has no repo field")
1365 }
1366 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1367 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1368 }
1369
1370 if err := issue.Validate(); err != nil {
1371 return fmt.Errorf("failed to validate issue: %w", err)
1372 }
1373
1374 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1375 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1376 if repoErr == nil && repo.RepoDid != "" {
1377 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1378 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1379 }
1380 }
1381 }
1382
1383 tx, err := i.Db.BeginTx(ctx, nil)
1384 if err != nil {
1385 l.Error("failed to begin transaction", "err", err)
1386 return err
1387 }
1388 defer tx.Rollback()
1389
1390 err = db.PutIssue(tx, &issue)
1391 if err != nil {
1392 l.Error("failed to create issue", "err", err)
1393 return err
1394 }
1395
1396 err = tx.Commit()
1397 if err != nil {
1398 l.Error("failed to commit txn", "err", err)
1399 return err
1400 }
1401
1402 l.Info("ingested record")
1403 return nil
1404
1405 case jmodels.CommitOperationDelete:
1406 tx, err := i.Db.BeginTx(ctx, nil)
1407 if err != nil {
1408 l.Error("failed to begin transaction", "err", err)
1409 return err
1410 }
1411 defer tx.Rollback()
1412
1413 if err := db.DeleteIssues(
1414 tx,
1415 did,
1416 rkey,
1417 ); err != nil {
1418 l.Error("failed to delete", "err", err)
1419 return fmt.Errorf("failed to delete issue record: %w", err)
1420 }
1421 if err := tx.Commit(); err != nil {
1422 l.Error("failed to commit txn", "err", err)
1423 return err
1424 }
1425
1426 l.Info("ingested record")
1427 return nil
1428 }
1429
1430 return nil
1431}
1432
1433func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1434 did := e.Did
1435 rkey := e.Commit.RKey
1436
1437 var err error
1438
1439 l = l.With("handler", "ingestPull")
1440
1441 switch e.Commit.Operation {
1442 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1443 raw := json.RawMessage(e.Commit.Record)
1444 record := tangled.RepoPull{}
1445 err = json.Unmarshal(raw, &record)
1446 if err != nil {
1447 l.Error("invalid record", "err", err)
1448 return err
1449 }
1450
1451 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1452 if err != nil {
1453 l.Error("failed to resolve did", "err", err)
1454 return err
1455 }
1456
1457 // go through and fetch all blobs in parallel
1458 readers := make([]*io.ReadCloser, len(record.Rounds))
1459 var mu sync.Mutex
1460
1461 g, gctx := errgroup.WithContext(ctx)
1462
1463 for idx, b := range record.Rounds {
1464 g.Go(func() error {
1465 // for some reason, a blob is empty
1466 if b.PatchBlob == nil {
1467 return fmt.Errorf("missing patchBlob in round %d", idx)
1468 }
1469
1470 ownerPds := ownerId.PDSEndpoint()
1471 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1472 q := url.Query()
1473 q.Set("cid", b.PatchBlob.Ref.String())
1474 q.Set("did", did)
1475 url.RawQuery = q.Encode()
1476
1477 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1478 if err != nil {
1479 l.Error("failed to create request")
1480 return err
1481 }
1482 req.Header.Set("Content-Type", "application/json")
1483
1484 resp, err := http.DefaultClient.Do(req)
1485 if err != nil {
1486 l.Error("failed to make request")
1487 return err
1488 }
1489
1490 mu.Lock()
1491 readers[idx] = &resp.Body
1492 mu.Unlock()
1493
1494 return nil
1495 })
1496 }
1497
1498 if err := g.Wait(); err != nil {
1499 for _, r := range readers {
1500 if r != nil && *r != nil {
1501 (*r).Close()
1502 }
1503 }
1504 return err
1505 }
1506
1507 defer func() {
1508 for _, r := range readers {
1509 if r != nil && *r != nil {
1510 (*r).Close()
1511 }
1512 }
1513 }()
1514
1515 pull, err := models.PullFromRecord(did, rkey, record, readers)
1516 if err != nil {
1517 return fmt.Errorf("failed to parse pull from record: %w", err)
1518 }
1519 if err := pull.Validate(); err != nil {
1520 return fmt.Errorf("failed to validate pull: %w", err)
1521 }
1522 if pull.DependentOn != nil {
1523 if err := func() error {
1524 dependentPull, err := db.GetPull(
1525 i.Db,
1526 orm.FilterEq("dependent_on", pull.DependentOn.String()),
1527 )
1528 if errors.Is(err, sql.ErrNoRows) {
1529 return nil
1530 }
1531 if err != nil {
1532 return fmt.Errorf("failed to fetch pulls with same dependency: %w", err)
1533 }
1534 if dependentPull.AtUri() == pull.AtUri() {
1535 return nil
1536 }
1537 return fmt.Errorf("another pull already depends on %s, which would form a DAG, this is presently disallowed", pull.DependentOn.String())
1538 }(); err != nil {
1539 return fmt.Errorf("failed to validate pull stack: %w", err)
1540 }
1541 }
1542
1543 tx, err := i.Db.BeginTx(ctx, nil)
1544 if err != nil {
1545 l.Error("failed to begin transaction", "err", err)
1546 return err
1547 }
1548 defer tx.Rollback()
1549
1550 err = db.PutPull(tx, pull)
1551 if err != nil {
1552 l.Error("failed to create pull", "err", err)
1553 return err
1554 }
1555
1556 err = tx.Commit()
1557 if err != nil {
1558 l.Error("failed to commit txn", "err", err)
1559 return err
1560 }
1561
1562 l.Info("ingested record")
1563 return nil
1564
1565 case jmodels.CommitOperationDelete:
1566 tx, err := i.Db.BeginTx(ctx, nil)
1567 if err != nil {
1568 l.Error("failed to begin transaction", "err", err)
1569 return err
1570 }
1571 defer tx.Rollback()
1572
1573 if err := db.AbandonPulls(
1574 tx,
1575 orm.FilterEq("owner_did", did),
1576 orm.FilterEq("rkey", rkey),
1577 ); err != nil {
1578 l.Error("failed to abandon", "err", err)
1579 return fmt.Errorf("failed to abandon pull record: %w", err)
1580 }
1581 if err := tx.Commit(); err != nil {
1582 l.Error("failed to commit txn", "err", err)
1583 return err
1584 }
1585
1586 l.Info("ingested record")
1587 return nil
1588 }
1589
1590 return nil
1591}
1592
1593// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1594func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error {
1595 l = l.With("handler", "ingestIssueComment")
1596
1597 switch e.Commit.Operation {
1598 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1599 // no-op. sh.tangled.repo.issue.comment is deprecated
1600
1601 case jmodels.CommitOperationDelete:
1602 if err := db.PurgeComments(
1603 i.Db,
1604 orm.FilterEq("did", e.Did),
1605 orm.FilterEq("collection", e.Commit.Collection),
1606 orm.FilterEq("rkey", e.Commit.RKey),
1607 ); err != nil {
1608 return fmt.Errorf("failed to delete comment record: %w", err)
1609 }
1610 }
1611
1612 l.Info("ingested record")
1613 return nil
1614}
1615
1616// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1617func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error {
1618 l = l.With("handler", "ingestPullComment")
1619
1620 switch e.Commit.Operation {
1621 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1622 // no-op. sh.tangled.repo.pull.comment is deprecated
1623
1624 case jmodels.CommitOperationDelete:
1625 if err := db.PurgeComments(
1626 i.Db,
1627 orm.FilterEq("did", e.Did),
1628 orm.FilterEq("collection", e.Commit.Collection),
1629 orm.FilterEq("rkey", e.Commit.RKey),
1630 ); err != nil {
1631 return fmt.Errorf("failed to delete comment record: %w", err)
1632 }
1633 }
1634
1635 l.Info("ingested record")
1636 return nil
1637}
1638
1639func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error {
1640 did := e.Did
1641 rkey := e.Commit.RKey
1642 cid := e.Commit.CID
1643
1644 var err error
1645
1646 l = l.With("handler", "ingestComment")
1647
1648 ctx := context.Background()
1649
1650 switch e.Commit.Operation {
1651 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1652 raw := json.RawMessage(e.Commit.Record)
1653 record := tangled.FeedComment{}
1654 err = json.Unmarshal(raw, &record)
1655 if err != nil {
1656 return fmt.Errorf("invalid record: %w", err)
1657 }
1658
1659 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1660 if err != nil {
1661 return fmt.Errorf("failed to parse comment from record: %w", err)
1662 }
1663
1664 if err := comment.Validate(); err != nil {
1665 return fmt.Errorf("failed to validate comment: %w", err)
1666 }
1667
1668 var references []syntax.ATURI
1669 if comment.Body.Original != nil {
1670 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1671 }
1672
1673 tx, err := i.Db.Begin()
1674 if err != nil {
1675 return fmt.Errorf("failed to start transaction: %w", err)
1676 }
1677 defer tx.Rollback()
1678
1679 _, err = db.PutComment(tx, comment, references)
1680 if err != nil {
1681 return fmt.Errorf("failed to create comment: %w", err)
1682 }
1683
1684 if err := tx.Commit(); err != nil {
1685 return err
1686 }
1687
1688 case jmodels.CommitOperationDelete:
1689 if err := db.DeleteComments(
1690 i.Db,
1691 orm.FilterEq("did", did),
1692 orm.FilterEq("collection", e.Commit.Collection),
1693 orm.FilterEq("rkey", rkey),
1694 ); err != nil {
1695 return fmt.Errorf("failed to delete comment record: %w", err)
1696 }
1697 }
1698
1699 l.Info("ingested record")
1700 return nil
1701}
1702
1703func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error {
1704 did := e.Did
1705 rkey := e.Commit.RKey
1706
1707 l = l.With("handler", "ingestReaction")
1708
1709 switch e.Commit.Operation {
1710 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1711 raw := json.RawMessage(e.Commit.Record)
1712 record := tangled.FeedReaction{}
1713 if err := json.Unmarshal(raw, &record); err != nil {
1714 return fmt.Errorf("invalid record: %w", err)
1715 }
1716
1717 subjectUri, err := syntax.ParseATURI(record.Subject)
1718 if err != nil {
1719 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err)
1720 }
1721 subjectUri = models.NormalizeReactionSubject(subjectUri)
1722
1723 kind, ok := models.ParseReactionKind(record.Reaction)
1724 if !ok {
1725 return fmt.Errorf("invalid reaction kind: %q", record.Reaction)
1726 }
1727
1728 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
1729 if parseErr != nil {
1730 created = time.Now()
1731 }
1732
1733 reaction := models.Reaction{
1734 ReactedByDid: did,
1735 Rkey: rkey,
1736 ThreadAt: subjectUri,
1737 Kind: kind,
1738 Created: created,
1739 }
1740 if err := db.UpsertReaction(i.Db, reaction); err != nil {
1741 return fmt.Errorf("failed to upsert reaction: %w", err)
1742 }
1743
1744 case jmodels.CommitOperationDelete:
1745 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil {
1746 return fmt.Errorf("failed to delete reaction record: %w", err)
1747 }
1748 }
1749
1750 l.Info("ingested record")
1751 return nil
1752}
1753
1754func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error {
1755 did := e.Did
1756 rkey := e.Commit.RKey
1757
1758 var err error
1759
1760 l = l.With("handler", "ingestLabelDefinition")
1761
1762 switch e.Commit.Operation {
1763 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1764 raw := json.RawMessage(e.Commit.Record)
1765 record := tangled.LabelDefinition{}
1766 err = json.Unmarshal(raw, &record)
1767 if err != nil {
1768 return fmt.Errorf("invalid record: %w", err)
1769 }
1770
1771 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1772 if err != nil {
1773 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1774 }
1775
1776 if err := def.Validate(); err != nil {
1777 return fmt.Errorf("failed to validate labeldef: %w", err)
1778 }
1779
1780 _, err = db.AddLabelDefinition(i.Db, def)
1781 if err != nil {
1782 return fmt.Errorf("failed to create labeldef: %w", err)
1783 }
1784
1785 l.Info("ingested record")
1786 return nil
1787
1788 case jmodels.CommitOperationDelete:
1789 if err := db.DeleteLabelDefinition(
1790 i.Db,
1791 orm.FilterEq("did", did),
1792 orm.FilterEq("rkey", rkey),
1793 ); err != nil {
1794 return fmt.Errorf("failed to delete labeldef record: %w", err)
1795 }
1796
1797 l.Info("ingested record")
1798 return nil
1799 }
1800
1801 return nil
1802}
1803
1804func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1805 did := e.Did
1806 rkey := e.Commit.RKey
1807
1808 var err error
1809
1810 l = l.With("handler", "ingestLabelOp")
1811
1812 switch e.Commit.Operation {
1813 case jmodels.CommitOperationCreate:
1814 raw := json.RawMessage(e.Commit.Record)
1815 record := tangled.LabelOp{}
1816 err = json.Unmarshal(raw, &record)
1817 if err != nil {
1818 return fmt.Errorf("invalid record: %w", err)
1819 }
1820
1821 subject := syntax.ATURI(record.Subject)
1822 collection := subject.Collection()
1823
1824 var repo *models.Repo
1825 switch collection {
1826 case tangled.RepoIssueNSID:
1827 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1828 if err != nil || len(i) != 1 {
1829 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1830 }
1831 repo = i[0].Repo
1832 case tangled.RepoPullNSID:
1833 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
1834 if err != nil || len(p) != 1 {
1835 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p))
1836 }
1837 repo = p[0].Repo
1838 default:
1839 return fmt.Errorf("unsupported label subject: %s", collection)
1840 }
1841
1842 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1843 if err != nil {
1844 return fmt.Errorf("failed to build label application ctx: %w", err)
1845 }
1846
1847 ops := models.LabelOpsFromRecord(did, rkey, record)
1848
1849 for _, o := range ops {
1850 def, ok := actx.Defs[o.OperandKey]
1851 if !ok {
1852 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1853 }
1854 // validate permissions: only collaborators can apply labels currently
1855 //
1856 // TODO: introduce a repo:triage permission
1857 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, o.Did, "repo:push")
1858 if permErr != nil {
1859 if !errors.Is(permErr, knotacl.ErrKnotUnreachable) {
1860 return fmt.Errorf("enforcing permission: %w", permErr)
1861 }
1862 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", permErr)
1863 } else if !allowed {
1864 return fmt.Errorf("unauthorized label operation")
1865 }
1866
1867 if err := def.ValidateOperandValue(&o); err != nil {
1868 return fmt.Errorf("failed to validate labelop: %w", err)
1869 }
1870 }
1871
1872 tx, err := i.Db.Begin()
1873 if err != nil {
1874 return err
1875 }
1876 defer tx.Rollback()
1877
1878 for _, o := range ops {
1879 _, err = db.AddLabelOp(tx, &o)
1880 if err != nil {
1881 return fmt.Errorf("failed to add labelop: %w", err)
1882 }
1883 }
1884
1885 if err = tx.Commit(); err != nil {
1886 return err
1887 }
1888
1889 l.Info("ingested record")
1890 }
1891
1892 return nil
1893}