Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/spindle/db"
12 "tangled.org/core/tapc"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/jetstream/pkg/models"
16)
17
// Ingester is the callback signature for handling a single Jetstream event.
type Ingester func(ctx context.Context, e *models.Event) error
19
20func (s *Spindle) ingest() Ingester {
21 return func(ctx context.Context, e *models.Event) error {
22 if e.Kind != models.EventKindCommit {
23 return nil
24 }
25
26 var err error
27 switch e.Commit.Collection {
28 case tangled.SpindleMemberNSID:
29 err = s.ingestMember(ctx, e)
30 case tangled.RepoNSID, tangled.RepoCollaboratorNSID:
31 if evt, ok := jetstreamToTapEvent(e); ok {
32 err = s.tap.processEvent(ctx, evt)
33 }
34 }
35
36 if err != nil {
37 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey, "err", err)
38 }
39
40 lastTimeUs := e.TimeUS + 1
41 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
42 s.l.Error("failed to save cursor", "err", saveErr)
43 }
44
45 return nil
46 }
47}
48
49func jetstreamToTapEvent(e *models.Event) (tapc.Event, bool) {
50 if e.Commit == nil {
51 return tapc.Event{}, false
52 }
53 did, err := syntax.ParseDID(e.Did)
54 if err != nil {
55 return tapc.Event{}, false
56 }
57 var action tapc.RecordAction
58 switch e.Commit.Operation {
59 case models.CommitOperationCreate:
60 action = tapc.RecordCreateAction
61 case models.CommitOperationUpdate:
62 action = tapc.RecordUpdateAction
63 case models.CommitOperationDelete:
64 action = tapc.RecordDeleteAction
65 default:
66 return tapc.Event{}, false
67 }
68 return tapc.Event{
69 Type: tapc.EvtRecord,
70 Record: &tapc.RecordEventData{
71 Did: did,
72 Rkey: syntax.RecordKey(e.Commit.RKey),
73 Collection: syntax.NSID(e.Commit.Collection),
74 Action: action,
75 Record: e.Commit.Record,
76 },
77 }, true
78}
79
80func (s *Spindle) ingestMember(ctx context.Context, e *models.Event) error {
81 did := e.Did
82 rkey := e.Commit.RKey
83 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID, "did", did, "rkey", rkey)
84
85 switch e.Commit.Operation {
86 case models.CommitOperationCreate, models.CommitOperationUpdate:
87 raw := e.Commit.Record
88 record := tangled.SpindleMember{}
89 if err := json.Unmarshal(raw, &record); err != nil {
90 return fmt.Errorf("invalid record: %w", err)
91 }
92
93 domain := s.cfg.Server.Hostname
94 recordInstance := record.Instance
95
96 if recordInstance != domain {
97 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
98 }
99
100 subject, err := syntax.ParseDID(record.Subject)
101 if err != nil {
102 return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err)
103 }
104
105 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
106 if err != nil {
107 return fmt.Errorf("failed to enforce permissions: %w", err)
108 }
109 if !ok {
110 return fmt.Errorf("permission denied for %s", did)
111 }
112
113 sqlTx, err := s.db.BeginTx(ctx, nil)
114 if err != nil {
115 return fmt.Errorf("failed to start txn: %w", err)
116 }
117 committed := false
118 defer func() {
119 if !committed {
120 sqlTx.Rollback()
121 }
122 }()
123
124 existing, err := db.GetSpindleMember(sqlTx, did, rkey)
125 if err != nil && !errors.Is(err, sql.ErrNoRows) {
126 return fmt.Errorf("failed to look up existing member: %w", err)
127 }
128
129 var staleSubject string
130 if existing != nil && existing.Subject != subject {
131 staleSubject = existing.Subject.String()
132 if err := db.RemoveSpindleMember(sqlTx, did, rkey); err != nil {
133 return fmt.Errorf("failed to remove stale member row: %w", err)
134 }
135 }
136
137 if err := db.AddSpindleMember(sqlTx, db.SpindleMember{
138 Did: syntax.DID(did),
139 Rkey: rkey,
140 Instance: recordInstance,
141 Subject: subject,
142 }); err != nil {
143 return fmt.Errorf("failed to add member: %w", err)
144 }
145
146 if err := db.AddDid(sqlTx, subject.String()); err != nil {
147 return fmt.Errorf("failed to add did: %w", err)
148 }
149
150 dropStaleAcl := false
151 var staleDidDropped bool
152 if staleSubject != "" {
153 remaining, err := db.CountSpindleMembersBySubject(sqlTx, staleSubject)
154 if err != nil {
155 return fmt.Errorf("failed to count stale subject rows: %w", err)
156 }
157 if remaining == 0 {
158 dropStaleAcl = true
159 stillNeeded, err := s.e.WouldHaveAnyPolicyExcludingSpindleMember(staleSubject, rbacDomain)
160 if err != nil {
161 return fmt.Errorf("failed to check residual policies for stale subject: %w", err)
162 }
163 if !stillNeeded {
164 if err := db.RemoveDid(sqlTx, staleSubject); err != nil {
165 return fmt.Errorf("failed to remove stale did: %w", err)
166 }
167 staleDidDropped = true
168 }
169 }
170 l.Info("replaced stale spindle member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped)
171 }
172
173 if err := sqlTx.Commit(); err != nil {
174 return fmt.Errorf("failed to commit txn: %w", err)
175 }
176 committed = true
177
178 if dropStaleAcl {
179 if _, err := s.e.TryRemoveSpindleMember(rbacDomain, staleSubject); err != nil {
180 l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err)
181 }
182 }
183 if _, err := s.e.TryAddSpindleMember(rbacDomain, subject.String()); err != nil {
184 l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err)
185 }
186
187 if staleDidDropped {
188 s.jc.RemoveDid(staleSubject)
189 }
190 s.jc.AddDid(subject.String())
191 l.Info("added member from firehose", "member", subject)
192 return nil
193
194 case models.CommitOperationDelete:
195 sqlTx, err := s.db.BeginTx(ctx, nil)
196 if err != nil {
197 return fmt.Errorf("failed to start txn: %w", err)
198 }
199 committed := false
200 defer func() {
201 if !committed {
202 sqlTx.Rollback()
203 }
204 }()
205
206 record, err := db.GetSpindleMember(sqlTx, did, rkey)
207 if errors.Is(err, sql.ErrNoRows) {
208 l.Info("spindle member already removed")
209 return nil
210 }
211 if err != nil {
212 return fmt.Errorf("failed to find member: %w", err)
213 }
214
215 staleSubject := record.Subject.String()
216
217 if err := db.RemoveSpindleMember(sqlTx, did, rkey); err != nil {
218 return fmt.Errorf("failed to remove member: %w", err)
219 }
220
221 remaining, err := db.CountSpindleMembersBySubject(sqlTx, staleSubject)
222 if err != nil {
223 return fmt.Errorf("failed to count remaining member rows: %w", err)
224 }
225
226 dropAcl := false
227 var staleDidDropped bool
228 if remaining == 0 {
229 dropAcl = true
230 stillNeeded, err := s.e.WouldHaveAnyPolicyExcludingSpindleMember(staleSubject, rbacDomain)
231 if err != nil {
232 return fmt.Errorf("failed to check residual policies: %w", err)
233 }
234 if !stillNeeded {
235 if err := db.RemoveDid(sqlTx, staleSubject); err != nil {
236 return fmt.Errorf("failed to remove did: %w", err)
237 }
238 staleDidDropped = true
239 }
240 }
241
242 if err := sqlTx.Commit(); err != nil {
243 return fmt.Errorf("failed to commit txn: %w", err)
244 }
245 committed = true
246
247 if dropAcl {
248 if _, err := s.e.TryRemoveSpindleMember(rbacDomain, staleSubject); err != nil {
249 l.Error("post-commit: failed to remove member ACL", "subject", staleSubject, "err", err)
250 }
251 }
252
253 if staleDidDropped {
254 s.jc.RemoveDid(staleSubject)
255 }
256 l.Info("removed member from firehose", "member", record.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped)
257 }
258 return nil
259}