Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "maps"
12 "net/http"
13 "net/url"
14 "slices"
15 "strings"
16 "sync"
17
18 "time"
19
20 "github.com/avast/retry-go/v4"
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 jmodels "github.com/bluesky-social/jetstream/pkg/models"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/ipfs/go-cid"
25 "golang.org/x/sync/errgroup"
26 "tangled.org/core/api/tangled"
27 "tangled.org/core/appview/cache"
28 "tangled.org/core/appview/config"
29 "tangled.org/core/appview/db"
30 "tangled.org/core/appview/models"
31 "tangled.org/core/appview/notify"
32 "tangled.org/core/appview/repoverify"
33 "tangled.org/core/appview/serververify"
34 "tangled.org/core/appview/validator"
35 "tangled.org/core/idresolver"
36 "tangled.org/core/orm"
37 "tangled.org/core/rbac"
38)
39
// Ingester applies jetstream events to the appview's database, ACL enforcer
// and caches. The handlers in this file read the fields as noted below.
type Ingester struct {
	Ctx context.Context
	// Db is the appview database; handlers open transactions on it via Begin.
	Db *db.DB
	// Enforcer holds the ACL policy that is kept in step with knot/spindle
	// membership rows.
	Enforcer *rbac.Enforcer
	// IdResolver resolves and invalidates identities by DID.
	IdResolver *idresolver.Resolver
	// Cache is optional: the profile handler skips cache updates when it is nil.
	Cache *cache.Cache
	// Config supplies Core.Dev to server verification.
	Config *config.Config
	// Logger is the base logger; each handler derives its own from it.
	Logger *slog.Logger
	// Validator validates string records before they are stored.
	Validator *validator.Validator
	Notifier  notify.Notifier
	Verifier  repoverify.Verifier
}
52
// processFunc handles a single jetstream event; Ingest returns one.
type processFunc func(ctx context.Context, e *jmodels.Event) error
54
55func (i *Ingester) Ingest() processFunc {
56 return func(ctx context.Context, e *jmodels.Event) error {
57 var err error
58
59 l := i.Logger.With("kind", e.Kind)
60 switch e.Kind {
61 case jmodels.EventKindAccount:
62 // TODO: sync account state to db
63 if e.Account.Active {
64 break
65 }
66 // TODO: revoke sessions by DID
67 if *e.Account.Status == "deactivated" {
68 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
69 }
70 case jmodels.EventKindIdentity:
71 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
72 case jmodels.EventKindCommit:
73 switch e.Commit.Collection {
74 case tangled.GraphFollowNSID:
75 err = i.ingestFollow(e)
76 case tangled.GraphVouchNSID:
77 err = i.ingestVouch(ctx, e)
78 case tangled.FeedStarNSID:
79 err = i.ingestStar(ctx, e)
80 case tangled.PublicKeyNSID:
81 err = i.ingestPublicKey(e)
82 case tangled.RepoArtifactNSID:
83 err = i.ingestArtifact(ctx, e)
84 case tangled.ActorProfileNSID:
85 err = i.ingestProfile(ctx, e)
86 case tangled.SpindleMemberNSID:
87 err = i.ingestSpindleMember(ctx, e)
88 case tangled.SpindleNSID:
89 err = i.ingestSpindle(ctx, e)
90 case tangled.KnotMemberNSID:
91 err = i.ingestKnotMember(ctx, e)
92 case tangled.KnotNSID:
93 err = i.ingestKnot(ctx, e)
94 case tangled.StringNSID:
95 err = i.ingestString(e)
96 case tangled.RepoIssueNSID:
97 err = i.ingestIssue(ctx, e)
98 case tangled.RepoPullNSID:
99 err = i.ingestPull(ctx, e)
100 case tangled.RepoIssueCommentNSID:
101 err = i.ingestIssueComment(e)
102 case tangled.LabelDefinitionNSID:
103 err = i.ingestLabelDefinition(e)
104 case tangled.LabelOpNSID:
105 err = i.ingestLabelOp(e)
106 case tangled.RepoNSID:
107 err = i.ingestRepo(ctx, e)
108 }
109 l = i.Logger.With("nsid", e.Commit.Collection)
110 }
111
112 if err != nil {
113 l.Warn("failed to ingest record, skipping", "err", err)
114 }
115
116 lastTimeUs := e.TimeUS + 1
117 if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
118 l.Error("failed to save cursor", "err", saveErr)
119 }
120
121 return nil
122 }
123}
124
125func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
126 if strings.HasPrefix(ref, "did:") {
127 return db.GetRepoByDid(i.Db, ref)
128 }
129 return db.GetRepoByAtUri(i.Db, ref)
130}
131
132func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
133 var legacy struct {
134 Subject *string `json:"subject"`
135 SubjectDid *string `json:"subjectDid"`
136 }
137 if err := json.Unmarshal(raw, &legacy); err != nil {
138 return false, err
139 }
140
141 switch {
142 case legacy.SubjectDid != nil:
143 repo, err := i.resolveRepoRef(*legacy.SubjectDid)
144 if err != nil {
145 l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
146 return false, nil
147 }
148 star.SubjectType = models.StarSubjectRepo
149 star.Subject = repo.RepoDid
150 return true, nil
151
152 case legacy.Subject != nil:
153 uri, err := syntax.ParseATURI(*legacy.Subject)
154 if err != nil {
155 return false, fmt.Errorf("invalid old-format star subject: %w", err)
156 }
157 switch uri.Collection().String() {
158 case tangled.RepoNSID:
159 repo, err := db.GetRepoByAtUri(i.Db, uri.String())
160 if err != nil {
161 l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
162 return false, nil
163 }
164 star.SubjectType = models.StarSubjectRepo
165 star.Subject = repo.RepoDid
166 return true, nil
167 default:
168 star.SubjectType = models.StarSubjectString
169 star.Subject = *legacy.Subject
170 return true, nil
171 }
172
173 default:
174 return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
175 }
176}
177
178func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
179 var err error
180 did := e.Did
181
182 l := i.Logger.With("handler", "ingestStar")
183 l = l.With("nsid", e.Commit.Collection)
184
185 switch e.Commit.Operation {
186 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
187 raw := json.RawMessage(e.Commit.Record)
188 record := tangled.FeedStar{}
189 unmarshalErr := json.Unmarshal(raw, &record)
190
191 star := &models.Star{
192 Did: did,
193 Rkey: e.Commit.RKey,
194 }
195
196 switch {
197 case unmarshalErr != nil:
198 resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
199 if resolveErr != nil {
200 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
201 return unmarshalErr
202 }
203 if !resolved {
204 return nil
205 }
206
207 case record.Subject == nil:
208 return fmt.Errorf("star record has nil subject")
209
210 case record.Subject.FeedStar_Repo != nil:
211 repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
212 if repoErr != nil {
213 l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
214 return nil
215 }
216 star.SubjectType = models.StarSubjectRepo
217 star.Subject = repo.RepoDid
218
219 case record.Subject.FeedStar_String != nil:
220 star.SubjectType = models.StarSubjectString
221 star.Subject = record.Subject.FeedStar_String.Uri
222
223 default:
224 return fmt.Errorf("star record has empty subject union")
225 }
226
227 err = db.AddStar(i.Db, star)
228 case jmodels.CommitOperationDelete:
229 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
230 }
231
232 if err != nil {
233 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
234 }
235
236 return nil
237}
238
239func (i *Ingester) ingestFollow(e *jmodels.Event) error {
240 var err error
241 did := e.Did
242
243 l := i.Logger.With("handler", "ingestFollow")
244 l = l.With("nsid", e.Commit.Collection)
245
246 switch e.Commit.Operation {
247 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
248 raw := json.RawMessage(e.Commit.Record)
249 record := tangled.GraphFollow{}
250 err = json.Unmarshal(raw, &record)
251 if err != nil {
252 l.Error("invalid record", "err", err)
253 return err
254 }
255
256 err = db.AddFollow(i.Db, &models.Follow{
257 UserDid: did,
258 SubjectDid: record.Subject,
259 Rkey: e.Commit.RKey,
260 })
261 case jmodels.CommitOperationDelete:
262 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
263 }
264
265 if err != nil {
266 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
267 }
268
269 return nil
270}
271
272func (i *Ingester) ingestVouch(ctx context.Context, e *jmodels.Event) error {
273 var err error
274 did := e.Did
275
276 l := i.Logger.With("handler", "ingestVouch")
277 l = l.With("nsid", e.Commit.Collection)
278 l.Info("ingesting vouch")
279
280 switch e.Commit.Operation {
281 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
282 raw := json.RawMessage(e.Commit.Record)
283 record := tangled.GraphVouch{}
284 err = json.Unmarshal(raw, &record)
285 if err != nil {
286 l.Error("invalid record", "err", err)
287 return err
288 }
289
290 // rkey is the subject_did being vouched for/denounced
291 subjectDID := e.Commit.RKey
292
293 _, err = syntax.ParseDID(subjectDID)
294 if err != nil {
295 l.Error("invalid subject_did in rkey", "err", err, "rkey", subjectDID)
296 return fmt.Errorf("invalid subject_did: %w", err)
297 }
298
299 if did == subjectDID {
300 l.Warn("attempted self-vouch", "did", did)
301 return fmt.Errorf("cannot vouch for self")
302 }
303
304 subjectId, err := i.IdResolver.ResolveIdent(ctx, subjectDID)
305 if err != nil {
306 return err
307 }
308
309 if subjectId.Handle.IsInvalidHandle() {
310 return err
311 }
312
313 kind, err := models.ParseVouchKind(record.Kind)
314 if err != nil {
315 l.Error("invalid kind", "kind", kind)
316 return fmt.Errorf("invalid kind: %s", kind)
317 }
318
319 recordCid, err := cid.Parse(e.Commit.CID)
320 if err != nil {
321 l.Error("invalid cid", "err", err, "cid", e.Commit.CID)
322 return fmt.Errorf("invalid cid: %w", err)
323 }
324
325 var evidences []syntax.ATURI
326 for _, raw := range record.Evidences {
327 uri, parseErr := syntax.ParseATURI(raw)
328 if parseErr != nil {
329 l.Warn("invalid evidence AT-URI, skipping", "uri", raw, "err", parseErr)
330 continue
331 }
332 evidences = append(evidences, uri)
333 }
334
335 tx, txErr := i.Db.Begin()
336 if txErr != nil {
337 return fmt.Errorf("failed to start transaction: %w", txErr)
338 }
339
340 addErr := db.AddVouch(tx, &models.Vouch{
341 Did: syntax.DID(did),
342 SubjectDid: subjectId.DID,
343 Cid: recordCid,
344 Kind: kind,
345 Reason: record.Reason,
346 Evidences: evidences,
347 })
348 if addErr != nil {
349 tx.Rollback()
350 err = addErr
351 } else {
352 err = tx.Commit()
353 }
354
355 case jmodels.CommitOperationDelete:
356 err = db.DeleteVouchByRkey(i.Db, did, e.Commit.RKey)
357 }
358
359 if err != nil {
360 return fmt.Errorf("failed to %s vouch record: %w", e.Commit.Operation, err)
361 }
362
363 return nil
364}
365
366func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
367 did := e.Did
368 var err error
369
370 l := i.Logger.With("handler", "ingestPublicKey")
371 l = l.With("nsid", e.Commit.Collection)
372
373 switch e.Commit.Operation {
374 case jmodels.CommitOperationCreate:
375 l.Debug("processing add of pubkey")
376 raw := json.RawMessage(e.Commit.Record)
377 record := tangled.PublicKey{}
378 err = json.Unmarshal(raw, &record)
379 if err != nil {
380 l.Error("invalid record", "err", err)
381 return err
382 }
383
384 name := record.Name
385 key := record.Key
386 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
387 case jmodels.CommitOperationUpdate:
388 l.Debug("processing update of pubkey")
389 raw := json.RawMessage(e.Commit.Record)
390 record := tangled.PublicKey{}
391 err = json.Unmarshal(raw, &record)
392 if err != nil {
393 l.Error("invalid record", "err", err)
394 return err
395 }
396
397 name := record.Name
398 key := record.Key
399 err = db.UpdatePublicKey(i.Db, did, name, key, e.Commit.RKey)
400 case jmodels.CommitOperationDelete:
401 l.Debug("processing delete of pubkey")
402 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
403 }
404
405 if err != nil {
406 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
407 }
408
409 return nil
410}
411
412func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
413 did := e.Did
414 var err error
415
416 l := i.Logger.With("handler", "ingestArtifact")
417 l = l.With("nsid", e.Commit.Collection)
418
419 switch e.Commit.Operation {
420 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
421 raw := json.RawMessage(e.Commit.Record)
422 record := tangled.RepoArtifact{}
423 err = json.Unmarshal(raw, &record)
424 if err != nil {
425 l.Error("invalid record", "err", err)
426 return err
427 }
428
429 var repo *models.Repo
430 if record.RepoDid != nil && *record.RepoDid != "" {
431 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid)
432 if err != nil && !errors.Is(err, sql.ErrNoRows) {
433 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err)
434 }
435 }
436 if repo == nil && record.Repo != nil {
437 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
438 if parseErr != nil {
439 return parseErr
440 }
441 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String())
442 if err != nil {
443 return err
444 }
445 }
446 if repo == nil {
447 return fmt.Errorf("artifact record has neither valid repoDid nor repo field")
448 }
449
450 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push")
451 if err != nil || !ok {
452 return err
453 }
454
455 repoDid := repo.RepoDid
456 if repoDid == "" && record.RepoDid != nil {
457 repoDid = *record.RepoDid
458 }
459 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
460 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
461 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
462 }
463 }
464
465 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
466 if err != nil {
467 createdAt = time.Now()
468 }
469
470 artifact := models.Artifact{
471 Did: did,
472 Rkey: e.Commit.RKey,
473 RepoDid: syntax.DID(repo.RepoDid),
474 Tag: plumbing.Hash(record.Tag),
475 CreatedAt: createdAt,
476 BlobCid: cid.Cid(record.Artifact.Ref),
477 Name: record.Name,
478 Size: uint64(record.Artifact.Size),
479 MimeType: record.Artifact.MimeType,
480 }
481
482 err = db.AddArtifact(i.Db, artifact)
483 case jmodels.CommitOperationDelete:
484 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
485 }
486
487 if err != nil {
488 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
489 }
490
491 return nil
492}
493
494func (i *Ingester) ingestProfile(ctx context.Context, e *jmodels.Event) error {
495 did := e.Did
496 var err error
497
498 l := i.Logger.With("handler", "ingestProfile")
499 l = l.With("nsid", e.Commit.Collection)
500
501 if e.Commit.RKey != "self" {
502 return fmt.Errorf("ingestProfile only ingests `self` record")
503 }
504
505 switch e.Commit.Operation {
506 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
507 raw := json.RawMessage(e.Commit.Record)
508 record := tangled.ActorProfile{}
509 err = json.Unmarshal(raw, &record)
510 if err != nil {
511 l.Error("invalid record", "err", err)
512 return err
513 }
514
515 avatar := ""
516 if record.Avatar != nil {
517 avatar = record.Avatar.Ref.String()
518 }
519
520 description := ""
521 if record.Description != nil {
522 description = *record.Description
523 }
524
525 includeBluesky := record.Bluesky
526
527 pronouns := ""
528 if record.Pronouns != nil {
529 pronouns = *record.Pronouns
530 }
531
532 location := ""
533 if record.Location != nil {
534 location = *record.Location
535 }
536
537 var links [5]string
538 for i, l := range record.Links {
539 if i < 5 {
540 links[i] = l
541 }
542 }
543
544 var stats [2]models.VanityStat
545 for i, s := range record.Stats {
546 if i < 2 {
547 stats[i].Kind = models.ParseVanityStatKind(s)
548 }
549 }
550
551 var pinned [6]string
552 for i, r := range record.PinnedRepositories {
553 if i < 6 {
554 pinned[i] = r
555 }
556 }
557
558 var preferredHandle syntax.Handle
559 if record.PreferredHandle != nil {
560 if h, err := syntax.ParseHandle(*record.PreferredHandle); err == nil {
561 ident, identErr := i.IdResolver.ResolveIdent(ctx, did)
562 if identErr == nil && slices.Contains(ident.AlsoKnownAs, "at://"+string(h)) {
563 preferredHandle = h
564 }
565 }
566 }
567
568 profile := models.Profile{
569 Did: did,
570 Avatar: avatar,
571 Description: description,
572 IncludeBluesky: includeBluesky,
573 Location: location,
574 Links: links,
575 Stats: stats,
576 PinnedRepos: pinned,
577 Pronouns: pronouns,
578 PreferredHandle: preferredHandle,
579 }
580
581 tx, err := i.Db.Begin()
582 if err != nil {
583 return fmt.Errorf("failed to start transaction: %w", err)
584 }
585
586 err = db.ValidateProfile(tx, &profile)
587 if err != nil {
588 return fmt.Errorf("invalid profile record")
589 }
590
591 err = db.UpsertProfile(tx, &profile)
592 if err == nil && i.Cache != nil {
593 pipe := i.Cache.Pipeline()
594 didKey := fmt.Sprintf(cache.PreferredHandleByDid, did)
595 if preferredHandle != "" {
596 pipe.Set(ctx, didKey, string(preferredHandle), cache.PreferredHandleTTL)
597 pipe.Set(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(preferredHandle)), did, cache.PreferredHandleTTL)
598 } else {
599 pipe.Del(ctx, didKey)
600 }
601 if _, execErr := pipe.Exec(ctx); execErr != nil {
602 l.Warn("failed to update preferred handle cache", "err", execErr)
603 }
604 }
605 case jmodels.CommitOperationDelete:
606 tx, beginErr := i.Db.Begin()
607 if beginErr != nil {
608 return fmt.Errorf("failed to start transaction: %w", beginErr)
609 }
610
611 priorHandle, phErr := db.GetPreferredHandle(tx, did)
612 if phErr != nil && !errors.Is(phErr, sql.ErrNoRows) {
613 l.Warn("failed to read prior preferred handle", "err", phErr)
614 }
615
616 err = db.DeleteProfile(tx, did)
617 if err == nil && i.Cache != nil {
618 pipe := i.Cache.Pipeline()
619 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByDid, did))
620 if priorHandle != "" {
621 pipe.Del(ctx, fmt.Sprintf(cache.PreferredHandleByHandle, string(priorHandle)))
622 }
623 if _, execErr := pipe.Exec(ctx); execErr != nil {
624 l.Warn("failed to evict preferred handle cache", "err", execErr)
625 }
626 }
627 }
628
629 if err != nil {
630 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
631 }
632
633 return nil
634}
635
636func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
637 did := e.Did
638 var err error
639
640 l := i.Logger.With("handler", "ingestSpindleMember")
641 l = l.With("nsid", e.Commit.Collection)
642
643 switch e.Commit.Operation {
644 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
645 raw := json.RawMessage(e.Commit.Record)
646 record := tangled.SpindleMember{}
647 err = json.Unmarshal(raw, &record)
648 if err != nil {
649 l.Error("invalid record", "err", err)
650 return err
651 }
652
653 // only spindle owner can invite to spindles
654 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
655 if err != nil {
656 return fmt.Errorf("failed to check invite permission: %w", err)
657 }
658 if !ok {
659 if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil {
660 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
661 }
662 ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
663 if err != nil {
664 return fmt.Errorf("failed to re-check invite permission: %w", err)
665 }
666 if !ok {
667 return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance)
668 }
669 }
670
671 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
672 if err != nil {
673 return err
674 }
675
676 if memberId.Handle.IsInvalidHandle() {
677 return fmt.Errorf("invalid handle for member %s", record.Subject)
678 }
679
680 existing, err := db.GetSpindleMembers(i.Db,
681 orm.FilterEq("did", did),
682 orm.FilterEq("rkey", e.Commit.RKey),
683 )
684 if err != nil {
685 return fmt.Errorf("failed to look up existing member: %w", err)
686 }
687 if len(existing) > 1 {
688 return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey)
689 }
690
691 tx, err := i.Db.Begin()
692 if err != nil {
693 return fmt.Errorf("failed to start txn: %w", err)
694 }
695 committed := false
696 defer func() {
697 if committed {
698 return
699 }
700 tx.Rollback()
701 i.Enforcer.E.LoadPolicy()
702 }()
703
704 if len(existing) == 1 {
705 prev := existing[0]
706 if prev.Instance != record.Instance || prev.Subject != memberId.DID {
707 if err = db.RemoveSpindleMember(tx,
708 orm.FilterEq("did", did),
709 orm.FilterEq("rkey", e.Commit.RKey),
710 ); err != nil {
711 return fmt.Errorf("failed to remove stale row: %w", err)
712 }
713 if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil {
714 return fmt.Errorf("failed to remove stale ACL: %w", err)
715 }
716 }
717 }
718
719 if err = db.AddSpindleMember(tx, models.SpindleMember{
720 Did: syntax.DID(did),
721 Rkey: e.Commit.RKey,
722 Instance: record.Instance,
723 Subject: memberId.DID,
724 }); err != nil {
725 return fmt.Errorf("failed to add to db: %w", err)
726 }
727
728 if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil {
729 return fmt.Errorf("failed to update ACLs: %w", err)
730 }
731
732 if err = tx.Commit(); err != nil {
733 return fmt.Errorf("failed to commit txn: %w", err)
734 }
735
736 if err = i.Enforcer.E.SavePolicy(); err != nil {
737 return fmt.Errorf("failed to save ACLs: %w", err)
738 }
739 committed = true
740
741 l.Info("upserted spindle member")
742 case jmodels.CommitOperationDelete:
743 rkey := e.Commit.RKey
744
745 // get record from db first
746 members, err := db.GetSpindleMembers(
747 i.Db,
748 orm.FilterEq("did", did),
749 orm.FilterEq("rkey", rkey),
750 )
751 if err != nil || len(members) != 1 {
752 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
753 }
754 member := members[0]
755
756 tx, err := i.Db.Begin()
757 if err != nil {
758 return fmt.Errorf("failed to start txn: %w", err)
759 }
760 committed := false
761 defer func() {
762 if committed {
763 return
764 }
765 tx.Rollback()
766 i.Enforcer.E.LoadPolicy()
767 }()
768
769 // remove record by rkey && update enforcer
770 if err = db.RemoveSpindleMember(
771 tx,
772 orm.FilterEq("did", did),
773 orm.FilterEq("rkey", rkey),
774 ); err != nil {
775 return fmt.Errorf("failed to remove from db: %w", err)
776 }
777
778 // update enforcer
779 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
780 if err != nil {
781 return fmt.Errorf("failed to update ACLs: %w", err)
782 }
783
784 if err = tx.Commit(); err != nil {
785 return fmt.Errorf("failed to commit txn: %w", err)
786 }
787
788 if err = i.Enforcer.E.SavePolicy(); err != nil {
789 return fmt.Errorf("failed to save ACLs: %w", err)
790 }
791 committed = true
792
793 l.Info("removed spindle member")
794 }
795
796 return nil
797}
798
799func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
800 did := e.Did
801 var err error
802
803 l := i.Logger.With("handler", "ingestSpindle")
804 l = l.With("nsid", e.Commit.Collection)
805
806 switch e.Commit.Operation {
807 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
808 raw := json.RawMessage(e.Commit.Record)
809 record := tangled.Spindle{}
810 err = json.Unmarshal(raw, &record)
811 if err != nil {
812 l.Error("invalid record", "err", err)
813 return err
814 }
815
816 instance := e.Commit.RKey
817
818 err := db.AddSpindle(i.Db, models.Spindle{
819 Owner: syntax.DID(did),
820 Instance: instance,
821 })
822 if err != nil {
823 l.Error("failed to add spindle to db", "err", err, "instance", instance)
824 return err
825 }
826
827 if err := i.verifySpindle(ctx, instance, did); err != nil {
828 l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err)
829 }
830
831 return nil
832
833 case jmodels.CommitOperationDelete:
834 instance := e.Commit.RKey
835
836 // get record from db first
837 spindles, err := db.GetSpindles(
838 ctx,
839 i.Db,
840 orm.FilterEq("owner", did),
841 orm.FilterEq("instance", instance),
842 )
843 if err != nil || len(spindles) != 1 {
844 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
845 }
846 spindle := spindles[0]
847
848 tx, err := i.Db.Begin()
849 if err != nil {
850 return err
851 }
852 defer func() {
853 tx.Rollback()
854 i.Enforcer.E.LoadPolicy()
855 }()
856
857 // remove spindle members first
858 err = db.RemoveSpindleMember(
859 tx,
860 orm.FilterEq("owner", did),
861 orm.FilterEq("instance", instance),
862 )
863 if err != nil {
864 return err
865 }
866
867 err = db.DeleteSpindle(
868 tx,
869 orm.FilterEq("owner", did),
870 orm.FilterEq("instance", instance),
871 )
872 if err != nil {
873 return err
874 }
875
876 if spindle.Verified != nil {
877 err = i.Enforcer.RemoveSpindle(instance)
878 if err != nil {
879 return err
880 }
881 }
882
883 err = tx.Commit()
884 if err != nil {
885 return err
886 }
887
888 err = i.Enforcer.E.SavePolicy()
889 if err != nil {
890 return err
891 }
892 }
893
894 return nil
895}
896
897func (i *Ingester) ingestString(e *jmodels.Event) error {
898 did := e.Did
899 rkey := e.Commit.RKey
900
901 var err error
902
903 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
904 l.Info("ingesting record")
905
906 switch e.Commit.Operation {
907 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
908 raw := json.RawMessage(e.Commit.Record)
909 record := tangled.String{}
910 err = json.Unmarshal(raw, &record)
911 if err != nil {
912 l.Error("invalid record", "err", err)
913 return err
914 }
915
916 string := models.StringFromRecord(did, rkey, record)
917
918 if err = i.Validator.ValidateString(&string); err != nil {
919 l.Error("invalid record", "err", err)
920 return err
921 }
922
923 if err = db.AddString(i.Db, string); err != nil {
924 l.Error("failed to add string", "err", err)
925 return err
926 }
927
928 return nil
929
930 case jmodels.CommitOperationDelete:
931 if err := db.DeleteString(
932 i.Db,
933 orm.FilterEq("did", did),
934 orm.FilterEq("rkey", rkey),
935 ); err != nil {
936 l.Error("failed to delete", "err", err)
937 return fmt.Errorf("failed to delete string record: %w", err)
938 }
939
940 return nil
941 }
942
943 return nil
944}
945
946func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error {
947 did := e.Did
948 var err error
949
950 l := i.Logger.With("handler", "ingestKnotMember")
951 l = l.With("nsid", e.Commit.Collection)
952
953 switch e.Commit.Operation {
954 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
955 raw := json.RawMessage(e.Commit.Record)
956 record := tangled.KnotMember{}
957 err = json.Unmarshal(raw, &record)
958 if err != nil {
959 l.Error("invalid record", "err", err)
960 return err
961 }
962
963 // only knot owner can invite to knots
964 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
965 if err != nil {
966 return fmt.Errorf("failed to check invite permission: %w", err)
967 }
968 if !ok {
969 if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil {
970 return fmt.Errorf("invite denied and verify failed: %w", verifyErr)
971 }
972 ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
973 if err != nil {
974 return fmt.Errorf("failed to re-check invite permission: %w", err)
975 }
976 if !ok {
977 return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain)
978 }
979 }
980
981 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
982 if err != nil {
983 return err
984 }
985
986 if memberId.Handle.IsInvalidHandle() {
987 return fmt.Errorf("invalid handle for member %s", record.Subject)
988 }
989
990 existing, err := db.GetKnotMembers(i.Db,
991 orm.FilterEq("did", did),
992 orm.FilterEq("rkey", e.Commit.RKey),
993 )
994 if err != nil {
995 return fmt.Errorf("failed to look up existing member: %w", err)
996 }
997 if len(existing) > 1 {
998 return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey)
999 }
1000
1001 tx, err := i.Db.Begin()
1002 if err != nil {
1003 return fmt.Errorf("failed to start txn: %w", err)
1004 }
1005 committed := false
1006 defer func() {
1007 if committed {
1008 return
1009 }
1010 tx.Rollback()
1011 i.Enforcer.E.LoadPolicy()
1012 }()
1013
1014 if len(existing) == 1 {
1015 prev := existing[0]
1016 if prev.Domain != record.Domain || prev.Subject != memberId.DID {
1017 if err = db.RemoveKnotMember(tx,
1018 orm.FilterEq("did", did),
1019 orm.FilterEq("rkey", e.Commit.RKey),
1020 ); err != nil {
1021 return fmt.Errorf("failed to remove stale row: %w", err)
1022 }
1023 if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil {
1024 return fmt.Errorf("failed to remove stale ACL: %w", err)
1025 }
1026 }
1027 }
1028
1029 if err = db.AddKnotMember(tx, models.KnotMember{
1030 Did: syntax.DID(did),
1031 Rkey: e.Commit.RKey,
1032 Domain: record.Domain,
1033 Subject: memberId.DID,
1034 }); err != nil {
1035 return fmt.Errorf("failed to add to db: %w", err)
1036 }
1037
1038 if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil {
1039 return fmt.Errorf("failed to update ACLs: %w", err)
1040 }
1041
1042 if err = tx.Commit(); err != nil {
1043 return fmt.Errorf("failed to commit txn: %w", err)
1044 }
1045
1046 if err = i.Enforcer.E.SavePolicy(); err != nil {
1047 return fmt.Errorf("failed to save ACLs: %w", err)
1048 }
1049 committed = true
1050
1051 l.Info("upserted knot member")
1052 case jmodels.CommitOperationDelete:
1053 rkey := e.Commit.RKey
1054
1055 members, err := db.GetKnotMembers(
1056 i.Db,
1057 orm.FilterEq("did", did),
1058 orm.FilterEq("rkey", rkey),
1059 )
1060 if err != nil {
1061 return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err)
1062 }
1063 if len(members) == 0 {
1064 l.Info("knot member already removed", "rkey", rkey)
1065 return nil
1066 }
1067 if len(members) > 1 {
1068 return fmt.Errorf("multiple knot members with rkey %s", rkey)
1069 }
1070 member := members[0]
1071
1072 tx, err := i.Db.Begin()
1073 if err != nil {
1074 return fmt.Errorf("failed to start txn: %w", err)
1075 }
1076 committed := false
1077 defer func() {
1078 if committed {
1079 return
1080 }
1081 tx.Rollback()
1082 i.Enforcer.E.LoadPolicy()
1083 }()
1084
1085 if err = db.RemoveKnotMember(
1086 tx,
1087 orm.FilterEq("did", did),
1088 orm.FilterEq("rkey", rkey),
1089 ); err != nil {
1090 return fmt.Errorf("failed to remove from db: %w", err)
1091 }
1092
1093 if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil {
1094 return fmt.Errorf("failed to update ACLs: %w", err)
1095 }
1096
1097 if err = tx.Commit(); err != nil {
1098 return fmt.Errorf("failed to commit txn: %w", err)
1099 }
1100
1101 if err = i.Enforcer.E.SavePolicy(); err != nil {
1102 return fmt.Errorf("failed to save ACLs: %w", err)
1103 }
1104 committed = true
1105
1106 l.Info("removed knot member")
1107 }
1108
1109 return nil
1110}
1111
1112func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error {
1113 did := e.Did
1114 var err error
1115
1116 l := i.Logger.With("handler", "ingestKnot")
1117 l = l.With("nsid", e.Commit.Collection)
1118
1119 switch e.Commit.Operation {
1120 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1121 raw := json.RawMessage(e.Commit.Record)
1122 record := tangled.Knot{}
1123 err = json.Unmarshal(raw, &record)
1124 if err != nil {
1125 l.Error("invalid record", "err", err)
1126 return err
1127 }
1128
1129 domain := e.Commit.RKey
1130
1131 err := db.AddKnot(i.Db, domain, did)
1132 if err != nil {
1133 l.Error("failed to add knot to db", "err", err, "domain", domain)
1134 return err
1135 }
1136
1137 if err := i.verifyKnot(ctx, domain, did); err != nil {
1138 l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err)
1139 }
1140
1141 return nil
1142
1143 case jmodels.CommitOperationDelete:
1144 domain := e.Commit.RKey
1145
1146 // get record from db first
1147 registrations, err := db.GetRegistrations(
1148 i.Db,
1149 orm.FilterEq("domain", domain),
1150 orm.FilterEq("did", did),
1151 )
1152 if err != nil {
1153 return fmt.Errorf("failed to get registration: %w", err)
1154 }
1155 if len(registrations) != 1 {
1156 return fmt.Errorf("got incorrect number of registrations: %d, expected 1", len(registrations))
1157 }
1158 registration := registrations[0]
1159
1160 tx, err := i.Db.Begin()
1161 if err != nil {
1162 return err
1163 }
1164 defer func() {
1165 tx.Rollback()
1166 i.Enforcer.E.LoadPolicy()
1167 }()
1168
1169 err = db.RemoveKnotMember(
1170 tx,
1171 orm.FilterEq("did", did),
1172 orm.FilterEq("domain", domain),
1173 )
1174 if err != nil {
1175 return err
1176 }
1177
1178 err = db.DeleteKnot(
1179 tx,
1180 orm.FilterEq("did", did),
1181 orm.FilterEq("domain", domain),
1182 )
1183 if err != nil {
1184 return err
1185 }
1186
1187 if registration.Registered != nil {
1188 err = i.Enforcer.RemoveKnot(domain)
1189 if err != nil {
1190 return err
1191 }
1192 }
1193
1194 err = tx.Commit()
1195 if err != nil {
1196 return err
1197 }
1198
1199 err = i.Enforcer.E.SavePolicy()
1200 if err != nil {
1201 return err
1202 }
1203 }
1204
1205 return nil
1206}
1207
// Retry settings used by verifyKnot and verifySpindle when calling
// serververify.RunVerification.
const (
	// verifyAttempts is the value handed to retry.Attempts.
	verifyAttempts = 4
	// verifyMinDelay is the value handed to retry.Delay.
	verifyMinDelay = time.Second
	// verifyMaxDelay is the value handed to retry.MaxDelay.
	verifyMaxDelay = 5 * time.Second
)
1213
1214func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error {
1215 regs, err := db.GetRegistrations(i.Db,
1216 orm.FilterEq("domain", domain),
1217 orm.FilterEq("did", did),
1218 )
1219 if err != nil {
1220 return fmt.Errorf("look up registration: %w", err)
1221 }
1222 if len(regs) != 1 {
1223 return fmt.Errorf("no registration for %s by %s", domain, did)
1224 }
1225 if regs[0].Registered != nil {
1226 return nil
1227 }
1228
1229 err = retry.Do(
1230 func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) },
1231 retry.Context(ctx),
1232 retry.Attempts(verifyAttempts),
1233 retry.Delay(verifyMinDelay),
1234 retry.MaxDelay(verifyMaxDelay),
1235 retry.DelayType(retry.BackOffDelay),
1236 retry.LastErrorOnly(true),
1237 )
1238 if err != nil {
1239 return fmt.Errorf("verify: %w", err)
1240 }
1241 return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
1242}
1243
1244func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error {
1245 spindles, err := db.GetSpindles(ctx, i.Db,
1246 orm.FilterEq("instance", instance),
1247 orm.FilterEq("owner", did),
1248 )
1249 if err != nil {
1250 return fmt.Errorf("look up spindle: %w", err)
1251 }
1252 if len(spindles) != 1 {
1253 return fmt.Errorf("no spindle for %s by %s", instance, did)
1254 }
1255 if spindles[0].Verified != nil {
1256 return nil
1257 }
1258
1259 err = retry.Do(
1260 func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) },
1261 retry.Context(ctx),
1262 retry.Attempts(verifyAttempts),
1263 retry.Delay(verifyMinDelay),
1264 retry.MaxDelay(verifyMaxDelay),
1265 retry.DelayType(retry.BackOffDelay),
1266 retry.LastErrorOnly(true),
1267 )
1268 if err != nil {
1269 return fmt.Errorf("verify: %w", err)
1270 }
1271 _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
1272 return err
1273}
1274
// sweepConcurrency is the limit SweepPendingVerifications sets on its
// errgroup, i.e. how many verifications it runs at the same time.
const (
	sweepConcurrency = 4
)
1276
1277func (i *Ingester) SweepPendingVerifications() {
1278 l := i.Logger.With("handler", "SweepPendingVerifications")
1279
1280 var g errgroup.Group
1281 g.SetLimit(sweepConcurrency)
1282
1283 regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil))
1284 if err != nil {
1285 l.Error("failed to list unverified knots", "err", err)
1286 } else {
1287 for _, reg := range regs {
1288 g.Go(func() error {
1289 if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil {
1290 l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err)
1291 }
1292 return nil
1293 })
1294 }
1295 }
1296
1297 spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil))
1298 if err != nil {
1299 l.Error("failed to list unverified spindles", "err", err)
1300 g.Wait()
1301 return
1302 }
1303 for _, s := range spindles {
1304 g.Go(func() error {
1305 if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil {
1306 l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err)
1307 }
1308 return nil
1309 })
1310 }
1311 g.Wait()
1312}
1313
1314func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
1315 did := e.Did
1316 rkey := e.Commit.RKey
1317
1318 var err error
1319
1320 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1321 l.Info("ingesting record")
1322
1323 switch e.Commit.Operation {
1324 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1325 raw := json.RawMessage(e.Commit.Record)
1326 record := tangled.RepoIssue{}
1327 err = json.Unmarshal(raw, &record)
1328 if err != nil {
1329 l.Error("invalid record", "err", err)
1330 return err
1331 }
1332
1333 issue := models.IssueFromRecord(did, rkey, record)
1334
1335 if issue.RepoDid == "" {
1336 return fmt.Errorf("issue record has no repo field")
1337 }
1338 if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1339 return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1340 }
1341
1342 if err := i.Validator.ValidateIssue(&issue); err != nil {
1343 return fmt.Errorf("failed to validate issue: %w", err)
1344 }
1345
1346 if record.Repo != "" && !strings.HasPrefix(record.Repo, "did:") {
1347 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo)
1348 if repoErr == nil && repo.RepoDid != "" {
1349 if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1350 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1351 }
1352 }
1353 }
1354
1355 tx, err := i.Db.BeginTx(ctx, nil)
1356 if err != nil {
1357 l.Error("failed to begin transaction", "err", err)
1358 return err
1359 }
1360 defer tx.Rollback()
1361
1362 err = db.PutIssue(tx, &issue)
1363 if err != nil {
1364 l.Error("failed to create issue", "err", err)
1365 return err
1366 }
1367
1368 err = tx.Commit()
1369 if err != nil {
1370 l.Error("failed to commit txn", "err", err)
1371 return err
1372 }
1373
1374 return nil
1375
1376 case jmodels.CommitOperationDelete:
1377 tx, err := i.Db.BeginTx(ctx, nil)
1378 if err != nil {
1379 l.Error("failed to begin transaction", "err", err)
1380 return err
1381 }
1382 defer tx.Rollback()
1383
1384 if err := db.DeleteIssues(
1385 tx,
1386 did,
1387 rkey,
1388 ); err != nil {
1389 l.Error("failed to delete", "err", err)
1390 return fmt.Errorf("failed to delete issue record: %w", err)
1391 }
1392 if err := tx.Commit(); err != nil {
1393 l.Error("failed to commit txn", "err", err)
1394 return err
1395 }
1396
1397 return nil
1398 }
1399
1400 return nil
1401}
1402
1403func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
1404 did := e.Did
1405 rkey := e.Commit.RKey
1406
1407 var err error
1408
1409 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1410 l.Info("ingesting record")
1411
1412 switch e.Commit.Operation {
1413 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1414 raw := json.RawMessage(e.Commit.Record)
1415 record := tangled.RepoPull{}
1416 err = json.Unmarshal(raw, &record)
1417 if err != nil {
1418 l.Error("invalid record", "err", err)
1419 return err
1420 }
1421
1422 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
1423 if err != nil {
1424 l.Error("failed to resolve did")
1425 return err
1426 }
1427
1428 // go through and fetch all blobs in parallel
1429 readers := make([]*io.ReadCloser, len(record.Rounds))
1430 var mu sync.Mutex
1431
1432 g, gctx := errgroup.WithContext(ctx)
1433
1434 for idx, b := range record.Rounds {
1435 g.Go(func() error {
1436 // for some reason, a blob is empty
1437 if b.PatchBlob == nil {
1438 return fmt.Errorf("missing patchBlob in round %d", idx)
1439 }
1440
1441 ownerPds := ownerId.PDSEndpoint()
1442 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
1443 q := url.Query()
1444 q.Set("cid", b.PatchBlob.Ref.String())
1445 q.Set("did", did)
1446 url.RawQuery = q.Encode()
1447
1448 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
1449 if err != nil {
1450 l.Error("failed to create request")
1451 return err
1452 }
1453 req.Header.Set("Content-Type", "application/json")
1454
1455 resp, err := http.DefaultClient.Do(req)
1456 if err != nil {
1457 l.Error("failed to make request")
1458 return err
1459 }
1460
1461 mu.Lock()
1462 readers[idx] = &resp.Body
1463 mu.Unlock()
1464
1465 return nil
1466 })
1467 }
1468
1469 if err := g.Wait(); err != nil {
1470 for _, r := range readers {
1471 if r != nil && *r != nil {
1472 (*r).Close()
1473 }
1474 }
1475 return err
1476 }
1477
1478 defer func() {
1479 for _, r := range readers {
1480 if r != nil && *r != nil {
1481 (*r).Close()
1482 }
1483 }
1484 }()
1485
1486 pull, err := models.PullFromRecord(did, rkey, record, readers)
1487 if err != nil {
1488 return fmt.Errorf("failed to parse pull from record: %w", err)
1489 }
1490 if err := i.Validator.ValidatePull(pull); err != nil {
1491 return fmt.Errorf("failed to validate pull: %w", err)
1492 }
1493
1494 tx, err := i.Db.BeginTx(ctx, nil)
1495 if err != nil {
1496 l.Error("failed to begin transaction", "err", err)
1497 return err
1498 }
1499 defer tx.Rollback()
1500
1501 err = db.PutPull(tx, pull)
1502 if err != nil {
1503 l.Error("failed to create pull", "err", err)
1504 return err
1505 }
1506
1507 err = tx.Commit()
1508 if err != nil {
1509 l.Error("failed to commit txn", "err", err)
1510 return err
1511 }
1512
1513 return nil
1514
1515 case jmodels.CommitOperationDelete:
1516 tx, err := i.Db.BeginTx(ctx, nil)
1517 if err != nil {
1518 l.Error("failed to begin transaction", "err", err)
1519 return err
1520 }
1521 defer tx.Rollback()
1522
1523 if err := db.AbandonPulls(
1524 tx,
1525 orm.FilterEq("owner_did", did),
1526 orm.FilterEq("rkey", rkey),
1527 ); err != nil {
1528 l.Error("failed to abandon", "err", err)
1529 return fmt.Errorf("failed to abandon pull record: %w", err)
1530 }
1531 if err := tx.Commit(); err != nil {
1532 l.Error("failed to commit txn", "err", err)
1533 return err
1534 }
1535
1536 return nil
1537 }
1538
1539 return nil
1540}
1541
1542func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1543 did := e.Did
1544 rkey := e.Commit.RKey
1545
1546 var err error
1547
1548 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1549 l.Info("ingesting record")
1550
1551 switch e.Commit.Operation {
1552 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1553 raw := json.RawMessage(e.Commit.Record)
1554 record := tangled.RepoIssueComment{}
1555 err = json.Unmarshal(raw, &record)
1556 if err != nil {
1557 return fmt.Errorf("invalid record: %w", err)
1558 }
1559
1560 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1561 if err != nil {
1562 return fmt.Errorf("failed to parse comment from record: %w", err)
1563 }
1564
1565 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1566 return fmt.Errorf("failed to validate comment: %w", err)
1567 }
1568
1569 tx, err := i.Db.Begin()
1570 if err != nil {
1571 return fmt.Errorf("failed to start transaction: %w", err)
1572 }
1573 defer tx.Rollback()
1574
1575 _, err = db.AddIssueComment(tx, *comment)
1576 if err != nil {
1577 return fmt.Errorf("failed to create issue comment: %w", err)
1578 }
1579
1580 return tx.Commit()
1581
1582 case jmodels.CommitOperationDelete:
1583 if err := db.DeleteIssueComments(
1584 i.Db,
1585 orm.FilterEq("did", did),
1586 orm.FilterEq("rkey", rkey),
1587 ); err != nil {
1588 return fmt.Errorf("failed to delete issue comment record: %w", err)
1589 }
1590
1591 return nil
1592 }
1593
1594 return nil
1595}
1596
1597func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1598 did := e.Did
1599 rkey := e.Commit.RKey
1600
1601 var err error
1602
1603 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1604 l.Info("ingesting record")
1605
1606 switch e.Commit.Operation {
1607 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1608 raw := json.RawMessage(e.Commit.Record)
1609 record := tangled.LabelDefinition{}
1610 err = json.Unmarshal(raw, &record)
1611 if err != nil {
1612 return fmt.Errorf("invalid record: %w", err)
1613 }
1614
1615 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1616 if err != nil {
1617 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1618 }
1619
1620 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1621 return fmt.Errorf("failed to validate labeldef: %w", err)
1622 }
1623
1624 _, err = db.AddLabelDefinition(i.Db, def)
1625 if err != nil {
1626 return fmt.Errorf("failed to create labeldef: %w", err)
1627 }
1628
1629 return nil
1630
1631 case jmodels.CommitOperationDelete:
1632 if err := db.DeleteLabelDefinition(
1633 i.Db,
1634 orm.FilterEq("did", did),
1635 orm.FilterEq("rkey", rkey),
1636 ); err != nil {
1637 return fmt.Errorf("failed to delete labeldef record: %w", err)
1638 }
1639
1640 return nil
1641 }
1642
1643 return nil
1644}
1645
1646func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1647 did := e.Did
1648 rkey := e.Commit.RKey
1649
1650 var err error
1651
1652 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1653 l.Info("ingesting record")
1654
1655 switch e.Commit.Operation {
1656 case jmodels.CommitOperationCreate:
1657 raw := json.RawMessage(e.Commit.Record)
1658 record := tangled.LabelOp{}
1659 err = json.Unmarshal(raw, &record)
1660 if err != nil {
1661 return fmt.Errorf("invalid record: %w", err)
1662 }
1663
1664 subject := syntax.ATURI(record.Subject)
1665 collection := subject.Collection()
1666
1667 var repo *models.Repo
1668 switch collection {
1669 case tangled.RepoIssueNSID:
1670 i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1671 if err != nil || len(i) != 1 {
1672 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1673 }
1674 repo = i[0].Repo
1675 default:
1676 return fmt.Errorf("unsupported label subject: %s", collection)
1677 }
1678
1679 actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1680 if err != nil {
1681 return fmt.Errorf("failed to build label application ctx: %w", err)
1682 }
1683
1684 ops := models.LabelOpsFromRecord(did, rkey, record)
1685
1686 for _, o := range ops {
1687 def, ok := actx.Defs[o.OperandKey]
1688 if !ok {
1689 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1690 }
1691 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1692 return fmt.Errorf("failed to validate labelop: %w", err)
1693 }
1694 }
1695
1696 tx, err := i.Db.Begin()
1697 if err != nil {
1698 return err
1699 }
1700 defer tx.Rollback()
1701
1702 for _, o := range ops {
1703 _, err = db.AddLabelOp(tx, &o)
1704 if err != nil {
1705 return fmt.Errorf("failed to add labelop: %w", err)
1706 }
1707 }
1708
1709 if err = tx.Commit(); err != nil {
1710 return err
1711 }
1712 }
1713
1714 return nil
1715}