Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "sync"
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/log"
17 "tangled.org/core/rbac"
18 "tangled.org/core/spindle/db"
19 "tangled.org/core/tapc"
20)
21
// maxPendingPerRepo caps how many collaborator events are buffered per repo
// DID; bufferCollab keeps only the newest entries once the cap is exceeded.
const maxPendingPerRepo = 64

// pendingCollabTTL is how long a buffered collaborator event stays eligible
// for replay. Older entries are skipped on drain and removed by the purge loop.
const pendingCollabTTL = 10 * time.Minute
26
27type pendingCollabEvent struct {
28 evt *tapc.RecordEventData
29 at time.Time
30}
31
32type Tap struct {
33 logger *slog.Logger
34 spindle *Spindle
35 tap tapc.Client
36 pendingMu sync.Mutex
37 pendingCollabs map[syntax.DID][]pendingCollabEvent
38}
39
40func NewTapClient(s *Spindle) *Tap {
41 return &Tap{
42 logger: log.SubLogger(s.l, "tapclient"),
43 spindle: s,
44 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword),
45 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent),
46 }
47}
48
49func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error {
50 if len(dids) == 0 {
51 return nil
52 }
53 return t.tap.AddRepos(ctx, dids)
54}
55
56func (t *Tap) Start(connCtx context.Context) {
57 go t.tap.Connect(connCtx, &tapc.SimpleIndexer{
58 EventHandler: t.processEvent,
59 ConnectHandler: t.onConnect,
60 })
61 go t.purgePendingCollabsLoop(t.spindle.rootCtx)
62}
63
64func (t *Tap) onConnect(ctx context.Context) {
65 t.spindle.declareTapInterest(ctx)
66}
67
68func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
69 if evt.Type != tapc.EvtRecord || evt.Record == nil {
70 return nil
71 }
72 switch evt.Record.Collection.String() {
73 case tangled.RepoNSID:
74 return t.processRepo(ctx, evt.Record)
75 case tangled.RepoCollaboratorNSID:
76 return t.processCollaborator(ctx, evt.Record)
77 }
78 return nil
79}
80
81func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
82 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey)
83
84 ownerDid := evt.Did
85 rkey := evt.Rkey
86
87 switch evt.Action {
88 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
89 record := tangled.Repo{}
90 if err := json.Unmarshal(evt.Record, &record); err != nil {
91 l.Warn("skipping invalid repo record", "err", err)
92 return nil
93 }
94
95 hostname := t.spindle.cfg.Server.Hostname
96 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
97 knownRepo := priorErr == nil
98
99 if record.Spindle == nil || *record.Spindle != hostname {
100 if knownRepo {
101 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle)
102 return t.teardownRepo(l, prior, ownerDid, rkey)
103 }
104 return nil
105 }
106
107 if record.RepoDid == nil || *record.RepoDid == "" {
108 l.Warn("skipping repo record without repoDid")
109 return nil
110 }
111 repoDid, err := syntax.ParseDID(*record.RepoDid)
112 if err != nil {
113 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err)
114 return nil
115 }
116
117 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
118 l.Error("failed to add repo policy", "err", err)
119 return fmt.Errorf("add repo policy: %w", err)
120 }
121
122 src := eventconsumer.NewKnotSource(record.Knot)
123 t.spindle.ks.AddSource(t.spindle.rootCtx, src)
124
125 if err := t.spindle.db.AddRepo(db.Repo{
126 Knot: record.Knot,
127 Owner: ownerDid,
128 Rkey: rkey,
129 RepoDid: repoDid,
130 CreatedAt: record.CreatedAt,
131 }); err != nil {
132 l.Error("failed to add repo row", "err", err)
133 return fmt.Errorf("add repo: %w", err)
134 }
135
136 legacyName := ""
137 if record.Name != nil {
138 legacyName = *record.Name
139 }
140 migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid)
141 migrateLegacyRepoCasbin(ctx, t.spindle.db, t.spindle.e, l, ownerDid, legacyName, rkey, repoDid)
142
143 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil {
144 l.Warn("collapse rename siblings failed", "err", err)
145 } else if removed > 0 {
146 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed)
147 }
148
149 if e := t.spindle.embedTap; e == nil || !e.closed.Load() {
150 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil {
151 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err)
152 }
153 }
154 t.spindle.jc.AddDid(ownerDid.String())
155
156 t.drainPendingCollabs(ctx, repoDid)
157
158 case tapc.RecordDeleteAction:
159 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
160 if err != nil {
161 l.Info("skipping delete for unknown repo")
162 return nil
163 }
164 return t.teardownRepo(l, repo, ownerDid, rkey)
165 }
166 return nil
167}
168
169func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error {
170 if repo.RepoDid != "" {
171 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid)
172 if err != nil {
173 l.Error("failed to list collaborators for cleanup", "err", err)
174 return fmt.Errorf("list collaborators: %w", err)
175 }
176 for _, c := range collabs {
177 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
178 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err)
179 return fmt.Errorf("remove collaborator policy: %w", err)
180 }
181 }
182 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil {
183 l.Error("failed to clear collaborator rows", "err", err)
184 return err
185 }
186 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
187 l.Error("failed to remove repo policy", "err", err)
188 return fmt.Errorf("remove repo policy: %w", err)
189 }
190 }
191 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil {
192 l.Error("failed to delete repo row", "err", err)
193 return fmt.Errorf("delete repo row: %w", err)
194 }
195 return nil
196}
197
198func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error {
199 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey)
200
201 switch evt.Action {
202 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
203 record := tangled.RepoCollaborator{}
204 if err := json.Unmarshal(evt.Record, &record); err != nil {
205 l.Warn("skipping invalid collaborator record", "err", err)
206 return nil
207 }
208
209 actor := evt.Did
210 rkey := evt.Rkey
211
212 subjectDid, err := syntax.ParseDID(record.Subject)
213 if err != nil {
214 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err)
215 return nil
216 }
217 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil {
218 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err)
219 return nil
220 }
221
222 repoRefDid, err := syntax.ParseDID(record.Repo)
223 if err != nil {
224 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err)
225 return nil
226 }
227 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid)
228 if errors.Is(lookupErr, sql.ErrNoRows) {
229 t.bufferCollab(repoRefDid, evt)
230 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid)
231 return nil
232 }
233 if lookupErr != nil {
234 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr)
235 }
236 repoDid := repo.RepoDid
237 ownerDid := repo.Owner
238
239 if actor != ownerDid {
240 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid)
241 return nil
242 }
243
244 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String())
245 if err != nil {
246 l.Error("invite permission check failed", "err", err)
247 return fmt.Errorf("invite check: %w", err)
248 }
249 if !ok {
250 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid)
251 return nil
252 }
253
254 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey)
255 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid)
256
257 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
258 l.Error("failed to add collaborator policy", "err", err)
259 return fmt.Errorf("add collaborator policy: %w", err)
260 }
261 if staleSubject {
262 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil {
263 l.Error("failed to remove stale collaborator policy", "err", err)
264 return fmt.Errorf("remove stale collaborator: %w", err)
265 }
266 }
267 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{
268 OwnerDid: actor,
269 Rkey: rkey,
270 Subject: subjectDid,
271 RepoDid: repoDid,
272 }); err != nil {
273 l.Error("failed to persist collaborator row", "err", err)
274 return fmt.Errorf("track collaborator: %w", err)
275 }
276
277 case tapc.RecordDeleteAction:
278 actor := evt.Did
279 rkey := evt.Rkey
280
281 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey)
282 if err != nil {
283 l.Info("skipping delete for unknown collaborator record")
284 return nil
285 }
286 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil {
287 l.Error("failed to remove collaborator policy", "err", err)
288 return fmt.Errorf("remove collaborator policy: %w", err)
289 }
290 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil {
291 l.Error("failed to delete collaborator row", "err", err)
292 return fmt.Errorf("delete collaborator row: %w", err)
293 }
294 }
295 return nil
296}
297
298func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) {
299 t.pendingMu.Lock()
300 defer t.pendingMu.Unlock()
301 list := t.pendingCollabs[repoDid]
302 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()})
303 if len(list) > maxPendingPerRepo {
304 list = list[len(list)-maxPendingPerRepo:]
305 }
306 t.pendingCollabs[repoDid] = list
307}
308
309func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) {
310 t.pendingMu.Lock()
311 list := t.pendingCollabs[repoDid]
312 delete(t.pendingCollabs, repoDid)
313 t.pendingMu.Unlock()
314 if len(list) == 0 {
315 return
316 }
317 cutoff := time.Now().Add(-pendingCollabTTL)
318 for _, p := range list {
319 if p.at.Before(cutoff) {
320 continue
321 }
322 if err := t.processCollaborator(ctx, p.evt); err != nil {
323 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err)
324 }
325 }
326}
327
328func (t *Tap) purgePendingCollabsLoop(ctx context.Context) {
329 ticker := time.NewTicker(pendingCollabTTL / 2)
330 defer ticker.Stop()
331 for {
332 select {
333 case <-ctx.Done():
334 return
335 case <-ticker.C:
336 t.purgeStalePendingCollabs()
337 }
338 }
339}
340
341func (t *Tap) purgeStalePendingCollabs() {
342 cutoff := time.Now().Add(-pendingCollabTTL)
343 t.pendingMu.Lock()
344 defer t.pendingMu.Unlock()
345 expired := 0
346 for did, list := range t.pendingCollabs {
347 kept := list[:0]
348 for _, p := range list {
349 if !p.at.Before(cutoff) {
350 kept = append(kept, p)
351 } else {
352 expired++
353 }
354 }
355 if len(kept) == 0 {
356 delete(t.pendingCollabs, did)
357 } else {
358 t.pendingCollabs[did] = kept
359 }
360 }
361 if expired > 0 {
362 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL)
363 }
364}