Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 "tangled.org/core/api/tangled"
10 "tangled.org/core/spindle/db"
11
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/bluesky-social/jetstream/pkg/models"
14)
15
16type Ingester func(ctx context.Context, e *models.Event) error
17
18func (s *Spindle) ingest() Ingester {
19 return func(ctx context.Context, e *models.Event) error {
20 if e.Kind != models.EventKindCommit {
21 return nil
22 }
23
24 var err error
25 switch e.Commit.Collection {
26 case tangled.SpindleMemberNSID:
27 err = s.ingestMember(ctx, e)
28 }
29
30 if err != nil {
31 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err)
32 }
33
34 lastTimeUs := e.TimeUS + 1
35 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
36 s.l.Error("failed to save cursor", "err", saveErr)
37 }
38
39 return nil
40 }
41}
42
43func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
44 var err error
45 did := e.Did
46 rkey := e.Commit.RKey
47
48 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
49
50 switch e.Commit.Operation {
51 case models.CommitOperationCreate, models.CommitOperationUpdate:
52 raw := e.Commit.Record
53 record := tangled.SpindleMember{}
54 err = json.Unmarshal(raw, &record)
55 if err != nil {
56 l.Error("invalid record", "error", err)
57 return err
58 }
59
60 domain := s.cfg.Server.Hostname
61 recordInstance := record.Instance
62
63 if recordInstance != domain {
64 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
65 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
66 }
67
68 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
69 if err != nil || !ok {
70 l.Error("failed to add member", "did", did, "error", err)
71 return fmt.Errorf("failed to enforce permissions: %w", err)
72 }
73
74 if err := db.AddSpindleMember(s.db, db.SpindleMember{
75 Did: syntax.DID(did),
76 Rkey: rkey,
77 Instance: recordInstance,
78 Subject: syntax.DID(record.Subject),
79 Created: time.Now(),
80 }); err != nil {
81 l.Error("failed to add member", "error", err)
82 return fmt.Errorf("failed to add member: %w", err)
83 }
84
85 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
86 l.Error("failed to add member", "error", err)
87 return fmt.Errorf("failed to add member: %w", err)
88 }
89 l.Info("added member from firehose", "member", record.Subject)
90
91 if err := s.db.AddDid(record.Subject); err != nil {
92 l.Error("failed to add did", "error", err)
93 return fmt.Errorf("failed to add did: %w", err)
94 }
95 s.jc.AddDid(record.Subject)
96
97 return nil
98
99 case models.CommitOperationDelete:
100 record, err := db.GetSpindleMember(s.db, did, rkey)
101 if err != nil {
102 l.Error("failed to find member", "error", err)
103 return fmt.Errorf("failed to find member: %w", err)
104 }
105
106 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
107 l.Error("failed to remove member", "error", err)
108 return fmt.Errorf("failed to remove member: %w", err)
109 }
110
111 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
112 l.Error("failed to add member", "error", err)
113 return fmt.Errorf("failed to add member: %w", err)
114 }
115 l.Info("added member from firehose", "member", record.Subject)
116
117 if err := s.db.RemoveDid(record.Subject.String()); err != nil {
118 l.Error("failed to add did", "error", err)
119 return fmt.Errorf("failed to add did: %w", err)
120 }
121 s.jc.RemoveDid(record.Subject.String())
122
123 }
124 return nil
125}