Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "sync" 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/log" 17 "tangled.org/core/rbac" 18 "tangled.org/core/spindle/db" 19 "tangled.org/core/tapc" 20) 21 22const ( 23 maxPendingPerRepo = 64 24 pendingCollabTTL = 10 * time.Minute 25) 26 27type pendingCollabEvent struct { 28 evt *tapc.RecordEventData 29 at time.Time 30} 31 32type Tap struct { 33 logger *slog.Logger 34 spindle *Spindle 35 tap tapc.Client 36 pendingMu sync.Mutex 37 pendingCollabs map[syntax.DID][]pendingCollabEvent 38} 39 40func NewTapClient(s *Spindle) *Tap { 41 return &Tap{ 42 logger: log.SubLogger(s.l, "tapclient"), 43 spindle: s, 44 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword), 45 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent), 46 } 47} 48 49func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error { 50 if len(dids) == 0 { 51 return nil 52 } 53 return t.tap.AddRepos(ctx, dids) 54} 55 56func (t *Tap) Start(ctx context.Context) { 57 go t.tap.Connect(ctx, &tapc.SimpleIndexer{ 58 EventHandler: t.processEvent, 59 ConnectHandler: t.onConnect, 60 }) 61 go t.purgePendingCollabsLoop(ctx) 62} 63 64func (t *Tap) onConnect(ctx context.Context) { 65 t.spindle.declareTapInterest(ctx) 66} 67 68func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 69 if evt.Type != tapc.EvtRecord || evt.Record == nil { 70 return nil 71 } 72 switch evt.Record.Collection.String() { 73 case tangled.RepoNSID: 74 return t.processRepo(ctx, evt.Record) 75 case tangled.RepoCollaboratorNSID: 76 return t.processCollaborator(ctx, evt.Record) 77 } 78 return nil 79} 80 81func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 82 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey) 83 84 ownerDid := evt.Did 85 rkey := evt.Rkey 86 87 switch evt.Action { 88 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 89 record := tangled.Repo{} 90 if err := json.Unmarshal(evt.Record, &record); err != nil { 91 l.Warn("skipping invalid repo record", "err", err) 92 return nil 93 } 94 95 hostname := t.spindle.cfg.Server.Hostname 96 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 97 knownRepo := priorErr == nil 98 99 if record.Spindle == nil || *record.Spindle != hostname { 100 if knownRepo { 101 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle) 102 return t.teardownRepo(l, prior, ownerDid, rkey) 103 } 104 return nil 105 } 106 107 if record.RepoDid == nil || *record.RepoDid == "" { 108 l.Warn("skipping repo record without repoDid") 109 return nil 110 } 111 repoDid, err := syntax.ParseDID(*record.RepoDid) 112 if err != nil { 113 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err) 114 return nil 115 } 116 117 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 118 l.Error("failed to add repo policy", "err", err) 119 return fmt.Errorf("add repo policy: %w", err) 120 } 121 122 src := eventconsumer.NewKnotSource(record.Knot) 123 t.spindle.ks.AddSource(t.spindle.rootCtx, src) 124 125 if err := t.spindle.db.AddRepo(db.Repo{ 126 Knot: record.Knot, 127 Owner: ownerDid, 128 Rkey: rkey, 129 RepoDid: repoDid, 130 CreatedAt: record.CreatedAt, 131 }); err != nil { 132 l.Error("failed to add repo row", "err", err) 133 return fmt.Errorf("add repo: %w", err) 134 } 135 136 legacyName := "" 137 if record.Name != nil { 138 legacyName = *record.Name 139 } 140 migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid) 141 migrateLegacyRepoCasbin(ctx, t.spindle.db, t.spindle.e, l, ownerDid, legacyName, rkey, repoDid) 142 143 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil { 144 l.Warn("collapse rename siblings failed", "err", err) 145 } else if removed > 0 { 146 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed) 147 } 148 149 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil { 150 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err) 151 } 152 153 t.drainPendingCollabs(ctx, repoDid) 154 155 case tapc.RecordDeleteAction: 156 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 157 if err != nil { 158 l.Info("skipping delete for unknown repo") 159 return nil 160 } 161 return t.teardownRepo(l, repo, ownerDid, rkey) 162 } 163 return nil 164} 165 166func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error { 167 if repo.RepoDid != "" { 168 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid) 169 if err != nil { 170 l.Error("failed to list collaborators for cleanup", "err", err) 171 return fmt.Errorf("list collaborators: %w", err) 172 } 173 for _, c := range collabs { 174 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 175 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err) 176 return fmt.Errorf("remove collaborator policy: %w", err) 177 } 178 } 179 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil { 180 l.Error("failed to clear collaborator rows", "err", err) 181 return err 182 } 183 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 184 l.Error("failed to remove repo policy", "err", err) 185 return fmt.Errorf("remove repo policy: %w", err) 186 } 187 } 188 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil { 189 l.Error("failed to delete repo row", "err", err) 190 return fmt.Errorf("delete repo row: %w", err) 191 } 192 return nil 193} 194 195func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error { 196 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey) 197 198 switch evt.Action { 199 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 200 record := tangled.RepoCollaborator{} 201 if err := json.Unmarshal(evt.Record, &record); err != nil { 202 l.Warn("skipping invalid collaborator record", "err", err) 203 return nil 204 } 205 206 actor := evt.Did 207 rkey := evt.Rkey 208 209 subjectDid, err := syntax.ParseDID(record.Subject) 210 if err != nil { 211 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err) 212 return nil 213 } 214 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil { 215 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err) 216 return nil 217 } 218 219 repoRefDid, err := syntax.ParseDID(record.Repo) 220 if err != nil { 221 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err) 222 return nil 223 } 224 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid) 225 if errors.Is(lookupErr, sql.ErrNoRows) { 226 t.bufferCollab(repoRefDid, evt) 227 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid) 228 return nil 229 } 230 if lookupErr != nil { 231 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr) 232 } 233 repoDid := repo.RepoDid 234 ownerDid := repo.Owner 235 236 if actor != ownerDid { 237 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid) 238 return nil 239 } 240 241 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String()) 242 if err != nil { 243 l.Error("invite permission check failed", "err", err) 244 return fmt.Errorf("invite check: %w", err) 245 } 246 if !ok { 247 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid) 248 return nil 249 } 250 251 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey) 252 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid) 253 254 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 255 l.Error("failed to add collaborator policy", "err", err) 256 return fmt.Errorf("add collaborator policy: %w", err) 257 } 258 if staleSubject { 259 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil { 260 l.Error("failed to remove stale collaborator policy", "err", err) 261 return fmt.Errorf("remove stale collaborator: %w", err) 262 } 263 } 264 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{ 265 OwnerDid: actor, 266 Rkey: rkey, 267 Subject: subjectDid, 268 RepoDid: repoDid, 269 }); err != nil { 270 l.Error("failed to persist collaborator row", "err", err) 271 return fmt.Errorf("track collaborator: %w", err) 272 } 273 274 case tapc.RecordDeleteAction: 275 actor := evt.Did 276 rkey := evt.Rkey 277 278 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey) 279 if err != nil { 280 l.Info("skipping delete for unknown collaborator record") 281 return nil 282 } 283 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil { 284 l.Error("failed to remove collaborator policy", "err", err) 285 return fmt.Errorf("remove collaborator policy: %w", err) 286 } 287 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil { 288 l.Error("failed to delete collaborator row", "err", err) 289 return fmt.Errorf("delete collaborator row: %w", err) 290 } 291 } 292 return nil 293} 294 295func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) { 296 t.pendingMu.Lock() 297 defer t.pendingMu.Unlock() 298 list := t.pendingCollabs[repoDid] 299 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()}) 300 if len(list) > maxPendingPerRepo { 301 list = list[len(list)-maxPendingPerRepo:] 302 } 303 t.pendingCollabs[repoDid] = list 304} 305 306func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) { 307 t.pendingMu.Lock() 308 list := t.pendingCollabs[repoDid] 309 delete(t.pendingCollabs, repoDid) 310 t.pendingMu.Unlock() 311 if len(list) == 0 { 312 return 313 } 314 cutoff := time.Now().Add(-pendingCollabTTL) 315 for _, p := range list { 316 if p.at.Before(cutoff) { 317 continue 318 } 319 if err := t.processCollaborator(ctx, p.evt); err != nil { 320 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err) 321 } 322 } 323} 324 325func (t *Tap) purgePendingCollabsLoop(ctx context.Context) { 326 ticker := time.NewTicker(pendingCollabTTL / 2) 327 defer ticker.Stop() 328 for { 329 select { 330 case <-ctx.Done(): 331 return 332 case <-ticker.C: 333 t.purgeStalePendingCollabs() 334 } 335 } 336} 337 338func (t *Tap) purgeStalePendingCollabs() { 339 cutoff := time.Now().Add(-pendingCollabTTL) 340 t.pendingMu.Lock() 341 defer t.pendingMu.Unlock() 342 expired := 0 343 for did, list := range t.pendingCollabs { 344 kept := list[:0] 345 for _, p := range list { 346 if !p.at.Before(cutoff) { 347 kept = append(kept, p) 348 } else { 349 expired++ 350 } 351 } 352 if len(kept) == 0 { 353 delete(t.pendingCollabs, did) 354 } else { 355 t.pendingCollabs[did] = kept 356 } 357 } 358 if expired > 0 { 359 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL) 360 } 361}