Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "sync"
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/log"
17 "tangled.org/core/rbac"
18 "tangled.org/core/spindle/db"
19 "tangled.org/core/tapc"
20)
21
// Limits on the in-memory buffer of collaborator events that are waiting for
// their repo to become known.
const (
	// pendingCollabTTL is the maximum age of a buffered event; older entries
	// are skipped on replay and removed by the periodic purge.
	pendingCollabTTL = 10 * time.Minute

	// maxPendingPerRepo caps the number of buffered events kept per repo DID;
	// when exceeded, the oldest entries are dropped.
	maxPendingPerRepo = 64
)
26
27type pendingCollabEvent struct {
28 evt *tapc.RecordEventData
29 at time.Time
30}
31
32type Tap struct {
33 logger *slog.Logger
34 spindle *Spindle
35 tap tapc.Client
36 pendingMu sync.Mutex
37 pendingCollabs map[syntax.DID][]pendingCollabEvent
38}
39
40func NewTapClient(s *Spindle) *Tap {
41 return &Tap{
42 logger: log.SubLogger(s.l, "tapclient"),
43 spindle: s,
44 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword),
45 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent),
46 }
47}
48
49func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error {
50 if len(dids) == 0 {
51 return nil
52 }
53 return t.tap.AddRepos(ctx, dids)
54}
55
56func (t *Tap) Start(ctx context.Context) {
57 go t.tap.Connect(ctx, &tapc.SimpleIndexer{
58 EventHandler: t.processEvent,
59 ConnectHandler: t.onConnect,
60 })
61 go t.purgePendingCollabsLoop(ctx)
62}
63
64func (t *Tap) onConnect(ctx context.Context) {
65 t.spindle.declareTapInterest(ctx)
66}
67
68func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
69 if evt.Type != tapc.EvtRecord || evt.Record == nil {
70 return nil
71 }
72 switch evt.Record.Collection.String() {
73 case tangled.RepoNSID:
74 return t.processRepo(ctx, evt.Record)
75 case tangled.RepoCollaboratorNSID:
76 return t.processCollaborator(ctx, evt.Record)
77 }
78 return nil
79}
80
81func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
82 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey)
83
84 ownerDid := evt.Did
85 rkey := evt.Rkey
86
87 switch evt.Action {
88 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
89 record := tangled.Repo{}
90 if err := json.Unmarshal(evt.Record, &record); err != nil {
91 l.Warn("skipping invalid repo record", "err", err)
92 return nil
93 }
94
95 hostname := t.spindle.cfg.Server.Hostname
96 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
97 knownRepo := priorErr == nil
98
99 if record.Spindle == nil || *record.Spindle != hostname {
100 if knownRepo {
101 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle)
102 return t.teardownRepo(l, prior, ownerDid, rkey)
103 }
104 return nil
105 }
106
107 if record.RepoDid == nil || *record.RepoDid == "" {
108 l.Warn("skipping repo record without repoDid")
109 return nil
110 }
111 repoDid, err := syntax.ParseDID(*record.RepoDid)
112 if err != nil {
113 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err)
114 return nil
115 }
116
117 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
118 l.Error("failed to add repo policy", "err", err)
119 return fmt.Errorf("add repo policy: %w", err)
120 }
121
122 src := eventconsumer.NewKnotSource(record.Knot)
123 t.spindle.ks.AddSource(t.spindle.rootCtx, src)
124
125 if err := t.spindle.db.AddRepo(db.Repo{
126 Knot: record.Knot,
127 Owner: ownerDid,
128 Rkey: rkey,
129 RepoDid: repoDid,
130 CreatedAt: record.CreatedAt,
131 }); err != nil {
132 l.Error("failed to add repo row", "err", err)
133 return fmt.Errorf("add repo: %w", err)
134 }
135
136 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil {
137 l.Warn("collapse rename siblings failed", "err", err)
138 } else if removed > 0 {
139 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed)
140 }
141
142 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil {
143 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err)
144 }
145
146 t.drainPendingCollabs(ctx, repoDid)
147
148 case tapc.RecordDeleteAction:
149 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
150 if err != nil {
151 l.Info("skipping delete for unknown repo")
152 return nil
153 }
154 return t.teardownRepo(l, repo, ownerDid, rkey)
155 }
156 return nil
157}
158
159func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error {
160 if repo.RepoDid != "" {
161 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid)
162 if err != nil {
163 l.Error("failed to list collaborators for cleanup", "err", err)
164 return fmt.Errorf("list collaborators: %w", err)
165 }
166 for _, c := range collabs {
167 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
168 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err)
169 return fmt.Errorf("remove collaborator policy: %w", err)
170 }
171 }
172 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil {
173 l.Error("failed to clear collaborator rows", "err", err)
174 return err
175 }
176 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
177 l.Error("failed to remove repo policy", "err", err)
178 return fmt.Errorf("remove repo policy: %w", err)
179 }
180 }
181 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil {
182 l.Error("failed to delete repo row", "err", err)
183 return fmt.Errorf("delete repo row: %w", err)
184 }
185 return nil
186}
187
188func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error {
189 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey)
190
191 switch evt.Action {
192 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
193 record := tangled.RepoCollaborator{}
194 if err := json.Unmarshal(evt.Record, &record); err != nil {
195 l.Warn("skipping invalid collaborator record", "err", err)
196 return nil
197 }
198
199 actor := evt.Did
200 rkey := evt.Rkey
201
202 subjectDid, err := syntax.ParseDID(record.Subject)
203 if err != nil {
204 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err)
205 return nil
206 }
207 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil {
208 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err)
209 return nil
210 }
211
212 repoRefDid, err := syntax.ParseDID(record.Repo)
213 if err != nil {
214 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err)
215 return nil
216 }
217 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid)
218 if errors.Is(lookupErr, sql.ErrNoRows) {
219 t.bufferCollab(repoRefDid, evt)
220 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid)
221 return nil
222 }
223 if lookupErr != nil {
224 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr)
225 }
226 repoDid := repo.RepoDid
227 ownerDid := repo.Owner
228
229 if actor != ownerDid {
230 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid)
231 return nil
232 }
233
234 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String())
235 if err != nil {
236 l.Error("invite permission check failed", "err", err)
237 return fmt.Errorf("invite check: %w", err)
238 }
239 if !ok {
240 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid)
241 return nil
242 }
243
244 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey)
245 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid)
246
247 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
248 l.Error("failed to add collaborator policy", "err", err)
249 return fmt.Errorf("add collaborator policy: %w", err)
250 }
251 if staleSubject {
252 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil {
253 l.Error("failed to remove stale collaborator policy", "err", err)
254 return fmt.Errorf("remove stale collaborator: %w", err)
255 }
256 }
257 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{
258 OwnerDid: actor,
259 Rkey: rkey,
260 Subject: subjectDid,
261 RepoDid: repoDid,
262 }); err != nil {
263 l.Error("failed to persist collaborator row", "err", err)
264 return fmt.Errorf("track collaborator: %w", err)
265 }
266
267 case tapc.RecordDeleteAction:
268 actor := evt.Did
269 rkey := evt.Rkey
270
271 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey)
272 if err != nil {
273 l.Info("skipping delete for unknown collaborator record")
274 return nil
275 }
276 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil {
277 l.Error("failed to remove collaborator policy", "err", err)
278 return fmt.Errorf("remove collaborator policy: %w", err)
279 }
280 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil {
281 l.Error("failed to delete collaborator row", "err", err)
282 return fmt.Errorf("delete collaborator row: %w", err)
283 }
284 }
285 return nil
286}
287
288func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) {
289 t.pendingMu.Lock()
290 defer t.pendingMu.Unlock()
291 list := t.pendingCollabs[repoDid]
292 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()})
293 if len(list) > maxPendingPerRepo {
294 list = list[len(list)-maxPendingPerRepo:]
295 }
296 t.pendingCollabs[repoDid] = list
297}
298
299func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) {
300 t.pendingMu.Lock()
301 list := t.pendingCollabs[repoDid]
302 delete(t.pendingCollabs, repoDid)
303 t.pendingMu.Unlock()
304 if len(list) == 0 {
305 return
306 }
307 cutoff := time.Now().Add(-pendingCollabTTL)
308 for _, p := range list {
309 if p.at.Before(cutoff) {
310 continue
311 }
312 if err := t.processCollaborator(ctx, p.evt); err != nil {
313 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err)
314 }
315 }
316}
317
318func (t *Tap) purgePendingCollabsLoop(ctx context.Context) {
319 ticker := time.NewTicker(pendingCollabTTL / 2)
320 defer ticker.Stop()
321 for {
322 select {
323 case <-ctx.Done():
324 return
325 case <-ticker.C:
326 t.purgeStalePendingCollabs()
327 }
328 }
329}
330
331func (t *Tap) purgeStalePendingCollabs() {
332 cutoff := time.Now().Add(-pendingCollabTTL)
333 t.pendingMu.Lock()
334 defer t.pendingMu.Unlock()
335 for did, list := range t.pendingCollabs {
336 kept := list[:0]
337 for _, p := range list {
338 if !p.at.Before(cutoff) {
339 kept = append(kept, p)
340 }
341 }
342 if len(kept) == 0 {
343 delete(t.pendingCollabs, did)
344 } else {
345 t.pendingCollabs[did] = kept
346 }
347 }
348}