Monorepo for Tangled tangled.org
11

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "sync" 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/log" 17 "tangled.org/core/rbac" 18 "tangled.org/core/spindle/db" 19 "tangled.org/core/tapc" 20) 21 22const ( 23 maxPendingPerRepo = 64 24 pendingCollabTTL = 10 * time.Minute 25) 26 27type pendingCollabEvent struct { 28 evt *tapc.RecordEventData 29 at time.Time 30} 31 32type Tap struct { 33 logger *slog.Logger 34 spindle *Spindle 35 tap tapc.Client 36 pendingMu sync.Mutex 37 pendingCollabs map[syntax.DID][]pendingCollabEvent 38} 39 40func NewTapClient(s *Spindle) *Tap { 41 return &Tap{ 42 logger: log.SubLogger(s.l, "tapclient"), 43 spindle: s, 44 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword), 45 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent), 46 } 47} 48 49func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error { 50 if len(dids) == 0 { 51 return nil 52 } 53 return t.tap.AddRepos(ctx, dids) 54} 55 56func (t *Tap) Start(ctx context.Context) { 57 go t.tap.Connect(ctx, &tapc.SimpleIndexer{ 58 EventHandler: t.processEvent, 59 ConnectHandler: t.onConnect, 60 }) 61 go t.purgePendingCollabsLoop(ctx) 62} 63 64func (t *Tap) onConnect(ctx context.Context) { 65 t.spindle.declareTapInterest(ctx) 66} 67 68func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 69 if evt.Type != tapc.EvtRecord || evt.Record == nil { 70 return nil 71 } 72 switch evt.Record.Collection.String() { 73 case tangled.RepoNSID: 74 return t.processRepo(ctx, evt.Record) 75 case tangled.RepoCollaboratorNSID: 76 return t.processCollaborator(ctx, evt.Record) 77 } 78 return nil 79} 80 81func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 82 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey) 83 84 ownerDid := evt.Did 85 rkey := evt.Rkey 86 87 switch evt.Action { 88 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 89 record := tangled.Repo{} 90 if err := json.Unmarshal(evt.Record, &record); err != nil { 91 l.Warn("skipping invalid repo record", "err", err) 92 return nil 93 } 94 95 hostname := t.spindle.cfg.Server.Hostname 96 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 97 knownRepo := priorErr == nil 98 99 if record.Spindle == nil || *record.Spindle != hostname { 100 if knownRepo { 101 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle) 102 return t.teardownRepo(l, prior, ownerDid, rkey) 103 } 104 return nil 105 } 106 107 if record.RepoDid == nil || *record.RepoDid == "" { 108 l.Warn("skipping repo record without repoDid") 109 return nil 110 } 111 repoDid, err := syntax.ParseDID(*record.RepoDid) 112 if err != nil { 113 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err) 114 return nil 115 } 116 117 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 118 l.Error("failed to add repo policy", "err", err) 119 return fmt.Errorf("add repo policy: %w", err) 120 } 121 122 src := eventconsumer.NewKnotSource(record.Knot) 123 t.spindle.ks.AddSource(t.spindle.rootCtx, src) 124 125 if err := t.spindle.db.AddRepo(db.Repo{ 126 Knot: record.Knot, 127 Owner: ownerDid, 128 Rkey: rkey, 129 RepoDid: repoDid, 130 CreatedAt: record.CreatedAt, 131 }); err != nil { 132 l.Error("failed to add repo row", "err", err) 133 return fmt.Errorf("add repo: %w", err) 134 } 135 136 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil { 137 l.Warn("collapse rename siblings failed", "err", err) 138 } else if removed > 0 { 139 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed) 140 } 141 142 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil { 143 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err) 144 } 145 146 t.drainPendingCollabs(ctx, repoDid) 147 148 case tapc.RecordDeleteAction: 149 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 150 if err != nil { 151 l.Info("skipping delete for unknown repo") 152 return nil 153 } 154 return t.teardownRepo(l, repo, ownerDid, rkey) 155 } 156 return nil 157} 158 159func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error { 160 if repo.RepoDid != "" { 161 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid) 162 if err != nil { 163 l.Error("failed to list collaborators for cleanup", "err", err) 164 return fmt.Errorf("list collaborators: %w", err) 165 } 166 for _, c := range collabs { 167 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 168 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err) 169 return fmt.Errorf("remove collaborator policy: %w", err) 170 } 171 } 172 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil { 173 l.Error("failed to clear collaborator rows", "err", err) 174 return err 175 } 176 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 177 l.Error("failed to remove repo policy", "err", err) 178 return fmt.Errorf("remove repo policy: %w", err) 179 } 180 } 181 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil { 182 l.Error("failed to delete repo row", "err", err) 183 return fmt.Errorf("delete repo row: %w", err) 184 } 185 return nil 186} 187 188func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error { 189 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey) 190 191 switch evt.Action { 192 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 193 record := tangled.RepoCollaborator{} 194 if err := json.Unmarshal(evt.Record, &record); err != nil { 195 l.Warn("skipping invalid collaborator record", "err", err) 196 return nil 197 } 198 199 actor := evt.Did 200 rkey := evt.Rkey 201 202 subjectDid, err := syntax.ParseDID(record.Subject) 203 if err != nil { 204 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err) 205 return nil 206 } 207 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil { 208 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err) 209 return nil 210 } 211 212 repoRefDid, err := syntax.ParseDID(record.Repo) 213 if err != nil { 214 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err) 215 return nil 216 } 217 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid) 218 if errors.Is(lookupErr, sql.ErrNoRows) { 219 t.bufferCollab(repoRefDid, evt) 220 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid) 221 return nil 222 } 223 if lookupErr != nil { 224 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr) 225 } 226 repoDid := repo.RepoDid 227 ownerDid := repo.Owner 228 229 if actor != ownerDid { 230 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid) 231 return nil 232 } 233 234 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String()) 235 if err != nil { 236 l.Error("invite permission check failed", "err", err) 237 return fmt.Errorf("invite check: %w", err) 238 } 239 if !ok { 240 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid) 241 return nil 242 } 243 244 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey) 245 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid) 246 247 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 248 l.Error("failed to add collaborator policy", "err", err) 249 return fmt.Errorf("add collaborator policy: %w", err) 250 } 251 if staleSubject { 252 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil { 253 l.Error("failed to remove stale collaborator policy", "err", err) 254 return fmt.Errorf("remove stale collaborator: %w", err) 255 } 256 } 257 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{ 258 OwnerDid: actor, 259 Rkey: rkey, 260 Subject: subjectDid, 261 RepoDid: repoDid, 262 }); err != nil { 263 l.Error("failed to persist collaborator row", "err", err) 264 return fmt.Errorf("track collaborator: %w", err) 265 } 266 267 case tapc.RecordDeleteAction: 268 actor := evt.Did 269 rkey := evt.Rkey 270 271 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey) 272 if err != nil { 273 l.Info("skipping delete for unknown collaborator record") 274 return nil 275 } 276 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil { 277 l.Error("failed to remove collaborator policy", "err", err) 278 return fmt.Errorf("remove collaborator policy: %w", err) 279 } 280 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil { 281 l.Error("failed to delete collaborator row", "err", err) 282 return fmt.Errorf("delete collaborator row: %w", err) 283 } 284 } 285 return nil 286} 287 288func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) { 289 t.pendingMu.Lock() 290 defer t.pendingMu.Unlock() 291 list := t.pendingCollabs[repoDid] 292 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()}) 293 if len(list) > maxPendingPerRepo { 294 list = list[len(list)-maxPendingPerRepo:] 295 } 296 t.pendingCollabs[repoDid] = list 297} 298 299func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) { 300 t.pendingMu.Lock() 301 list := t.pendingCollabs[repoDid] 302 delete(t.pendingCollabs, repoDid) 303 t.pendingMu.Unlock() 304 if len(list) == 0 { 305 return 306 } 307 cutoff := time.Now().Add(-pendingCollabTTL) 308 for _, p := range list { 309 if p.at.Before(cutoff) { 310 continue 311 } 312 if err := t.processCollaborator(ctx, p.evt); err != nil { 313 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err) 314 } 315 } 316} 317 318func (t *Tap) purgePendingCollabsLoop(ctx context.Context) { 319 ticker := time.NewTicker(pendingCollabTTL / 2) 320 defer ticker.Stop() 321 for { 322 select { 323 case <-ctx.Done(): 324 return 325 case <-ticker.C: 326 t.purgeStalePendingCollabs() 327 } 328 } 329} 330 331func (t *Tap) purgeStalePendingCollabs() { 332 cutoff := time.Now().Add(-pendingCollabTTL) 333 t.pendingMu.Lock() 334 defer t.pendingMu.Unlock() 335 for did, list := range t.pendingCollabs { 336 kept := list[:0] 337 for _, p := range list { 338 if !p.at.Before(cutoff) { 339 kept = append(kept, p) 340 } 341 } 342 if len(kept) == 0 { 343 delete(t.pendingCollabs, did) 344 } else { 345 t.pendingCollabs[did] = kept 346 } 347 } 348}