Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "sync" 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/log" 17 "tangled.org/core/rbac" 18 "tangled.org/core/spindle/db" 19 "tangled.org/core/spindle/git" 20 "tangled.org/core/tapc" 21) 22 23const ( 24 maxPendingPerRepo = 64 25 pendingCollabTTL = 10 * time.Minute 26) 27 28type pendingCollabEvent struct { 29 evt *tapc.RecordEventData 30 at time.Time 31} 32 33type Tap struct { 34 logger *slog.Logger 35 spindle *Spindle 36 tap tapc.Client 37 pendingMu sync.Mutex 38 pendingCollabs map[syntax.DID][]pendingCollabEvent 39} 40 41func NewTapClient(s *Spindle) *Tap { 42 return &Tap{ 43 logger: log.SubLogger(s.l, "tapclient"), 44 spindle: s, 45 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword), 46 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent), 47 } 48} 49 50func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error { 51 if len(dids) == 0 { 52 return nil 53 } 54 return t.tap.AddRepos(ctx, dids) 55} 56 57func (t *Tap) Start(connCtx context.Context) { 58 go t.tap.Connect(connCtx, &tapc.SimpleIndexer{ 59 EventHandler: t.processEvent, 60 ConnectHandler: t.onConnect, 61 }) 62 go t.purgePendingCollabsLoop(t.spindle.rootCtx) 63} 64 65func (t *Tap) onConnect(ctx context.Context) { 66 t.spindle.declareTapInterest(ctx) 67} 68 69func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 70 if evt.Type != tapc.EvtRecord || evt.Record == nil { 71 return nil 72 } 73 switch evt.Record.Collection.String() { 74 case tangled.RepoNSID: 75 return t.processRepo(ctx, evt.Record) 76 case tangled.RepoCollaboratorNSID: 77 return t.processCollaborator(ctx, evt.Record) 78 } 79 return nil 80} 81 82func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 83 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey) 84 85 ownerDid := evt.Did 86 rkey := evt.Rkey 87 88 switch evt.Action { 89 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 90 record := tangled.Repo{} 91 if err := json.Unmarshal(evt.Record, &record); err != nil { 92 l.Warn("skipping invalid repo record", "err", err) 93 return nil 94 } 95 96 hostname := t.spindle.cfg.Server.Hostname 97 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 98 knownRepo := priorErr == nil 99 100 if record.Spindle == nil || *record.Spindle != hostname { 101 if knownRepo { 102 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle) 103 return t.teardownRepo(l, prior, ownerDid, rkey) 104 } 105 return nil 106 } 107 108 if record.RepoDid == nil || *record.RepoDid == "" { 109 l.Warn("skipping repo record without repoDid") 110 return nil 111 } 112 repoDid, err := syntax.ParseDID(*record.RepoDid) 113 if err != nil { 114 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err) 115 return nil 116 } 117 118 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 119 l.Error("failed to add repo policy", "err", err) 120 return fmt.Errorf("add repo policy: %w", err) 121 } 122 123 src := eventconsumer.NewKnotSource(record.Knot) 124 t.spindle.ks.AddSource(t.spindle.rootCtx, src) 125 126 repo := db.Repo{ 127 Knot: record.Knot, 128 Owner: ownerDid, 129 Rkey: rkey, 130 RepoDid: repoDid, 131 CreatedAt: record.CreatedAt, 132 } 133 134 if err := t.spindle.db.AddRepo(repo); err != nil { 135 l.Error("failed to add repo row", "err", err) 136 return fmt.Errorf("add repo: %w", err) 137 } 138 139 // setup sparse sync 140 repoCloneUri := t.spindle.newRepoCloneUrl(repo.Knot, repo.RepoDid) 141 repoPath := t.spindle.newRepoPath(repo.RepoDid) 142 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil { 143 return fmt.Errorf("setting up sparse-clone git repo: %w", err) 144 } 145 146 legacyName := "" 147 if record.Name != nil { 148 legacyName = *record.Name 149 } 150 migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid) 151 migrateLegacyRepoCasbin(ctx, t.spindle.db, t.spindle.e, l, ownerDid, legacyName, rkey, repoDid) 152 153 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil { 154 l.Warn("collapse rename siblings failed", "err", err) 155 } else if removed > 0 { 156 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed) 157 } 158 159 if e := t.spindle.embedTap; e == nil || !e.closed.Load() { 160 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil { 161 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err) 162 } 163 } 164 t.spindle.jc.AddDid(ownerDid.String()) 165 166 t.drainPendingCollabs(ctx, repoDid) 167 168 case tapc.RecordDeleteAction: 169 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey) 170 if err != nil { 171 l.Info("skipping delete for unknown repo") 172 return nil 173 } 174 return t.teardownRepo(l, repo, ownerDid, rkey) 175 } 176 return nil 177} 178 179func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error { 180 if repo.RepoDid != "" { 181 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid) 182 if err != nil { 183 l.Error("failed to list collaborators for cleanup", "err", err) 184 return fmt.Errorf("list collaborators: %w", err) 185 } 186 for _, c := range collabs { 187 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 188 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err) 189 return fmt.Errorf("remove collaborator policy: %w", err) 190 } 191 } 192 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil { 193 l.Error("failed to clear collaborator rows", "err", err) 194 return err 195 } 196 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil { 197 l.Error("failed to remove repo policy", "err", err) 198 return fmt.Errorf("remove repo policy: %w", err) 199 } 200 } 201 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil { 202 l.Error("failed to delete repo row", "err", err) 203 return fmt.Errorf("delete repo row: %w", err) 204 } 205 // TODO: clear sparse-synced git repo 206 return nil 207} 208 209func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error { 210 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey) 211 212 switch evt.Action { 213 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 214 record := tangled.RepoCollaborator{} 215 if err := json.Unmarshal(evt.Record, &record); err != nil { 216 l.Warn("skipping invalid collaborator record", "err", err) 217 return nil 218 } 219 220 actor := evt.Did 221 rkey := evt.Rkey 222 223 subjectDid, err := syntax.ParseDID(record.Subject) 224 if err != nil { 225 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err) 226 return nil 227 } 228 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil { 229 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err) 230 return nil 231 } 232 233 repoRefDid, err := syntax.ParseDID(record.Repo) 234 if err != nil { 235 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err) 236 return nil 237 } 238 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid) 239 if errors.Is(lookupErr, sql.ErrNoRows) { 240 t.bufferCollab(repoRefDid, evt) 241 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid) 242 return nil 243 } 244 if lookupErr != nil { 245 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr) 246 } 247 repoDid := repo.RepoDid 248 ownerDid := repo.Owner 249 250 if actor != ownerDid { 251 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid) 252 return nil 253 } 254 255 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String()) 256 if err != nil { 257 l.Error("invite permission check failed", "err", err) 258 return fmt.Errorf("invite check: %w", err) 259 } 260 if !ok { 261 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid) 262 return nil 263 } 264 265 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey) 266 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid) 267 268 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil { 269 l.Error("failed to add collaborator policy", "err", err) 270 return fmt.Errorf("add collaborator policy: %w", err) 271 } 272 if staleSubject { 273 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil { 274 l.Error("failed to remove stale collaborator policy", "err", err) 275 return fmt.Errorf("remove stale collaborator: %w", err) 276 } 277 } 278 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{ 279 OwnerDid: actor, 280 Rkey: rkey, 281 Subject: subjectDid, 282 RepoDid: repoDid, 283 }); err != nil { 284 l.Error("failed to persist collaborator row", "err", err) 285 return fmt.Errorf("track collaborator: %w", err) 286 } 287 288 case tapc.RecordDeleteAction: 289 actor := evt.Did 290 rkey := evt.Rkey 291 292 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey) 293 if err != nil { 294 l.Info("skipping delete for unknown collaborator record") 295 return nil 296 } 297 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil { 298 l.Error("failed to remove collaborator policy", "err", err) 299 return fmt.Errorf("remove collaborator policy: %w", err) 300 } 301 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil { 302 l.Error("failed to delete collaborator row", "err", err) 303 return fmt.Errorf("delete collaborator row: %w", err) 304 } 305 } 306 return nil 307} 308 309func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) { 310 t.pendingMu.Lock() 311 defer t.pendingMu.Unlock() 312 list := t.pendingCollabs[repoDid] 313 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()}) 314 if len(list) > maxPendingPerRepo { 315 list = list[len(list)-maxPendingPerRepo:] 316 } 317 t.pendingCollabs[repoDid] = list 318} 319 320func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) { 321 t.pendingMu.Lock() 322 list := t.pendingCollabs[repoDid] 323 delete(t.pendingCollabs, repoDid) 324 t.pendingMu.Unlock() 325 if len(list) == 0 { 326 return 327 } 328 cutoff := time.Now().Add(-pendingCollabTTL) 329 for _, p := range list { 330 if p.at.Before(cutoff) { 331 continue 332 } 333 if err := t.processCollaborator(ctx, p.evt); err != nil { 334 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err) 335 } 336 } 337} 338 339func (t *Tap) purgePendingCollabsLoop(ctx context.Context) { 340 ticker := time.NewTicker(pendingCollabTTL / 2) 341 defer ticker.Stop() 342 for { 343 select { 344 case <-ctx.Done(): 345 return 346 case <-ticker.C: 347 t.purgeStalePendingCollabs() 348 } 349 } 350} 351 352func (t *Tap) purgeStalePendingCollabs() { 353 cutoff := time.Now().Add(-pendingCollabTTL) 354 t.pendingMu.Lock() 355 defer t.pendingMu.Unlock() 356 expired := 0 357 for did, list := range t.pendingCollabs { 358 kept := list[:0] 359 for _, p := range list { 360 if !p.at.Before(cutoff) { 361 kept = append(kept, p) 362 } else { 363 expired++ 364 } 365 } 366 if len(kept) == 0 { 367 delete(t.pendingCollabs, did) 368 } else { 369 t.pendingCollabs[did] = kept 370 } 371 } 372 if expired > 0 { 373 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL) 374 } 375}