Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "sync"
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/log"
17 "tangled.org/core/rbac"
18 "tangled.org/core/spindle/db"
19 "tangled.org/core/spindle/git"
20 "tangled.org/core/tapc"
21)
22
23const (
24 maxPendingPerRepo = 64
25 pendingCollabTTL = 10 * time.Minute
26)
27
28type pendingCollabEvent struct {
29 evt *tapc.RecordEventData
30 at time.Time
31}
32
33type Tap struct {
34 logger *slog.Logger
35 spindle *Spindle
36 tap tapc.Client
37 pendingMu sync.Mutex
38 pendingCollabs map[syntax.DID][]pendingCollabEvent
39}
40
41func NewTapClient(s *Spindle) *Tap {
42 return &Tap{
43 logger: log.SubLogger(s.l, "tapclient"),
44 spindle: s,
45 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword),
46 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent),
47 }
48}
49
50func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error {
51 if len(dids) == 0 {
52 return nil
53 }
54 return t.tap.AddRepos(ctx, dids)
55}
56
57func (t *Tap) Start(connCtx context.Context) {
58 go t.tap.Connect(connCtx, &tapc.SimpleIndexer{
59 EventHandler: t.processEvent,
60 ConnectHandler: t.onConnect,
61 })
62 go t.purgePendingCollabsLoop(t.spindle.rootCtx)
63}
64
65func (t *Tap) onConnect(ctx context.Context) {
66 t.spindle.declareTapInterest(ctx)
67}
68
69func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
70 if evt.Type != tapc.EvtRecord || evt.Record == nil {
71 return nil
72 }
73 switch evt.Record.Collection.String() {
74 case tangled.RepoNSID:
75 return t.processRepo(ctx, evt.Record)
76 case tangled.RepoCollaboratorNSID:
77 return t.processCollaborator(ctx, evt.Record)
78 }
79 return nil
80}
81
82func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
83 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey)
84
85 ownerDid := evt.Did
86 rkey := evt.Rkey
87
88 switch evt.Action {
89 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
90 record := tangled.Repo{}
91 if err := json.Unmarshal(evt.Record, &record); err != nil {
92 l.Warn("skipping invalid repo record", "err", err)
93 return nil
94 }
95
96 hostname := t.spindle.cfg.Server.Hostname
97 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
98 knownRepo := priorErr == nil
99
100 if record.Spindle == nil || *record.Spindle != hostname {
101 if knownRepo {
102 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle)
103 return t.teardownRepo(l, prior, ownerDid, rkey)
104 }
105 return nil
106 }
107
108 if record.RepoDid == nil || *record.RepoDid == "" {
109 l.Warn("skipping repo record without repoDid")
110 return nil
111 }
112 repoDid, err := syntax.ParseDID(*record.RepoDid)
113 if err != nil {
114 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err)
115 return nil
116 }
117
118 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
119 l.Error("failed to add repo policy", "err", err)
120 return fmt.Errorf("add repo policy: %w", err)
121 }
122
123 src := eventconsumer.NewKnotSource(record.Knot)
124 t.spindle.ks.AddSource(t.spindle.rootCtx, src)
125
126 repo := db.Repo{
127 Knot: record.Knot,
128 Owner: ownerDid,
129 Rkey: rkey,
130 RepoDid: repoDid,
131 CreatedAt: record.CreatedAt,
132 }
133
134 if err := t.spindle.db.AddRepo(repo); err != nil {
135 l.Error("failed to add repo row", "err", err)
136 return fmt.Errorf("add repo: %w", err)
137 }
138
139 // setup sparse sync
140 repoCloneUri := t.spindle.newRepoCloneUrl(repo.Knot, repo.RepoDid)
141 repoPath := t.spindle.newRepoPath(repo.RepoDid)
142 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil {
143 return fmt.Errorf("setting up sparse-clone git repo: %w", err)
144 }
145
146 legacyName := ""
147 if record.Name != nil {
148 legacyName = *record.Name
149 }
150 migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid)
151 migrateLegacyRepoCasbin(ctx, t.spindle.db, t.spindle.e, l, ownerDid, legacyName, rkey, repoDid)
152
153 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil {
154 l.Warn("collapse rename siblings failed", "err", err)
155 } else if removed > 0 {
156 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed)
157 }
158
159 if e := t.spindle.embedTap; e == nil || !e.closed.Load() {
160 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil {
161 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err)
162 }
163 }
164 t.spindle.jc.AddDid(ownerDid.String())
165
166 t.drainPendingCollabs(ctx, repoDid)
167
168 case tapc.RecordDeleteAction:
169 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
170 if err != nil {
171 l.Info("skipping delete for unknown repo")
172 return nil
173 }
174 return t.teardownRepo(l, repo, ownerDid, rkey)
175 }
176 return nil
177}
178
179func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error {
180 if repo.RepoDid != "" {
181 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid)
182 if err != nil {
183 l.Error("failed to list collaborators for cleanup", "err", err)
184 return fmt.Errorf("list collaborators: %w", err)
185 }
186 for _, c := range collabs {
187 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
188 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err)
189 return fmt.Errorf("remove collaborator policy: %w", err)
190 }
191 }
192 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil {
193 l.Error("failed to clear collaborator rows", "err", err)
194 return err
195 }
196 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
197 l.Error("failed to remove repo policy", "err", err)
198 return fmt.Errorf("remove repo policy: %w", err)
199 }
200 }
201 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil {
202 l.Error("failed to delete repo row", "err", err)
203 return fmt.Errorf("delete repo row: %w", err)
204 }
205 // TODO: clear sparse-synced git repo
206 return nil
207}
208
209func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error {
210 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey)
211
212 switch evt.Action {
213 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
214 record := tangled.RepoCollaborator{}
215 if err := json.Unmarshal(evt.Record, &record); err != nil {
216 l.Warn("skipping invalid collaborator record", "err", err)
217 return nil
218 }
219
220 actor := evt.Did
221 rkey := evt.Rkey
222
223 subjectDid, err := syntax.ParseDID(record.Subject)
224 if err != nil {
225 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err)
226 return nil
227 }
228 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil {
229 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err)
230 return nil
231 }
232
233 repoRefDid, err := syntax.ParseDID(record.Repo)
234 if err != nil {
235 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err)
236 return nil
237 }
238 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid)
239 if errors.Is(lookupErr, sql.ErrNoRows) {
240 t.bufferCollab(repoRefDid, evt)
241 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid)
242 return nil
243 }
244 if lookupErr != nil {
245 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr)
246 }
247 repoDid := repo.RepoDid
248 ownerDid := repo.Owner
249
250 if actor != ownerDid {
251 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid)
252 return nil
253 }
254
255 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String())
256 if err != nil {
257 l.Error("invite permission check failed", "err", err)
258 return fmt.Errorf("invite check: %w", err)
259 }
260 if !ok {
261 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid)
262 return nil
263 }
264
265 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey)
266 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid)
267
268 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
269 l.Error("failed to add collaborator policy", "err", err)
270 return fmt.Errorf("add collaborator policy: %w", err)
271 }
272 if staleSubject {
273 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil {
274 l.Error("failed to remove stale collaborator policy", "err", err)
275 return fmt.Errorf("remove stale collaborator: %w", err)
276 }
277 }
278 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{
279 OwnerDid: actor,
280 Rkey: rkey,
281 Subject: subjectDid,
282 RepoDid: repoDid,
283 }); err != nil {
284 l.Error("failed to persist collaborator row", "err", err)
285 return fmt.Errorf("track collaborator: %w", err)
286 }
287
288 case tapc.RecordDeleteAction:
289 actor := evt.Did
290 rkey := evt.Rkey
291
292 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey)
293 if err != nil {
294 l.Info("skipping delete for unknown collaborator record")
295 return nil
296 }
297 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil {
298 l.Error("failed to remove collaborator policy", "err", err)
299 return fmt.Errorf("remove collaborator policy: %w", err)
300 }
301 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil {
302 l.Error("failed to delete collaborator row", "err", err)
303 return fmt.Errorf("delete collaborator row: %w", err)
304 }
305 }
306 return nil
307}
308
309func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) {
310 t.pendingMu.Lock()
311 defer t.pendingMu.Unlock()
312 list := t.pendingCollabs[repoDid]
313 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()})
314 if len(list) > maxPendingPerRepo {
315 list = list[len(list)-maxPendingPerRepo:]
316 }
317 t.pendingCollabs[repoDid] = list
318}
319
320func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) {
321 t.pendingMu.Lock()
322 list := t.pendingCollabs[repoDid]
323 delete(t.pendingCollabs, repoDid)
324 t.pendingMu.Unlock()
325 if len(list) == 0 {
326 return
327 }
328 cutoff := time.Now().Add(-pendingCollabTTL)
329 for _, p := range list {
330 if p.at.Before(cutoff) {
331 continue
332 }
333 if err := t.processCollaborator(ctx, p.evt); err != nil {
334 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err)
335 }
336 }
337}
338
339func (t *Tap) purgePendingCollabsLoop(ctx context.Context) {
340 ticker := time.NewTicker(pendingCollabTTL / 2)
341 defer ticker.Stop()
342 for {
343 select {
344 case <-ctx.Done():
345 return
346 case <-ticker.C:
347 t.purgeStalePendingCollabs()
348 }
349 }
350}
351
352func (t *Tap) purgeStalePendingCollabs() {
353 cutoff := time.Now().Add(-pendingCollabTTL)
354 t.pendingMu.Lock()
355 defer t.pendingMu.Unlock()
356 expired := 0
357 for did, list := range t.pendingCollabs {
358 kept := list[:0]
359 for _, p := range list {
360 if !p.at.Before(cutoff) {
361 kept = append(kept, p)
362 } else {
363 expired++
364 }
365 }
366 if len(kept) == 0 {
367 delete(t.pendingCollabs, did)
368 } else {
369 t.pendingCollabs[did] = kept
370 }
371 }
372 if expired > 0 {
373 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL)
374 }
375}