Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "sync" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/go-chi/chi/v5" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/eventstream" 19 "tangled.org/core/idresolver" 20 "tangled.org/core/jetstream" 21 "tangled.org/core/log" 22 "tangled.org/core/notifier" 23 "tangled.org/core/rbac" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/db" 26 "tangled.org/core/spindle/engine" 27 "tangled.org/core/spindle/engines/nixery" 28 "tangled.org/core/spindle/models" 29 "tangled.org/core/spindle/queue" 30 "tangled.org/core/spindle/secrets" 31 "tangled.org/core/spindle/xrpc" 32 "tangled.org/core/xrpc/serviceauth" 33) 34 35//go:embed motd 36var defaultMotd []byte 37 38const ( 39 rbacDomain = "thisserver" 40) 41 42type Spindle struct { 43 jc *jetstream.JetstreamClient 44 tap *Tap 45 embedTap *embeddedTap 46 db *db.DB 47 e *rbac.Enforcer 48 l *slog.Logger 49 n *notifier.Notifier 50 engs map[string]models.Engine 51 jq *queue.Queue 52 cfg *config.Config 53 ks *eventconsumer.Consumer 54 res *idresolver.Resolver 55 vault secrets.Manager 56 motd []byte 57 motdMu sync.RWMutex 58 workflowSem chan struct{} 59 rootCtx context.Context 60} 61 62// New creates a new Spindle server with the provided configuration and engines. 63func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 64 logger := log.FromContext(ctx) 65 66 d, err := db.Make(ctx, cfg.Server.DBPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to setup db: %w", err) 69 } 70 71 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 72 if err != nil { 73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 74 } 75 e.E.EnableAutoSave(true) 76 77 n := notifier.New() 78 79 var vault secrets.Manager 80 switch cfg.Server.Secrets.Provider { 81 case "openbao": 82 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 83 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 84 } 85 vault, err = secrets.NewOpenBaoManager( 86 cfg.Server.Secrets.OpenBao.ProxyAddr, 87 logger, 88 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 89 ) 90 if err != nil { 91 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 92 } 93 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 94 case "sqlite", "": 95 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 96 if err != nil { 97 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 98 } 99 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 100 default: 101 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 102 } 103 104 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil { 105 return nil, fmt.Errorf("failed to run startup migrations: %w", err) 106 } 107 108 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 109 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 110 111 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows) 112 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows) 113 114 collections := []string{ 115 tangled.SpindleMemberNSID, 116 tangled.RepoNSID, 117 tangled.RepoCollaboratorNSID, 118 } 119 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 120 if err != nil { 121 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 122 } 123 jc.AddDid(cfg.Server.Owner) 124 125 // Check if the spindle knows about any Dids; 126 dids, err := d.GetAllDids() 127 if err != nil { 128 return nil, fmt.Errorf("failed to get all dids: %w", err) 129 } 130 for _, d := range dids { 131 jc.AddDid(d) 132 } 133 134 knownRepos, err := d.AllRepos() 135 if err != nil { 136 return nil, fmt.Errorf("failed to get known repos: %w", err) 137 } 138 for _, r := range knownRepos { 139 if r.Owner != "" { 140 jc.AddDid(r.Owner.String()) 141 } 142 } 143 144 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 145 146 spindle := &Spindle{ 147 jc: jc, 148 e: e, 149 db: d, 150 l: logger, 151 n: &n, 152 engs: engines, 153 jq: jq, 154 cfg: cfg, 155 res: resolver, 156 vault: vault, 157 motd: defaultMotd, 158 workflowSem: workflowSem, 159 rootCtx: ctx, 160 } 161 162 err = e.AddSpindle(rbacDomain) 163 if err != nil { 164 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 165 } 166 err = spindle.configureOwner() 167 if err != nil { 168 return nil, err 169 } 170 logger.Info("owner set", "did", cfg.Server.Owner) 171 172 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 173 if err != nil { 174 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 175 } 176 177 err = jc.StartJetstream(ctx, spindle.ingest()) 178 if err != nil { 179 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 180 } 181 182 // for each incoming sh.tangled.pipeline, we execute 183 // spindle.processPipeline, which in turn enqueues the pipeline 184 // job in the above registered queue. 185 ccfg := eventconsumer.NewConsumerConfig() 186 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 187 ccfg.URLFunc = eventconsumer.DefaultURL(cfg.Server.Dev) 188 ccfg.ProcessFunc = spindle.processPipeline 189 ccfg.CursorStore = cursorStore 190 knownKnots, err := d.Knots() 191 if err != nil { 192 return nil, err 193 } 194 for _, knot := range knownKnots { 195 logger.Info("adding source start", "knot", knot) 196 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 197 } 198 spindle.ks = eventconsumer.NewConsumer(*ccfg) 199 200 if cfg.Server.Tap.Embed { 201 pw, err := randomAdminPassword() 202 if err != nil { 203 return nil, err 204 } 205 cfg.Server.Tap.AdminPassword = pw 206 logger.Info("embedded tap: using random admin password") 207 } 208 spindle.tap = NewTapClient(spindle) 209 210 return spindle, nil 211} 212 213// DB returns the database instance. 214func (s *Spindle) DB() *db.DB { 215 return s.db 216} 217 218// Queue returns the job queue instance. 219func (s *Spindle) Queue() *queue.Queue { 220 return s.jq 221} 222 223// Engines returns the map of available engines. 224func (s *Spindle) Engines() map[string]models.Engine { 225 return s.engs 226} 227 228// Vault returns the secrets manager instance. 229func (s *Spindle) Vault() secrets.Manager { 230 return s.vault 231} 232 233// Notifier returns the notifier instance. 234func (s *Spindle) Notifier() *notifier.Notifier { 235 return s.n 236} 237 238// Enforcer returns the RBAC enforcer instance. 239func (s *Spindle) Enforcer() *rbac.Enforcer { 240 return s.e 241} 242 243// SetMotdContent sets custom MOTD content, replacing the embedded default. 244func (s *Spindle) SetMotdContent(content []byte) { 245 s.motdMu.Lock() 246 defer s.motdMu.Unlock() 247 s.motd = content 248} 249 250// GetMotdContent returns the current MOTD content. 251func (s *Spindle) GetMotdContent() []byte { 252 s.motdMu.RLock() 253 defer s.motdMu.RUnlock() 254 return s.motd 255} 256 257// Start starts the Spindle server (blocking). 258func (s *Spindle) Start(ctx context.Context) error { 259 // starts a job queue runner in the background 260 s.jq.Start() 261 defer s.jq.Stop() 262 263 // Stop vault token renewal if it implements Stopper 264 if stopper, ok := s.vault.(secrets.Stopper); ok { 265 defer stopper.Stop() 266 } 267 268 tapCtx, tapCancel := context.WithCancel(ctx) 269 270 if s.cfg.Server.Tap.Embed { 271 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap")) 272 if err != nil { 273 tapCancel() 274 return fmt.Errorf("starting embedded tap: %w", err) 275 } 276 s.embedTap = emb 277 defer func() { 278 tapCancel() 279 s.embedTap.Shutdown() 280 }() 281 282 go s.watchTapDrain(tapCtx, tapCancel) 283 } else { 284 defer tapCancel() 285 } 286 287 go func() { 288 s.l.Info("starting knot event consumer") 289 s.ks.Start(ctx) 290 }() 291 292 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url) 293 s.tap.Start(tapCtx) 294 295 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 296 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 297} 298 299func (s *Spindle) declareTapInterest(ctx context.Context) { 300 repos, err := s.db.AllRepos() 301 if err != nil { 302 s.l.Warn("tap declare: failed to load known repos", "err", err) 303 return 304 } 305 seen := make(map[syntax.DID]struct{}, len(repos)) 306 dids := make([]syntax.DID, 0, len(repos)) 307 for _, r := range repos { 308 if r.Owner == "" { 309 continue 310 } 311 if _, ok := seen[r.Owner]; ok { 312 continue 313 } 314 seen[r.Owner] = struct{}{} 315 dids = append(dids, r.Owner) 316 } 317 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil { 318 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err) 319 return 320 } 321 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids)) 322} 323 324func Run(ctx context.Context) error { 325 cfg, err := config.Load(ctx) 326 if err != nil { 327 return fmt.Errorf("failed to load config: %w", err) 328 } 329 330 nixeryEng, err := nixery.New(ctx, cfg) 331 if err != nil { 332 return err 333 } 334 335 s, err := New(ctx, cfg, map[string]models.Engine{ 336 "nixery": nixeryEng, 337 }) 338 if err != nil { 339 return err 340 } 341 342 return s.Start(ctx) 343} 344 345func (s *Spindle) Router() http.Handler { 346 mux := chi.NewRouter() 347 348 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 349 w.Write(s.GetMotdContent()) 350 }) 351 mux.HandleFunc("/events", s.Events) 352 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 353 354 mux.Mount("/xrpc", s.XrpcRouter()) 355 return mux 356} 357 358func (s *Spindle) XrpcRouter() http.Handler { 359 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 360 361 l := log.SubLogger(s.l, "xrpc") 362 363 x := xrpc.Xrpc{ 364 Logger: l, 365 Db: s.db, 366 Enforcer: s.e, 367 Engines: s.engs, 368 Config: s.cfg, 369 Resolver: s.res, 370 Vault: s.vault, 371 Notifier: s.Notifier(), 372 ServiceAuth: serviceAuth, 373 } 374 375 return x.Router() 376} 377 378func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error { 379 if msg.Nsid == tangled.PipelineNSID { 380 tpl := tangled.Pipeline{} 381 err := json.Unmarshal(msg.EventJson, &tpl) 382 if err != nil { 383 s.l.Error("failed to unmarshal pipeline event", "err", err) 384 return err 385 } 386 387 if tpl.TriggerMetadata == nil { 388 return fmt.Errorf("no trigger metadata found") 389 } 390 391 if tpl.TriggerMetadata.Repo == nil { 392 return fmt.Errorf("no repo data found") 393 } 394 395 if src.Host != tpl.TriggerMetadata.Repo.Knot { 396 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot) 397 } 398 399 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo) 400 if err != nil { 401 return err 402 } 403 404 pipelineId := models.PipelineId{ 405 Knot: src.Host, 406 Rkey: msg.Rkey, 407 } 408 409 workflows := make(map[models.Engine][]models.Workflow) 410 411 // Build pipeline environment variables once for all workflows 412 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 413 414 for _, w := range tpl.Workflows { 415 if w != nil { 416 if _, ok := s.engs[w.Engine]; !ok { 417 err = s.db.StatusFailed(models.WorkflowId{ 418 PipelineId: pipelineId, 419 Name: w.Name, 420 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 421 if err != nil { 422 return fmt.Errorf("db.StatusFailed: %w", err) 423 } 424 425 continue 426 } 427 428 eng := s.engs[w.Engine] 429 430 if _, ok := workflows[eng]; !ok { 431 workflows[eng] = []models.Workflow{} 432 } 433 434 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 435 if err != nil { 436 err = s.db.StatusFailed(models.WorkflowId{ 437 PipelineId: pipelineId, 438 Name: w.Name, 439 }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 440 if err != nil { 441 return fmt.Errorf("db.StatusFailed: %w", err) 442 } 443 444 continue 445 } 446 447 // inject TANGLED_* env vars after InitWorkflow 448 // This prevents user-defined env vars from overriding them 449 if ewf.Environment == nil { 450 ewf.Environment = make(map[string]string) 451 } 452 maps.Copy(ewf.Environment, pipelineEnv) 453 454 workflows[eng] = append(workflows[eng], *ewf) 455 456 err = s.db.StatusPending(models.WorkflowId{ 457 PipelineId: pipelineId, 458 Name: w.Name, 459 }, s.n) 460 if err != nil { 461 return fmt.Errorf("db.StatusPending: %w", err) 462 } 463 } 464 } 465 466 ok := s.jq.Enqueue(queue.Job{ 467 Run: func() error { 468 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{ 469 RepoDid: repoDid, 470 Workflows: workflows, 471 }, pipelineId) 472 return nil 473 }, 474 OnFail: func(jobError error) { 475 s.l.Error("pipeline run failed", "error", jobError) 476 }, 477 }) 478 if ok { 479 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 480 } else { 481 s.l.Error("failed to enqueue pipeline: queue is full") 482 } 483 } 484 485 return nil 486} 487 488func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) { 489 if repo.RepoDid == nil || *repo.RepoDid == "" { 490 return "", fmt.Errorf("pipeline trigger missing repoDid") 491 } 492 repoDid, err := syntax.ParseDID(*repo.RepoDid) 493 if err != nil { 494 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err) 495 } 496 if _, err := s.db.GetRepoByDid(repoDid); err != nil { 497 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 498 } 499 return repoDid, nil 500} 501 502func (s *Spindle) configureOwner() error { 503 cfgOwner := s.cfg.Server.Owner 504 505 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 506 if err != nil { 507 return err 508 } 509 510 switch len(existing) { 511 case 0: 512 // no owner configured, continue 513 case 1: 514 // find existing owner 515 existingOwner := existing[0] 516 517 // no ownership change, this is okay 518 if existingOwner == s.cfg.Server.Owner { 519 break 520 } 521 522 // remove existing owner 523 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 524 if err != nil { 525 return nil 526 } 527 default: 528 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 529 } 530 531 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 532}