Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/knotacl"
31 "tangled.org/core/appview/mentions"
32 "tangled.org/core/appview/models"
33 "tangled.org/core/appview/notify"
34 "tangled.org/core/appview/repoverify"
35 "tangled.org/core/appview/serververify"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/orm"
38 "tangled.org/core/rbac"
39)
40
41type Ingester struct {
42 Ctx context.Context
43 Db *db.DB
44 Enforcer *rbac.Enforcer
45 Acl *knotacl.Service
46 IdResolver *idresolver.Resolver
47 Cache *cache.Cache
48 Config *config.Config
49 Logger *slog.Logger
50 MentionsResolver *mentions.Resolver
51 Notifier notify.Notifier
52 Verifier repoverify.Verifier
53}
54
55type processFunc func(ctx context.Context, e *jmodels.Event) error
56
57func (i *Ingester) Ingest() processFunc {
58 return func(ctx context.Context, e *jmodels.Event) error {
59 var err error
60
61 l := i.Logger.With("kind", e.Kind)
62 switch e.Kind {
63 case jmodels.EventKindAccount:
64 // TODO: sync account state to db
65 if e.Account.Active {
66 break
67 }
68 // TODO: revoke sessions by DID
69 if *e.Account.Status == "deactivated" {
70 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
71 }
72 case jmodels.EventKindIdentity:
73 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
74 case jmodels.EventKindCommit:
75 l = l.With(
76 "nsid", e.Commit.Collection,
77 "did", e.Did,
78 "rkey", e.Commit.RKey,
79 "op", e.Commit.Operation,
80 )
81 switch e.Commit.Collection {
82 case tangled.GraphFollowNSID:
83 err = i.ingestFollow(e, l)
84 case tangled.GraphVouchNSID:
85 err = i.ingestVouch(ctx, e, l)
86 case tangled.FeedStarNSID:
87 err = i.ingestStar(ctx, e, l)
88 case tangled.FeedReactionNSID:
89 err = i.ingestReaction(e, l)
90 case tangled.PublicKeyNSID:
91 err = i.ingestPublicKey(e, l)
92 case tangled.RepoArtifactNSID:
93 err = i.ingestArtifact(ctx, e, l)
94 case tangled.ActorProfileNSID:
95 err = i.ingestProfile(ctx, e, l)
96 case tangled.SpindleMemberNSID:
97 err = i.ingestSpindleMember(ctx, e, l)
98 case tangled.SpindleNSID:
99 err = i.ingestSpindle(ctx, e, l)
100 case tangled.KnotMemberNSID:
101 err = i.ingestKnotMember(ctx, e, l)
102 case tangled.KnotNSID:
103 err = i.ingestKnot(ctx, e, l)
104 case tangled.StringNSID:
105 err = i.ingestString(e, l)
106 case tangled.RepoIssueNSID:
107 err = i.ingestIssue(ctx, e, l)
108 case tangled.RepoPullNSID:
109 err = i.ingestPull(ctx, e, l)
110 case tangled.FeedCommentNSID:
111 err = i.ingestComment(e, l)
112 case tangled.RepoIssueCommentNSID:
113 err = i.ingestIssueComment(e, l)
114 case tangled.RepoPullCommentNSID:
115 err = i.ingestPullComment(e, l)
116 case tangled.LabelDefinitionNSID:
117 err = i.ingestLabelDefinition(e, l)
118 case tangled.LabelOpNSID:
119 err = i.ingestLabelOp(ctx, e, l)
120 case tangled.RepoNSID:
121 err = i.ingestRepo(ctx, e, l)
122 }
123 }
124
125 if err != nil {
126 l.Warn("failed to ingest record, skipping", "err", err)
127 }
128
129 lastTimeUs := e.TimeUS + 1
130 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
131 l.Error("failed to save cursor", "err", saveErr)
132 }
133
134 return nil
135 }
136}
137
138func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
139 if strings.HasPrefix(ref, "did:") {
140 return db.GetRepoByDid(i.Db, ref)
141 }
142 return db.GetRepoByAtUri(i.Db, ref)
143}
144
145func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
146 var legacy struct {
147 Subject *string `json:"subject"`
148 SubjectDid *string `json:"subjectDid"`
149 }
150 if err := json.Unmarshal(raw, &legacy); err != nil {
151 return false, err
152 }
153
154 switch {
155 case legacy.SubjectDid != nil:
156 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
157 if err != nil {
158 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
159 return false, nil
160 }
161 star.SubjectType = models.StarSubjectRepo
162 star.Subject = repo.RepoDid
163 return true, nil
164
165 case legacy.Subject != nil:
166 uri, err := syntax.ParseATURI(*legacy.Subject)
167 if err != nil {
168 return false, fmt.Errorf("invalid old-format star subject: %w", err)
169 }
170 switch uri.Collection().String() {
171 case tangled.RepoNSID:
172 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
173 if err != nil {
174 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
175 return false, nil
176 }
177 star.SubjectType = models.StarSubjectRepo
178 star.Subject = repo.RepoDid
179 return true, nil
180 default:
181 star.SubjectType = models.StarSubjectString
182 star.Subject = *legacy.Subject
183 return true, nil
184 }
185
186 default:
187 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
188 }
189}
190
191func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
192 var err error
193 did := e.Did
194
195 l = l.With("handler", "ingestStar")
196
197 switch e.Commit.Operation {
198 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
199 raw := json.RawMessage(e.Commit.Record)
200 record := tangled.FeedStar{}
201 unmarshalErr := json.Unmarshal(raw, &record)
202
203 star := &models.Star{
204 Did: did,
205 Rkey: e.Commit.RKey,
206 }
207
208 switch {
209 case unmarshalErr != nil:
210 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
211 if resolveErr != nil {
212 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
213 return unmarshalErr
214 }
215 if !resolved {
216 return nil
217 }
218
219 case record.Subject == nil:
220 return fmt.Errorf("star record has nil subject")
221
222 case record.Subject.FeedStar_Repo != nil:
223 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
224 if repoErr != nil {
225 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
226 return nil
227 }
228 star.SubjectType = models.StarSubjectRepo
229 star.Subject = repo.RepoDid
230
231 case record.Subject.FeedStar_String != nil:
232 star.SubjectType = models.StarSubjectString
233 star.Subject = record.Subject.FeedStar_String.Uri
234
235 default:
236 return fmt.Errorf("star record has empty subject union")
237 }
238
239 err = db.AddStar(i.Db, star)
240 case jmodels.CommitOperationDelete:
241 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
242 }
243
244 if err != nil {
245 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
246 }
247
248 l.Info("ingested record")
249 return nil
250}
251
252func (i *Ingester) ingestFollow(e *jmodels.Event, l *slog.Logger) error {
253 var err error
254 did := e.Did
255
256 l = l.With("handler", "ingestFollow")
257
258 switch e.Commit.Operation {
259 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
260 raw := json.RawMessage(e.Commit.Record)
261 record := tangled.GraphFollow{}
262 err = json.Unmarshal(raw, &record)
263 if err != nil {
264 l.Error("invalid record", "err", err)
265 return err
266 }
267
268 err = db.AddFollow(i.Db, &models.Follow{
269 UserDid: did,
270 SubjectDid: record.Subject,
271 Rkey: e.Commit.RKey,
272 })
273 case jmodels.CommitOperationDelete:
274 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
275 }
276
277 if err != nil {
278 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
279 }
280
281 l.Info("ingested record")
282 return nil
283}
284
285func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
286 var err error
287 did := e.Did
288
289 l = l.With("handler", "ingestVouch")
290
291 switch e.Commit.Operation {
292 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
293 raw := json.RawMessage(e.Commit.Record)
294 record := tangled.GraphVouch{}
295 err = json.Unmarshal(raw, &record)
296 if err != nil {
297 l.Error("invalid record", "err", err)
298 return err
299 }
300
301 // rkey is the subject_did being vouched for/denounced
302 subjectDID := e.Commit.RKey
303
304 _, err = syntax.ParseDID(subjectDID)
305 if err != nil {
306 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
307 return fmt.Errorf("invalid subject_did: %w", err)
308 }
309
310 if did == subjectDID {
311 l.Warn("attempted self-vouch", "did", did)
312 return fmt.Errorf("cannot vouch for self")
313 }
314
315 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
316 if err != nil {
317 return err
318 }
319
320 if subjectId.Handle.IsInvalidHandle() {
321 return err
322 }
323
324 kind, err := models.ParseVouchKind(record.Kind)
325 if err != nil {
326 l.Error("invalid kind", "kind", kind)
327 return fmt.Errorf("invalid kind: %s", kind)
328 }
329
330 recordCid, err := cid.Parse(e.Commit.CID)
331 if err != nil {
332 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
333 return fmt.Errorf("invalid cid: %w", err)
334 }
335
336 var evidences []syntax.ATURI
337 for _, raw := range record.Evidences {
338 uri, parseErr := syntax.ParseATURI(raw)
339 if parseErr != nil {
340 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
341 continue
342 }
343 evidences = append(evidences, uri)
344 }
345
346 tx, txErr := i.Db.Begin()
347 if txErr != nil {
348 return fmt.Errorf("failed to start transaction: %w", txErr)
349 }
350
351 addErr := db.AddVouch(tx, &models.Vouch{
352 Did: syntax.DID(did),
353 SubjectDid: subjectId.DID,
354 Cid: recordCid,
355 Kind: kind,
356 Reason: record.Reason,
357 Evidences: evidences,
358 })
359 if addErr != nil {
360 tx.Rollback()
361 err = addErr
362 } else {
363 err = tx.Commit()
364 }
365
366 case jmodels.CommitOperationDelete:
367 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
368 }
369
370 if err != nil {
371 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
372 }
373
374 l.Info("ingested record")
375 return nil
376}
377
378func (i *Ingester) ingestPublicKey(e *jmodels.Event, l *slog.Logger) error {
379 did := e.Did
380 var err error
381
382 l = l.With("handler", "ingestPublicKey")
383
384 switch e.Commit.Operation {
385 case jmodels.CommitOperationCreate:
386 l.Debug("processing add of pubkey")
387 raw := json.RawMessage(e.Commit.Record)
388 record := tangled.PublicKey{}
389 err = json.Unmarshal(raw, &record)
390 if err != nil {
391 l.Error("invalid record", "err", err)
392 return err
393 }
394
395 name := record.Name
396 key := record.Key
397 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
398 case jmodels.CommitOperationUpdate:
399 l.Debug("processing update of pubkey")
400 raw := json.RawMessage(e.Commit.Record)
401 record := tangled.PublicKey{}
402 err = json.Unmarshal(raw, &record)
403 if err != nil {
404 l.Error("invalid record", "err", err)
405 return err
406 }
407
408 name := record.Name
409 key := record.Key
410 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
411 case jmodels.CommitOperationDelete:
412 l.Debug("processing delete of pubkey")
413 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
414 }
415
416 if err != nil {
417 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
418 }
419
420 l.Info("ingested record")
421 return nil
422}
423
424func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
425 did := e.Did
426 var err error
427
428 l = l.With("handler", "ingestArtifact")
429
430 switch e.Commit.Operation {
431 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
432 raw := json.RawMessage(e.Commit.Record)
433 record := tangled.RepoArtifact{}
434 err = json.Unmarshal(raw, &record)
435 if err != nil {
436 l.Error("invalid record", "err", err)
437 return err
438 }
439
440 var repo *models.Repo
441 if record.RepoDid != nil && *record.RepoDid != "" {
442 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
443 if err != nil && !errors.Is(err, sql.ErrNoRows) {
444 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
445 }
446 }
447 if repo == nil && record.Repo != nil {
448 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
449 if parseErr != nil {
450 return parseErr
451 }
452 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
453 if err != nil {
454 return err
455 }
456 }
457 if repo == nil {
458 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
459 }
460
461 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, did, "repo:push")
462 if permErr != nil {
463 l.Warn("ingesting artifact without permission check", "did", did, "repo", repo.RepoIdentifier(), "err", permErr)
464 } else if !allowed {
465 l.Info("skipping unauthorized artifact", "did", did, "repo", repo.RepoIdentifier())
466 return nil
467 }
468
469 repoDid := repo.RepoDid
470 if repoDid == "" && record.RepoDid != nil {
471 repoDid = *record.RepoDid
472 }
473 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
474 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
475 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
476 }
477 }
478
479 createdAt, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
480 if parseErr != nil {
481 createdAt = time.Now()
482 }
483
484 artifact := models.Artifact{
485 Did: did,
486 Rkey: e.Commit.RKey,
487 RepoDid: syntax.DID(repo.RepoDid),
488 Tag: plumbing.Hash(record.Tag),
489 CreatedAt: createdAt,
490 BlobCid: cid.Cid(record.Artifact.Ref),
491 Name: record.Name,
492 Size: uint64(record.Artifact.Size),
493 MimeType: record.Artifact.MimeType,
494 }
495
496 err = db.AddArtifact(i.Db, artifact)
497 case jmodels.CommitOperationDelete:
498 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
499 }
500
501 if err != nil {
502 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
503 }
504
505 l.Info("ingested record")
506 return nil
507}
508
509func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
510 did := e.Did
511 var err error
512
513 l = l.With("handler", "ingestProfile")
514
515 if e.Commit.RKey != "self" {
516 return fmt.Errorf("ingestProfile only ingests `self` record")
517 }
518
519 switch e.Commit.Operation {
520 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
521 raw := json.RawMessage(e.Commit.Record)
522 record := tangled.ActorProfile{}
523 err = json.Unmarshal(raw, &record)
524 if err != nil {
525 l.Error("invalid record", "err", err)
526 return err
527 }
528
529 avatar := ""
530 if record.Avatar != nil {
531 avatar = record.Avatar.Ref.String()
532 }
533
534 description := ""
535 if record.Description != nil {
536 description = *record.Description
537 }
538
539 includeBluesky := record.Bluesky
540
541 pronouns := ""
542 if record.Pronouns != nil {
543 pronouns = *record.Pronouns
544 }
545
546 location := ""
547 if record.Location != nil {
548 location = *record.Location
549 }
550
551 var links [5]string
552 for i, l := range record.Links {
553 if i < 5 {
554 links[i] = l
555 }
556 }
557
558 var stats [2]models.VanityStat
559 for i, s := range record.Stats {
560 if i < 2 {
561 stats[i].Kind = models.ParseVanityStatKind(s)
562 }
563 }
564
565 var pinned [6]string
566 for i, r := range record.PinnedRepositories {
567 if i < 6 {
568 pinned[i] = r
569 }
570 }
571
572 var preferredHandle syntax.Handle
573 if record.PreferredHandle != nil {
574 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
575 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
576 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
577 preferredHandle = h
578 }
579 }
580 }
581
582 profile := models.Profile{
583 Did: did,
584 Avatar: avatar,
585 Description: description,
586 IncludeBluesky: includeBluesky,
587 Location: location,
588 Links: links,
589 Stats: stats,
590 PinnedRepos: pinned,
591 Pronouns: pronouns,
592 PreferredHandle: preferredHandle,
593 }
594
595 tx, err := i.Db.Begin()
596 if err != nil {
597 return fmt.Errorf("failed to start transaction: %w", err)
598 }
599
600 err = db.ValidateProfile(tx, &profile)
601 if err != nil {
602 return fmt.Errorf("invalid profile record")
603 }
604
605 err = db.UpsertProfile(tx, &profile)
606 if err == nil && i.Cache != nil {
607 pipe := i.Cache.Pipeline()
608 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
609 if preferredHandle != "" {
610 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
611 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
612 } else {
613 pipe.Del(ctx, didKey)
614 }
615 if _, execErr := pipe.Exec(ctx); execErr != nil {
616 l.Warn("failed to update preferred handle cache", "err", execErr)
617 }
618 }
619 case jmodels.CommitOperationDelete:
620 tx, beginErr := i.Db.Begin()
621 if beginErr != nil {
622 return fmt.Errorf("failed to start transaction: %w", beginErr)
623 }
624
625 priorHandle, phErr := db.GetPreferredHandle(tx, did)
626 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
627 l.Warn("failed to read prior preferred handle", "err", phErr)
628 }
629
630 err = db.DeleteProfile(tx, did)
631 if err == nil && i.Cache != nil {
632 pipe := i.Cache.Pipeline()
633 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
634 if priorHandle != "" {
635 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
636 }
637 if _, execErr := pipe.Exec(ctx); execErr != nil {
638 l.Warn("failed to evict preferred handle cache", "err", execErr)
639 }
640 }
641 }
642
643 if err != nil {
644 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
645 }
646
647 l.Info("ingested record")
648 return nil
649}
650
651func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
652 did := e.Did
653 var err error
654
655 l = l.With("handler", "ingestSpindleMember")
656
657 switch e.Commit.Operation {
658 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
659 raw := json.RawMessage(e.Commit.Record)
660 record := tangled.SpindleMember{}
661 err = json.Unmarshal(raw, &record)
662 if err != nil {
663 l.Error("invalid record", "err", err)
664 return err
665 }
666
667 // only spindle owner can invite to spindles
668 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
669 if err != nil {
670 return fmt.Errorf("failed to check invite permission: %w", err)
671 }
672 if !ok {
673 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
674 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
675 }
676 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
677 if err != nil {
678 return fmt.Errorf("failed to re-check invite permission: %w", err)
679 }
680 if !ok {
681 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
682 }
683 }
684
685 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
686 if err != nil {
687 return err
688 }
689
690 if memberId.Handle.IsInvalidHandle() {
691 return fmt.Errorf("invalid handle for member %s", record.Subject)
692 }
693
694 existing, err := db.GetSpindleMembers(i.Db,
695 orm.FilterEq("did", did),
696 orm.FilterEq("rkey", e.Commit.RKey),
697 )
698 if err != nil {
699 return fmt.Errorf("failed to look up existing member: %w", err)
700 }
701 if len(existing) > 1 {
702 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
703 }
704
705 tx, err := i.Db.Begin()
706 if err != nil {
707 return fmt.Errorf("failed to start txn: %w", err)
708 }
709 committed := false
710 defer func() {
711 if committed {
712 return
713 }
714 tx.Rollback()
715 i.Enforcer.E.LoadPolicy()
716 }()
717
718 if len(existing) == 1 {
719 prev := existing[0]
720 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
721 if err = db.RemoveSpindleMember(tx,
722 orm.FilterEq("did", did),
723 orm.FilterEq("rkey", e.Commit.RKey),
724 ); err != nil {
725 return fmt.Errorf("failed to remove stale row: %w", err)
726 }
727 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
728 return fmt.Errorf("failed to remove stale ACL: %w", err)
729 }
730 }
731 }
732
733 if err = db.AddSpindleMember(tx, models.SpindleMember{
734 Did: syntax.DID(did),
735 Rkey: e.Commit.RKey,
736 Instance: record.Instance,
737 Subject: memberId.DID,
738 }); err != nil {
739 return fmt.Errorf("failed to add to db: %w", err)
740 }
741
742 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
743 return fmt.Errorf("failed to update ACLs: %w", err)
744 }
745
746 if err = tx.Commit(); err != nil {
747 return fmt.Errorf("failed to commit txn: %w", err)
748 }
749
750 if err = i.Enforcer.E.SavePolicy(); err != nil {
751 return fmt.Errorf("failed to save ACLs: %w", err)
752 }
753 committed = true
754
755 l.Info("upserted spindle member")
756 case jmodels.CommitOperationDelete:
757 rkey := e.Commit.RKey
758
759 // get record from db first
760 members, err := db.GetSpindleMembers(
761 i.Db,
762 orm.FilterEq("did", did),
763 orm.FilterEq("rkey", rkey),
764 )
765 if err != nil || len(members) != 1 {
766 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
767 }
768 member := members[0]
769
770 tx, err := i.Db.Begin()
771 if err != nil {
772 return fmt.Errorf("failed to start txn: %w", err)
773 }
774 committed := false
775 defer func() {
776 if committed {
777 return
778 }
779 tx.Rollback()
780 i.Enforcer.E.LoadPolicy()
781 }()
782
783 // remove record by rkey && update enforcer
784 if err = db.RemoveSpindleMember(
785 tx,
786 orm.FilterEq("did", did),
787 orm.FilterEq("rkey", rkey),
788 ); err != nil {
789 return fmt.Errorf("failed to remove from db: %w", err)
790 }
791
792 // update enforcer
793 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
794 if err != nil {
795 return fmt.Errorf("failed to update ACLs: %w", err)
796 }
797
798 if err = tx.Commit(); err != nil {
799 return fmt.Errorf("failed to commit txn: %w", err)
800 }
801
802 if err = i.Enforcer.E.SavePolicy(); err != nil {
803 return fmt.Errorf("failed to save ACLs: %w", err)
804 }
805 committed = true
806
807 l.Info("removed spindle member")
808 }
809
810 return nil
811}
812
813func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
814 did := e.Did
815 var err error
816
817 l = l.With("handler", "ingestSpindle")
818
819 switch e.Commit.Operation {
820 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
821 raw := json.RawMessage(e.Commit.Record)
822 record := tangled.Spindle{}
823 err = json.Unmarshal(raw, &record)
824 if err != nil {
825 l.Error("invalid record", "err", err)
826 return err
827 }
828
829 instance := e.Commit.RKey
830
831 err := db.AddSpindle(i.Db, models.Spindle{
832 Owner: syntax.DID(did),
833 Instance: instance,
834 })
835 if err != nil {
836 l.Error("failed to add spindle to db", "err", err, "instance", instance)
837 return err
838 }
839
840 if err := i.verifySpindle(ctx, instance, did); err != nil {
841 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
842 }
843
844 l.Info("ingested record", "instance", instance)
845 return nil
846
847 case jmodels.CommitOperationDelete:
848 instance := e.Commit.RKey
849
850 // get record from db first
851 spindles, err := db.GetSpindles(
852 ctx,
853 i.Db,
854 orm.FilterEq("owner", did),
855 orm.FilterEq("instance", instance),
856 )
857 if err != nil || len(spindles) != 1 {
858 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
859 }
860 spindle := spindles[0]
861
862 tx, err := i.Db.Begin()
863 if err != nil {
864 return fmt.Errorf("failed to start txn: %w", err)
865 }
866 defer func() {
867 tx.Rollback()
868 i.Enforcer.E.LoadPolicy()
869 }()
870
871 // remove spindle members first
872 err = db.RemoveSpindleMember(
873 tx,
874 orm.FilterEq("owner", did),
875 orm.FilterEq("instance", instance),
876 )
877 if err != nil {
878 return fmt.Errorf("failed to remove spindle members: %w", err)
879 }
880
881 err = db.DeleteSpindle(
882 tx,
883 orm.FilterEq("owner", did),
884 orm.FilterEq("instance", instance),
885 )
886 if err != nil {
887 return fmt.Errorf("failed to delete spindle: %w", err)
888 }
889
890 if spindle.Verified != nil {
891 err = i.Enforcer.RemoveSpindle(instance)
892 if err != nil {
893 return fmt.Errorf("failed to remove spindle from enforcer: %w", err)
894 }
895 }
896
897 err = tx.Commit()
898 if err != nil {
899 return fmt.Errorf("failed to commit txn: %w", err)
900 }
901
902 err = i.Enforcer.E.SavePolicy()
903 if err != nil {
904 return fmt.Errorf("failed to save ACLs: %w", err)
905 }
906
907 l.Info("ingested record", "instance", instance)
908 }
909
910 return nil
911}
912
913func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error {
914 did := e.Did
915 rkey := e.Commit.RKey
916
917 var err error
918
919 l = l.With("handler", "ingestString")
920
921 switch e.Commit.Operation {
922 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
923 raw := json.RawMessage(e.Commit.Record)
924 record := tangled.String{}
925 err = json.Unmarshal(raw, &record)
926 if err != nil {
927 l.Error("invalid record", "err", err)
928 return err
929 }
930
931 string := models.StringFromRecord(did, rkey, record)
932
933 if err = string.Validate(); err != nil {
934 l.Error("invalid record", "err", err)
935 return err
936 }
937
938 if err = db.AddString(i.Db, string); err != nil {
939 l.Error("failed to add string", "err", err)
940 return err
941 }
942
943 l.Info("ingested record")
944 return nil
945
946 case jmodels.CommitOperationDelete:
947 if err := db.DeleteString(
948 i.Db,
949 orm.FilterEq("did", did),
950 orm.FilterEq("rkey", rkey),
951 ); err != nil {
952 l.Error("failed to delete", "err", err)
953 return fmt.Errorf("failed to delete string record: %w", err)
954 }
955
956 l.Info("ingested record")
957 return nil
958 }
959
960 return nil
961}
962
963func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
964 did := e.Did
965 var err error
966
967 l = l.With("handler", "ingestKnotMember")
968
969 switch e.Commit.Operation {
970 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
971 raw := json.RawMessage(e.Commit.Record)
972 record := tangled.KnotMember{}
973 err = json.Unmarshal(raw, &record)
974 if err != nil {
975 l.Error("invalid record", "err", err)
976 return err
977 }
978
979 // only knot owner can invite to knots
980 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
981 if err != nil {
982 return fmt.Errorf("failed to check invite permission: %w", err)
983 }
984 if !ok {
985 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
986 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
987 }
988 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
989 if err != nil {
990 return fmt.Errorf("failed to re-check invite permission: %w", err)
991 }
992 if !ok {
993 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
994 }
995 }
996
997 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
998 if err != nil {
999 return err
1000 }
1001
1002 if memberId.Handle.IsInvalidHandle() {
1003 return fmt.Errorf("invalid handle for member %s", record.Subject)
1004 }
1005
1006 existing, err := db.GetKnotMembers(i.Db,
1007 orm.FilterEq("did", did),
1008 orm.FilterEq("rkey", e.Commit.RKey),
1009 )
1010 if err != nil {
1011 return fmt.Errorf("failed to look up existing member: %w", err)
1012 }
1013 if len(existing) > 1 {
1014 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
1015 }
1016
1017 tx, err := i.Db.Begin()
1018 if err != nil {
1019 return fmt.Errorf("failed to start txn: %w", err)
1020 }
1021 committed := false
1022 defer func() {
1023 if committed {
1024 return
1025 }
1026 tx.Rollback()
1027 i.Enforcer.E.LoadPolicy()
1028 }()
1029
1030 if len(existing) == 1 {
1031 prev := existing[0]
1032 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1033 if err = db.RemoveKnotMember(tx,
1034 orm.FilterEq("did", did),
1035 orm.FilterEq("rkey", e.Commit.RKey),
1036 ); err != nil {
1037 return fmt.Errorf("failed to remove stale row: %w", err)
1038 }
1039 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1040 return fmt.Errorf("failed to remove stale ACL: %w", err)
1041 }
1042 }
1043 }
1044
1045 if err = db.AddKnotMember(tx, models.KnotMember{
1046 Did: syntax.DID(did),
1047 Rkey: e.Commit.RKey,
1048 Domain: record.Domain,
1049 Subject: memberId.DID,
1050 }); err != nil {
1051 return fmt.Errorf("failed to add to db: %w", err)
1052 }
1053
1054 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1055 return fmt.Errorf("failed to update ACLs: %w", err)
1056 }
1057
1058 if err = tx.Commit(); err != nil {
1059 return fmt.Errorf("failed to commit txn: %w", err)
1060 }
1061
1062 if err = i.Enforcer.E.SavePolicy(); err != nil {
1063 return fmt.Errorf("failed to save ACLs: %w", err)
1064 }
1065 committed = true
1066
1067 l.Info("upserted knot member")
1068 case jmodels.CommitOperationDelete:
1069 rkey := e.Commit.RKey
1070
1071 members, err := db.GetKnotMembers(
1072 i.Db,
1073 orm.FilterEq("did", did),
1074 orm.FilterEq("rkey", rkey),
1075 )
1076 if err != nil {
1077 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1078 }
1079 if len(members) == 0 {
1080 l.Info("knot member already removed", "rkey", rkey)
1081 return nil
1082 }
1083 if len(members) > 1 {
1084 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1085 }
1086 member := members[0]
1087
1088 tx, err := i.Db.Begin()
1089 if err != nil {
1090 return fmt.Errorf("failed to start txn: %w", err)
1091 }
1092 committed := false
1093 defer func() {
1094 if committed {
1095 return
1096 }
1097 tx.Rollback()
1098 i.Enforcer.E.LoadPolicy()
1099 }()
1100
1101 if err = db.RemoveKnotMember(
1102 tx,
1103 orm.FilterEq("did", did),
1104 orm.FilterEq("rkey", rkey),
1105 ); err != nil {
1106 return fmt.Errorf("failed to remove from db: %w", err)
1107 }
1108
1109 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1110 return fmt.Errorf("failed to update ACLs: %w", err)
1111 }
1112
1113 if err = tx.Commit(); err != nil {
1114 return fmt.Errorf("failed to commit txn: %w", err)
1115 }
1116
1117 if err = i.Enforcer.E.SavePolicy(); err != nil {
1118 return fmt.Errorf("failed to save ACLs: %w", err)
1119 }
1120 committed = true
1121
1122 l.Info("removed knot member")
1123 }
1124
1125 return nil
1126}
1127
1128func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1129 did := e.Did
1130 var err error
1131
1132 l = l.With("handler", "ingestKnot")
1133
1134 switch e.Commit.Operation {
1135 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1136 raw := json.RawMessage(e.Commit.Record)
1137 record := tangled.Knot{}
1138 err = json.Unmarshal(raw, &record)
1139 if err != nil {
1140 l.Error("invalid record", "err", err)
1141 return err
1142 }
1143
1144 domain := e.Commit.RKey
1145
1146 err := db.AddKnot(i.Db, domain, did)
1147 if err != nil {
1148 l.Error("failed to add knot to db", "err", err, "domain", domain)
1149 return err
1150 }
1151
1152 if err := i.verifyKnot(ctx, domain, did); err != nil {
1153 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1154 }
1155
1156 l.Info("ingested record", "domain", domain)
1157 return nil
1158
1159 case jmodels.CommitOperationDelete:
1160 domain := e.Commit.RKey
1161
1162 // get record from db first
1163 registrations, err := db.GetRegistrations(
1164 i.Db,
1165 orm.FilterEq("domain", domain),
1166 orm.FilterEq("did", did),
1167 )
1168 if err != nil {
1169 return fmt.Errorf("failed to get registration: %w", err)
1170 }
1171 if len(registrations) != 1 {
1172 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1173 }
1174 registration := registrations[0]
1175
1176 tx, err := i.Db.Begin()
1177 if err != nil {
1178 return fmt.Errorf("failed to start txn: %w", err)
1179 }
1180 defer func() {
1181 tx.Rollback()
1182 i.Enforcer.E.LoadPolicy()
1183 }()
1184
1185 err = db.RemoveKnotMember(
1186 tx,
1187 orm.FilterEq("did", did),
1188 orm.FilterEq("domain", domain),
1189 )
1190 if err != nil {
1191 return fmt.Errorf("failed to remove knot members: %w", err)
1192 }
1193
1194 err = db.DeleteKnot(
1195 tx,
1196 orm.FilterEq("did", did),
1197 orm.FilterEq("domain", domain),
1198 )
1199 if err != nil {
1200 return fmt.Errorf("failed to delete knot: %w", err)
1201 }
1202
1203 err = db.RemoveReposByKnot(tx, domain)
1204 if err != nil {
1205 return fmt.Errorf("failed to remove repos by knot: %w", err)
1206 }
1207
1208 if registration.Registered != nil {
1209 err = i.Enforcer.RemoveKnot(domain)
1210 if err != nil {
1211 return fmt.Errorf("failed to remove knot from enforcer: %w", err)
1212 }
1213 }
1214
1215 err = tx.Commit()
1216 if err != nil {
1217 return fmt.Errorf("failed to commit txn: %w", err)
1218 }
1219
1220 err = i.Enforcer.E.SavePolicy()
1221 if err != nil {
1222 return fmt.Errorf("failed to save ACLs: %w", err)
1223 }
1224
1225 l.Info("ingested record", "domain", domain)
1226 }
1227
1228 return nil
1229}
1230
1231const (
1232 verifyAttempts = 4
1233 verifyMinDelay = 1 * time.Second
1234 verifyMaxDelay = 5 * time.Second
1235)
1236
1237func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1238 regs, err := db.GetRegistrations(i.Db,
1239 orm.FilterEq("domain", domain),
1240 orm.FilterEq("did", did),
1241 )
1242 if err != nil {
1243 return fmt.Errorf("look up registration: %w", err)
1244 }
1245 if len(regs) != 1 {
1246 return fmt.Errorf("no registration for %s by %s", domain, did)
1247 }
1248 if regs[0].Registered != nil {
1249 return nil
1250 }
1251
1252 err = retry.Do(
1253 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1254 retry.Context(ctx),
1255 retry.Attempts(verifyAttempts),
1256 retry.Delay(verifyMinDelay),
1257 retry.MaxDelay(verifyMaxDelay),
1258 retry.DelayType(retry.BackOffDelay),
1259 retry.LastErrorOnly(true),
1260 )
1261 if err != nil {
1262 return fmt.Errorf("verify: %w", err)
1263 }
1264 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1265}
1266
1267func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1268 spindles, err := db.GetSpindles(ctx, i.Db,
1269 orm.FilterEq("instance", instance),
1270 orm.FilterEq("owner", did),
1271 )
1272 if err != nil {
1273 return fmt.Errorf("look up spindle: %w", err)
1274 }
1275 if len(spindles) != 1 {
1276 return fmt.Errorf("no spindle for %s by %s", instance, did)
1277 }
1278 if spindles[0].Verified != nil {
1279 return nil
1280 }
1281
1282 err = retry.Do(
1283 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1284 retry.Context(ctx),
1285 retry.Attempts(verifyAttempts),
1286 retry.Delay(verifyMinDelay),
1287 retry.MaxDelay(verifyMaxDelay),
1288 retry.DelayType(retry.BackOffDelay),
1289 retry.LastErrorOnly(true),
1290 )
1291 if err != nil {
1292 return fmt.Errorf("verify: %w", err)
1293 }
1294 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1295 return err
1296}
1297
1298const sweepConcurrency = 4
1299
1300func (i *Ingester) SweepPendingVerifications() {
1301 l := i.Logger.With("handler", "SweepPendingVerifications")
1302
1303 var g errgroup.Group
1304 g.SetLimit(sweepConcurrency)
1305
1306 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1307 if err != nil {
1308 l.Error("failed to list unverified knots", "err", err)
1309 } else {
1310 for _, reg := range regs {
1311 g.Go(func() error {
1312 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1313 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1314 }
1315 return nil
1316 })
1317 }
1318 }
1319
1320 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1321 if err != nil {
1322 l.Error("failed to list unverified spindles", "err", err)
1323 g.Wait()
1324 return
1325 }
1326 for _, s := range spindles {
1327 g.Go(func() error {
1328 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1329 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1330 }
1331 return nil
1332 })
1333 }
1334 g.Wait()
1335}
1336
1337func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1338 did := e.Did
1339 rkey := e.Commit.RKey
1340
1341 var err error
1342
1343 l = l.With("handler", "ingestIssue")
1344
1345 switch e.Commit.Operation {
1346 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1347 raw := json.RawMessage(e.Commit.Record)
1348 record := tangled.RepoIssue{}
1349 err = json.Unmarshal(raw, &record)
1350 if err != nil {
1351 l.Error("invalid record", "err", err)
1352 return err
1353 }
1354
1355 issue := models.IssueFromRecord(did, rkey, record)
1356
1357 if issue.RepoDid == "" {
1358 return fmt.Errorf("issue record has no repo field")
1359 }
1360 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1361 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1362 }
1363
1364 if err := issue.Validate(); err != nil {
1365 return fmt.Errorf("failed to validate issue: %w", err)
1366 }
1367
1368 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1369 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1370 if repoErr == nil && repo.RepoDid != "" {
1371 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1372 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1373 }
1374 }
1375 }
1376
1377 tx, err := i.Db.BeginTx(ctx, nil)
1378 if err != nil {
1379 l.Error("failed to begin transaction", "err", err)
1380 return err
1381 }
1382 defer tx.Rollback()
1383
1384 err = db.PutIssue(tx, &issue)
1385 if err != nil {
1386 l.Error("failed to create issue", "err", err)
1387 return err
1388 }
1389
1390 err = tx.Commit()
1391 if err != nil {
1392 l.Error("failed to commit txn", "err", err)
1393 return err
1394 }
1395
1396 l.Info("ingested record")
1397 return nil
1398
1399 case jmodels.CommitOperationDelete:
1400 tx, err := i.Db.BeginTx(ctx, nil)
1401 if err != nil {
1402 l.Error("failed to begin transaction", "err", err)
1403 return err
1404 }
1405 defer tx.Rollback()
1406
1407 if err := db.DeleteIssues(
1408 tx,
1409 did,
1410 rkey,
1411 ); err != nil {
1412 l.Error("failed to delete", "err", err)
1413 return fmt.Errorf("failed to delete issue record: %w", err)
1414 }
1415 if err := tx.Commit(); err != nil {
1416 l.Error("failed to commit txn", "err", err)
1417 return err
1418 }
1419
1420 l.Info("ingested record")
1421 return nil
1422 }
1423
1424 return nil
1425}
1426
1427func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1428 did := e.Did
1429 rkey := e.Commit.RKey
1430
1431 var err error
1432
1433 l = l.With("handler", "ingestPull")
1434
1435 switch e.Commit.Operation {
1436 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1437 raw := json.RawMessage(e.Commit.Record)
1438 record := tangled.RepoPull{}
1439 err = json.Unmarshal(raw, &record)
1440 if err != nil {
1441 l.Error("invalid record", "err", err)
1442 return err
1443 }
1444
1445 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1446 if err != nil {
1447 l.Error("failed to resolve did", "err", err)
1448 return err
1449 }
1450
1451 // go through and fetch all blobs in parallel
1452 readers := make([]*io.ReadCloser, len(record.Rounds))
1453 var mu sync.Mutex
1454
1455 g, gctx := errgroup.WithContext(ctx)
1456
1457 for idx, b := range record.Rounds {
1458 g.Go(func() error {
1459 // for some reason, a blob is empty
1460 if b.PatchBlob == nil {
1461 return fmt.Errorf("missing patchBlob in round %d", idx)
1462 }
1463
1464 ownerPds := ownerId.PDSEndpoint()
1465 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1466 q := url.Query()
1467 q.Set("cid", b.PatchBlob.Ref.String())
1468 q.Set("did", did)
1469 url.RawQuery = q.Encode()
1470
1471 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1472 if err != nil {
1473 l.Error("failed to create request")
1474 return err
1475 }
1476 req.Header.Set("Content-Type", "application/json")
1477
1478 resp, err := http.DefaultClient.Do(req)
1479 if err != nil {
1480 l.Error("failed to make request")
1481 return err
1482 }
1483
1484 mu.Lock()
1485 readers[idx] = &resp.Body
1486 mu.Unlock()
1487
1488 return nil
1489 })
1490 }
1491
1492 if err := g.Wait(); err != nil {
1493 for _, r := range readers {
1494 if r != nil && *r != nil {
1495 (*r).Close()
1496 }
1497 }
1498 return err
1499 }
1500
1501 defer func() {
1502 for _, r := range readers {
1503 if r != nil && *r != nil {
1504 (*r).Close()
1505 }
1506 }
1507 }()
1508
1509 pull, err := models.PullFromRecord(did, rkey, record, readers)
1510 if err != nil {
1511 return fmt.Errorf("failed to parse pull from record: %w", err)
1512 }
1513 if err := pull.Validate(); err != nil {
1514 return fmt.Errorf("failed to validate pull: %w", err)
1515 }
1516 if pull.DependentOn != nil {
1517 if err := func() error {
1518 dependentPull, err := db.GetPull(
1519 i.Db,
1520 orm.FilterEq("dependent_on", pull.DependentOn.String()),
1521 )
1522 if errors.Is(err, sql.ErrNoRows) {
1523 return nil
1524 }
1525 if err != nil {
1526 return fmt.Errorf("failed to fetch pulls with same dependency: %w", err)
1527 }
1528 if dependentPull.AtUri() == pull.AtUri() {
1529 return nil
1530 }
1531 return fmt.Errorf("another pull already depends on %s, which would form a DAG, this is presently disallowed", pull.DependentOn.String())
1532 }(); err != nil {
1533 return fmt.Errorf("failed to validate pull stack: %w", err)
1534 }
1535 }
1536
1537 tx, err := i.Db.BeginTx(ctx, nil)
1538 if err != nil {
1539 l.Error("failed to begin transaction", "err", err)
1540 return err
1541 }
1542 defer tx.Rollback()
1543
1544 err = db.PutPull(tx, pull)
1545 if err != nil {
1546 l.Error("failed to create pull", "err", err)
1547 return err
1548 }
1549
1550 err = tx.Commit()
1551 if err != nil {
1552 l.Error("failed to commit txn", "err", err)
1553 return err
1554 }
1555
1556 l.Info("ingested record")
1557 return nil
1558
1559 case jmodels.CommitOperationDelete:
1560 tx, err := i.Db.BeginTx(ctx, nil)
1561 if err != nil {
1562 l.Error("failed to begin transaction", "err", err)
1563 return err
1564 }
1565 defer tx.Rollback()
1566
1567 if err := db.AbandonPulls(
1568 tx,
1569 orm.FilterEq("owner_did", did),
1570 orm.FilterEq("rkey", rkey),
1571 ); err != nil {
1572 l.Error("failed to abandon", "err", err)
1573 return fmt.Errorf("failed to abandon pull record: %w", err)
1574 }
1575 if err := tx.Commit(); err != nil {
1576 l.Error("failed to commit txn", "err", err)
1577 return err
1578 }
1579
1580 l.Info("ingested record")
1581 return nil
1582 }
1583
1584 return nil
1585}
1586
1587// ingestIssueComment ingests legacy sh.tangled.repo.issue.comment deletions
1588func (i *Ingester) ingestIssueComment(e *jmodels.Event, l *slog.Logger) error {
1589 l = l.With("handler", "ingestIssueComment")
1590
1591 switch e.Commit.Operation {
1592 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1593 // no-op. sh.tangled.repo.issue.comment is deprecated
1594
1595 case jmodels.CommitOperationDelete:
1596 if err := db.PurgeComments(
1597 i.Db,
1598 orm.FilterEq("did", e.Did),
1599 orm.FilterEq("collection", e.Commit.Collection),
1600 orm.FilterEq("rkey", e.Commit.RKey),
1601 ); err != nil {
1602 return fmt.Errorf("failed to delete comment record: %w", err)
1603 }
1604 }
1605
1606 l.Info("ingested record")
1607 return nil
1608}
1609
1610// ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions
1611func (i *Ingester) ingestPullComment(e *jmodels.Event, l *slog.Logger) error {
1612 l = l.With("handler", "ingestPullComment")
1613
1614 switch e.Commit.Operation {
1615 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1616 // no-op. sh.tangled.repo.pull.comment is deprecated
1617
1618 case jmodels.CommitOperationDelete:
1619 if err := db.PurgeComments(
1620 i.Db,
1621 orm.FilterEq("did", e.Did),
1622 orm.FilterEq("collection", e.Commit.Collection),
1623 orm.FilterEq("rkey", e.Commit.RKey),
1624 ); err != nil {
1625 return fmt.Errorf("failed to delete comment record: %w", err)
1626 }
1627 }
1628
1629 l.Info("ingested record")
1630 return nil
1631}
1632
1633func (i *Ingester) ingestComment(e *jmodels.Event, l *slog.Logger) error {
1634 did := e.Did
1635 rkey := e.Commit.RKey
1636 cid := e.Commit.CID
1637
1638 var err error
1639
1640 l = l.With("handler", "ingestComment")
1641
1642 ctx := context.Background()
1643
1644 switch e.Commit.Operation {
1645 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1646 raw := json.RawMessage(e.Commit.Record)
1647 record := tangled.FeedComment{}
1648 err = json.Unmarshal(raw, &record)
1649 if err != nil {
1650 return fmt.Errorf("invalid record: %w", err)
1651 }
1652
1653 comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record)
1654 if err != nil {
1655 return fmt.Errorf("failed to parse comment from record: %w", err)
1656 }
1657
1658 if err := comment.Validate(); err != nil {
1659 return fmt.Errorf("failed to validate comment: %w", err)
1660 }
1661
1662 var references []syntax.ATURI
1663 if comment.Body.Original != nil {
1664 _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original)
1665 }
1666
1667 tx, err := i.Db.Begin()
1668 if err != nil {
1669 return fmt.Errorf("failed to start transaction: %w", err)
1670 }
1671 defer tx.Rollback()
1672
1673 _, err = db.PutComment(tx, comment, references)
1674 if err != nil {
1675 return fmt.Errorf("failed to create comment: %w", err)
1676 }
1677
1678 if err := tx.Commit(); err != nil {
1679 return err
1680 }
1681
1682 case jmodels.CommitOperationDelete:
1683 if err := db.DeleteComments(
1684 i.Db,
1685 orm.FilterEq("did", did),
1686 orm.FilterEq("collection", e.Commit.Collection),
1687 orm.FilterEq("rkey", rkey),
1688 ); err != nil {
1689 return fmt.Errorf("failed to delete comment record: %w", err)
1690 }
1691 }
1692
1693 l.Info("ingested record")
1694 return nil
1695}
1696
1697func (i *Ingester) ingestReaction(e *jmodels.Event, l *slog.Logger) error {
1698 did := e.Did
1699 rkey := e.Commit.RKey
1700
1701 l = l.With("handler", "ingestReaction")
1702
1703 switch e.Commit.Operation {
1704 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1705 raw := json.RawMessage(e.Commit.Record)
1706 record := tangled.FeedReaction{}
1707 if err := json.Unmarshal(raw, &record); err != nil {
1708 return fmt.Errorf("invalid record: %w", err)
1709 }
1710
1711 subjectUri, err := syntax.ParseATURI(record.Subject)
1712 if err != nil {
1713 return fmt.Errorf("invalid reaction subject %q: %w", record.Subject, err)
1714 }
1715 subjectUri = models.NormalizeReactionSubject(subjectUri)
1716
1717 kind, ok := models.ParseReactionKind(record.Reaction)
1718 if !ok {
1719 return fmt.Errorf("invalid reaction kind: %q", record.Reaction)
1720 }
1721
1722 created, parseErr := time.Parse(time.RFC3339, record.CreatedAt)
1723 if parseErr != nil {
1724 created = time.Now()
1725 }
1726
1727 tx, err := i.Db.Begin()
1728 if err != nil {
1729 return fmt.Errorf("failed to start transaction: %w", err)
1730 }
1731 defer tx.Rollback()
1732
1733 if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil {
1734 return fmt.Errorf("failed to clear existing reaction: %w", err)
1735 }
1736 if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil {
1737 return fmt.Errorf("failed to add reaction: %w", err)
1738 }
1739
1740 if err := tx.Commit(); err != nil {
1741 return err
1742 }
1743
1744 case jmodels.CommitOperationDelete:
1745 if err := db.DeleteReactionByRkey(i.Db, did, rkey); err != nil {
1746 return fmt.Errorf("failed to delete reaction record: %w", err)
1747 }
1748 }
1749
1750 l.Info("ingested record")
1751 return nil
1752}
1753
1754func (i *Ingester) ingestLabelDefinition(e *jmodels.Event, l *slog.Logger) error {
1755 did := e.Did
1756 rkey := e.Commit.RKey
1757
1758 var err error
1759
1760 l = l.With("handler", "ingestLabelDefinition")
1761
1762 switch e.Commit.Operation {
1763 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1764 raw := json.RawMessage(e.Commit.Record)
1765 record := tangled.LabelDefinition{}
1766 err = json.Unmarshal(raw, &record)
1767 if err != nil {
1768 return fmt.Errorf("invalid record: %w", err)
1769 }
1770
1771 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1772 if err != nil {
1773 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1774 }
1775
1776 if err := def.Validate(); err != nil {
1777 return fmt.Errorf("failed to validate labeldef: %w", err)
1778 }
1779
1780 _, err = db.AddLabelDefinition(i.Db, def)
1781 if err != nil {
1782 return fmt.Errorf("failed to create labeldef: %w", err)
1783 }
1784
1785 l.Info("ingested record")
1786 return nil
1787
1788 case jmodels.CommitOperationDelete:
1789 if err := db.DeleteLabelDefinition(
1790 i.Db,
1791 orm.FilterEq("did", did),
1792 orm.FilterEq("rkey", rkey),
1793 ); err != nil {
1794 return fmt.Errorf("failed to delete labeldef record: %w", err)
1795 }
1796
1797 l.Info("ingested record")
1798 return nil
1799 }
1800
1801 return nil
1802}
1803
1804func (i *Ingester) ingestLabelOp(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
1805 did := e.Did
1806 rkey := e.Commit.RKey
1807
1808 var err error
1809
1810 l = l.With("handler", "ingestLabelOp")
1811
1812 switch e.Commit.Operation {
1813 case jmodels.CommitOperationCreate:
1814 raw := json.RawMessage(e.Commit.Record)
1815 record := tangled.LabelOp{}
1816 err = json.Unmarshal(raw, &record)
1817 if err != nil {
1818 return fmt.Errorf("invalid record: %w", err)
1819 }
1820
1821 subject := syntax.ATURI(record.Subject)
1822 collection := subject.Collection()
1823
1824 var repo *models.Repo
1825 switch collection {
1826 case tangled.RepoIssueNSID:
1827 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1828 if err != nil || len(i) != 1 {
1829 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1830 }
1831 repo = i[0].Repo
1832 case tangled.RepoPullNSID:
1833 p, err := db.GetPulls(i.Db, orm.FilterEq("at_uri", subject))
1834 if err != nil || len(p) != 1 {
1835 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(p))
1836 }
1837 repo = p[0].Repo
1838 default:
1839 return fmt.Errorf("unsupported label subject: %s", collection)
1840 }
1841
1842 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1843 if err != nil {
1844 return fmt.Errorf("failed to build label application ctx: %w", err)
1845 }
1846
1847 ops := models.LabelOpsFromRecord(did, rkey, record)
1848
1849 for _, o := range ops {
1850 def, ok := actx.Defs[o.OperandKey]
1851 if !ok {
1852 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1853 }
1854 // validate permissions: only collaborators can apply labels currently
1855 //
1856 // TODO: introduce a repo:triage permission
1857 allowed, permErr := i.Acl.HasRepoPermissionErr(ctx, repo, o.Did, "repo:push")
1858 if permErr != nil {
1859 if !errors.Is(permErr, knotacl.ErrKnotUnreachable) {
1860 return fmt.Errorf("enforcing permission: %w", permErr)
1861 }
1862 l.Warn("ingesting labelop without permission check", "did", o.Did, "err", permErr)
1863 } else if !allowed {
1864 return fmt.Errorf("unauthorized label operation")
1865 }
1866
1867 if err := def.ValidateOperandValue(&o); err != nil {
1868 return fmt.Errorf("failed to validate labelop: %w", err)
1869 }
1870 }
1871
1872 tx, err := i.Db.Begin()
1873 if err != nil {
1874 return err
1875 }
1876 defer tx.Rollback()
1877
1878 for _, o := range ops {
1879 _, err = db.AddLabelOp(tx, &o)
1880 if err != nil {
1881 return fmt.Errorf("failed to add labelop: %w", err)
1882 }
1883 }
1884
1885 if err = tx.Commit(); err != nil {
1886 return err
1887 }
1888
1889 l.Info("ingested record")
1890 }
1891
1892 return nil
1893}