Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/kyxspm 11 kB View raw
1package spindle 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "sync" 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/log" 17 "tangled.org/core/rbac" 18 "tangled.org/core/spindle/db" 19 "tangled.org/core/tapc" 20) 21 22const ( 23 maxPendingPerRepo = 64 24 pendingCollabTTL = 10 * time.Minute 25) 26 27type pendingCollabEvent struct { 28 evt *tapc.RecordEventData 29 at time.Time 30} 31 32type Tap struct { 33 logger *slog.Logger 34 spindle *Spindle 35 tap tapc.Client 36 pendingMu sync.Mutex 37 pendingCollabs map[syntax.DID][]pendingCollabEvent 38} 39 40func NewTapClient(s *Spindle) *Tap { 41 return &Tap{ 42 logger: log.SubLogger(s.l, "tapclient"), 43 spindle: s, 44 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword), 45 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent), 46 } 47} 48 49func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error { 50 if len(dids) == 0 { 51 return nil 52 } 53 return t.tap.AddRepos(ctx, dids) 54} 55 56func (t *Tap) Start(connCtx context.Context) { 57 go t.tap.Connect(connCtx, &tapc.SimpleIndexer{ 58 EventHandler: t.processEvent, 59 ConnectHandler: t.onConnect, 60 }) 61 go t.purgePendingCollabsLoop(t.spindle.rootCtx) 62} 63 64func (t *Tap) onConnect(ctx context.Context) { 65 t.spindle.declareTapInterest(ctx) 66} 67 68func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 69 if evt.Type != tapc.EvtRecord || evt.Record == nil { 70 return nil 71 } 72 switch evt.Record.Collection.String() { 73 case tangled.RepoNSID: 74 return t.processRepo(ctx, evt.Record) 75 case tangled.RepoCollaboratorNSID: 76 return t.processCollaborator(ctx, evt.Record) 77 } 78 return nil 79} 80 81func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 82 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey) 83 84 ownerDid := evt.Did 85 rkey := evt.Rkey 86 87 switch evt.Action { 88 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 89 record := tangled.Repo{} 90 if err := json.Unmarshal(evt.Record, &record); err != nil { 91 l.Warn("skipping invalid repo record", "err", err) 92 return nil 93 } 94 95 hostname := t.spindle.cfg.Server.Hostname 96 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 97 knownRepo := priorErr == nil 98 99 if record.Spindle == nil || *record.Spindle != hostname { 100 if knownRepo { 101 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle) 102 return t.teardownRepo(l, prior, ownerDid, rkey) 103 } 104 return nil 105 } 106 107 if record.RepoDid == nil || *record.RepoDid == "" { 108 l.Warn("skipping repo record without repoDid") 109 return nil 110 } 111 repoDid, err := syntax.ParseDID(*record.RepoDid) 112 if err != nil { 113 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err) 114 return nil 115 } 116 117 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 118 l.Error("failed to add repo policy", "err", err) 119 return fmt.Errorf("add repo policy: %w", err) 120 } 121 122 src := eventconsumer.NewKnotSource(record.Knot) 123 t.spindle.ks.AddSource(t.spindle.rootCtx, src) 124 125 if err := t.spindle.db.AddRepo(db.Repo{ 126 Knot: record.Knot, 127 Owner: ownerDid, 128 Rkey: rkey, 129 RepoDid: repoDid, 130 CreatedAt: record.CreatedAt, 131 }); err != nil { 132 l.Error("failed to add repo row", "err", err) 133 return fmt.Errorf("add repo: %w", err) 134 } 135 136 legacyName := "" 137 if record.Name != nil { 138 legacyName = *record.Name 139 } 140 migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid) 141 migrateLegacyRepoCasbin(ctx, t.spindle.db, t.spindle.e, l, ownerDid, legacyName, rkey, repoDid) 142 143 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil { 144 l.Warn("collapse rename siblings failed", "err", err) 145 } else if removed > 0 { 146 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed) 147 } 148 149 if e := t.spindle.embedTap; e == nil || !e.closed.Load() { 150 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil { 151 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err) 152 } 153 } 154 t.spindle.jc.AddDid(ownerDid.String()) 155 156 t.drainPendingCollabs(ctx, repoDid) 157 158 case tapc.RecordDeleteAction: 159 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 160 if err != nil { 161 l.Info("skipping delete for unknown repo") 162 return nil 163 } 164 return t.teardownRepo(l, repo, ownerDid, rkey) 165 } 166 return nil 167} 168 169func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error { 170 if repo.RepoDid != "" { 171 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid) 172 if err != nil { 173 l.Error("failed to list collaborators for cleanup", "err", err) 174 return fmt.Errorf("list collaborators: %w", err) 175 } 176 for _, c := range collabs { 177 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 178 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err) 179 return fmt.Errorf("remove collaborator policy: %w", err) 180 } 181 } 182 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil { 183 l.Error("failed to clear collaborator rows", "err", err) 184 return err 185 } 186 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 187 l.Error("failed to remove repo policy", "err", err) 188 return fmt.Errorf("remove repo policy: %w", err) 189 } 190 } 191 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil { 192 l.Error("failed to delete repo row", "err", err) 193 return fmt.Errorf("delete repo row: %w", err) 194 } 195 return nil 196} 197 198func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error { 199 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey) 200 201 switch evt.Action { 202 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 203 record := tangled.RepoCollaborator{} 204 if err := json.Unmarshal(evt.Record, &record); err != nil { 205 l.Warn("skipping invalid collaborator record", "err", err) 206 return nil 207 } 208 209 actor := evt.Did 210 rkey := evt.Rkey 211 212 subjectDid, err := syntax.ParseDID(record.Subject) 213 if err != nil { 214 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err) 215 return nil 216 } 217 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil { 218 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err) 219 return nil 220 } 221 222 repoRefDid, err := syntax.ParseDID(record.Repo) 223 if err != nil { 224 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err) 225 return nil 226 } 227 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid) 228 if errors.Is(lookupErr, sql.ErrNoRows) { 229 t.bufferCollab(repoRefDid, evt) 230 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid) 231 return nil 232 } 233 if lookupErr != nil { 234 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr) 235 } 236 repoDid := repo.RepoDid 237 ownerDid := repo.Owner 238 239 if actor != ownerDid { 240 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid) 241 return nil 242 } 243 244 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String()) 245 if err != nil { 246 l.Error("invite permission check failed", "err", err) 247 return fmt.Errorf("invite check: %w", err) 248 } 249 if !ok { 250 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid) 251 return nil 252 } 253 254 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey) 255 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid) 256 257 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 258 l.Error("failed to add collaborator policy", "err", err) 259 return fmt.Errorf("add collaborator policy: %w", err) 260 } 261 if staleSubject { 262 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil { 263 l.Error("failed to remove stale collaborator policy", "err", err) 264 return fmt.Errorf("remove stale collaborator: %w", err) 265 } 266 } 267 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{ 268 OwnerDid: actor, 269 Rkey: rkey, 270 Subject: subjectDid, 271 RepoDid: repoDid, 272 }); err != nil { 273 l.Error("failed to persist collaborator row", "err", err) 274 return fmt.Errorf("track collaborator: %w", err) 275 } 276 277 case tapc.RecordDeleteAction: 278 actor := evt.Did 279 rkey := evt.Rkey 280 281 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey) 282 if err != nil { 283 l.Info("skipping delete for unknown collaborator record") 284 return nil 285 } 286 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil { 287 l.Error("failed to remove collaborator policy", "err", err) 288 return fmt.Errorf("remove collaborator policy: %w", err) 289 } 290 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil { 291 l.Error("failed to delete collaborator row", "err", err) 292 return fmt.Errorf("delete collaborator row: %w", err) 293 } 294 } 295 return nil 296} 297 298func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) { 299 t.pendingMu.Lock() 300 defer t.pendingMu.Unlock() 301 list := t.pendingCollabs[repoDid] 302 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()}) 303 if len(list) > maxPendingPerRepo { 304 list = list[len(list)-maxPendingPerRepo:] 305 } 306 t.pendingCollabs[repoDid] = list 307} 308 309func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) { 310 t.pendingMu.Lock() 311 list := t.pendingCollabs[repoDid] 312 delete(t.pendingCollabs, repoDid) 313 t.pendingMu.Unlock() 314 if len(list) == 0 { 315 return 316 } 317 cutoff := time.Now().Add(-pendingCollabTTL) 318 for _, p := range list { 319 if p.at.Before(cutoff) { 320 continue 321 } 322 if err := t.processCollaborator(ctx, p.evt); err != nil { 323 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err) 324 } 325 } 326} 327 328func (t *Tap) purgePendingCollabsLoop(ctx context.Context) { 329 ticker := time.NewTicker(pendingCollabTTL / 2) 330 defer ticker.Stop() 331 for { 332 select { 333 case <-ctx.Done(): 334 return 335 case <-ticker.C: 336 t.purgeStalePendingCollabs() 337 } 338 } 339} 340 341func (t *Tap) purgeStalePendingCollabs() { 342 cutoff := time.Now().Add(-pendingCollabTTL) 343 t.pendingMu.Lock() 344 defer t.pendingMu.Unlock() 345 expired := 0 346 for did, list := range t.pendingCollabs { 347 kept := list[:0] 348 for _, p := range list { 349 if !p.at.Before(cutoff) { 350 kept = append(kept, p) 351 } else { 352 expired++ 353 } 354 } 355 if len(kept) == 0 { 356 delete(t.pendingCollabs, did) 357 } else { 358 t.pendingCollabs[did] = kept 359 } 360 } 361 if expired > 0 { 362 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL) 363 } 364}