Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "sync"
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/log"
17 "tangled.org/core/rbac"
18 "tangled.org/core/spindle/db"
19 "tangled.org/core/tapc"
20)
21
22const (
23 maxPendingPerRepo = 64
24 pendingCollabTTL = 10 * time.Minute
25)
26
27type pendingCollabEvent struct {
28 evt *tapc.RecordEventData
29 at time.Time
30}
31
32type Tap struct {
33 logger *slog.Logger
34 spindle *Spindle
35 tap tapc.Client
36 pendingMu sync.Mutex
37 pendingCollabs map[syntax.DID][]pendingCollabEvent
38}
39
40func NewTapClient(s *Spindle) *Tap {
41 return &Tap{
42 logger: log.SubLogger(s.l, "tapclient"),
43 spindle: s,
44 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword),
45 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent),
46 }
47}
48
49func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error {
50 if len(dids) == 0 {
51 return nil
52 }
53 return t.tap.AddRepos(ctx, dids)
54}
55
56func (t *Tap) Start(ctx context.Context) {
57 go t.tap.Connect(ctx, &tapc.SimpleIndexer{
58 EventHandler: t.processEvent,
59 ConnectHandler: t.onConnect,
60 })
61 go t.purgePendingCollabsLoop(ctx)
62}
63
64func (t *Tap) onConnect(ctx context.Context) {
65 t.spindle.declareTapInterest(ctx)
66}
67
68func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
69 if evt.Type != tapc.EvtRecord || evt.Record == nil {
70 return nil
71 }
72 switch evt.Record.Collection.String() {
73 case tangled.RepoNSID:
74 return t.processRepo(ctx, evt.Record)
75 case tangled.RepoCollaboratorNSID:
76 return t.processCollaborator(ctx, evt.Record)
77 }
78 return nil
79}
80
81func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
82 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey)
83
84 ownerDid := evt.Did
85 rkey := evt.Rkey
86
87 switch evt.Action {
88 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
89 record := tangled.Repo{}
90 if err := json.Unmarshal(evt.Record, &record); err != nil {
91 l.Warn("skipping invalid repo record", "err", err)
92 return nil
93 }
94
95 hostname := t.spindle.cfg.Server.Hostname
96 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
97 knownRepo := priorErr == nil
98
99 if record.Spindle == nil || *record.Spindle != hostname {
100 if knownRepo {
101 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle)
102 return t.teardownRepo(l, prior, ownerDid, rkey)
103 }
104 return nil
105 }
106
107 if record.RepoDid == nil || *record.RepoDid == "" {
108 l.Warn("skipping repo record without repoDid")
109 return nil
110 }
111 repoDid, err := syntax.ParseDID(*record.RepoDid)
112 if err != nil {
113 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err)
114 return nil
115 }
116
117 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
118 l.Error("failed to add repo policy", "err", err)
119 return fmt.Errorf("add repo policy: %w", err)
120 }
121
122 src := eventconsumer.NewKnotSource(record.Knot)
123 t.spindle.ks.AddSource(t.spindle.rootCtx, src)
124
125 if err := t.spindle.db.AddRepo(db.Repo{
126 Knot: record.Knot,
127 Owner: ownerDid,
128 Rkey: rkey,
129 RepoDid: repoDid,
130 CreatedAt: record.CreatedAt,
131 }); err != nil {
132 l.Error("failed to add repo row", "err", err)
133 return fmt.Errorf("add repo: %w", err)
134 }
135
136 legacyName := ""
137 if record.Name != nil {
138 legacyName = *record.Name
139 }
140 migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid)
141 migrateLegacyRepoCasbin(ctx, t.spindle.db, t.spindle.e, l, ownerDid, legacyName, rkey, repoDid)
142
143 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil {
144 l.Warn("collapse rename siblings failed", "err", err)
145 } else if removed > 0 {
146 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed)
147 }
148
149 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil {
150 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err)
151 }
152
153 t.drainPendingCollabs(ctx, repoDid)
154
155 case tapc.RecordDeleteAction:
156 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
157 if err != nil {
158 l.Info("skipping delete for unknown repo")
159 return nil
160 }
161 return t.teardownRepo(l, repo, ownerDid, rkey)
162 }
163 return nil
164}
165
166func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error {
167 if repo.RepoDid != "" {
168 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid)
169 if err != nil {
170 l.Error("failed to list collaborators for cleanup", "err", err)
171 return fmt.Errorf("list collaborators: %w", err)
172 }
173 for _, c := range collabs {
174 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
175 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err)
176 return fmt.Errorf("remove collaborator policy: %w", err)
177 }
178 }
179 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil {
180 l.Error("failed to clear collaborator rows", "err", err)
181 return err
182 }
183 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
184 l.Error("failed to remove repo policy", "err", err)
185 return fmt.Errorf("remove repo policy: %w", err)
186 }
187 }
188 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil {
189 l.Error("failed to delete repo row", "err", err)
190 return fmt.Errorf("delete repo row: %w", err)
191 }
192 return nil
193}
194
195func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error {
196 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey)
197
198 switch evt.Action {
199 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
200 record := tangled.RepoCollaborator{}
201 if err := json.Unmarshal(evt.Record, &record); err != nil {
202 l.Warn("skipping invalid collaborator record", "err", err)
203 return nil
204 }
205
206 actor := evt.Did
207 rkey := evt.Rkey
208
209 subjectDid, err := syntax.ParseDID(record.Subject)
210 if err != nil {
211 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err)
212 return nil
213 }
214 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil {
215 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err)
216 return nil
217 }
218
219 repoRefDid, err := syntax.ParseDID(record.Repo)
220 if err != nil {
221 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err)
222 return nil
223 }
224 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid)
225 if errors.Is(lookupErr, sql.ErrNoRows) {
226 t.bufferCollab(repoRefDid, evt)
227 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid)
228 return nil
229 }
230 if lookupErr != nil {
231 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr)
232 }
233 repoDid := repo.RepoDid
234 ownerDid := repo.Owner
235
236 if actor != ownerDid {
237 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid)
238 return nil
239 }
240
241 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String())
242 if err != nil {
243 l.Error("invite permission check failed", "err", err)
244 return fmt.Errorf("invite check: %w", err)
245 }
246 if !ok {
247 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid)
248 return nil
249 }
250
251 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey)
252 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid)
253
254 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
255 l.Error("failed to add collaborator policy", "err", err)
256 return fmt.Errorf("add collaborator policy: %w", err)
257 }
258 if staleSubject {
259 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil {
260 l.Error("failed to remove stale collaborator policy", "err", err)
261 return fmt.Errorf("remove stale collaborator: %w", err)
262 }
263 }
264 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{
265 OwnerDid: actor,
266 Rkey: rkey,
267 Subject: subjectDid,
268 RepoDid: repoDid,
269 }); err != nil {
270 l.Error("failed to persist collaborator row", "err", err)
271 return fmt.Errorf("track collaborator: %w", err)
272 }
273
274 case tapc.RecordDeleteAction:
275 actor := evt.Did
276 rkey := evt.Rkey
277
278 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey)
279 if err != nil {
280 l.Info("skipping delete for unknown collaborator record")
281 return nil
282 }
283 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil {
284 l.Error("failed to remove collaborator policy", "err", err)
285 return fmt.Errorf("remove collaborator policy: %w", err)
286 }
287 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil {
288 l.Error("failed to delete collaborator row", "err", err)
289 return fmt.Errorf("delete collaborator row: %w", err)
290 }
291 }
292 return nil
293}
294
295func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) {
296 t.pendingMu.Lock()
297 defer t.pendingMu.Unlock()
298 list := t.pendingCollabs[repoDid]
299 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()})
300 if len(list) > maxPendingPerRepo {
301 list = list[len(list)-maxPendingPerRepo:]
302 }
303 t.pendingCollabs[repoDid] = list
304}
305
306func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) {
307 t.pendingMu.Lock()
308 list := t.pendingCollabs[repoDid]
309 delete(t.pendingCollabs, repoDid)
310 t.pendingMu.Unlock()
311 if len(list) == 0 {
312 return
313 }
314 cutoff := time.Now().Add(-pendingCollabTTL)
315 for _, p := range list {
316 if p.at.Before(cutoff) {
317 continue
318 }
319 if err := t.processCollaborator(ctx, p.evt); err != nil {
320 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err)
321 }
322 }
323}
324
325func (t *Tap) purgePendingCollabsLoop(ctx context.Context) {
326 ticker := time.NewTicker(pendingCollabTTL / 2)
327 defer ticker.Stop()
328 for {
329 select {
330 case <-ctx.Done():
331 return
332 case <-ticker.C:
333 t.purgeStalePendingCollabs()
334 }
335 }
336}
337
338func (t *Tap) purgeStalePendingCollabs() {
339 cutoff := time.Now().Add(-pendingCollabTTL)
340 t.pendingMu.Lock()
341 defer t.pendingMu.Unlock()
342 expired := 0
343 for did, list := range t.pendingCollabs {
344 kept := list[:0]
345 for _, p := range list {
346 if !p.at.Before(cutoff) {
347 kept = append(kept, p)
348 } else {
349 expired++
350 }
351 }
352 if len(kept) == 0 {
353 delete(t.pendingCollabs, did)
354 } else {
355 t.pendingCollabs[did] = kept
356 }
357 }
358 if expired > 0 {
359 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL)
360 }
361}