Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "sync" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/go-chi/chi/v5" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/eventstream" 19 "tangled.org/core/idresolver" 20 "tangled.org/core/jetstream" 21 "tangled.org/core/log" 22 "tangled.org/core/notifier" 23 "tangled.org/core/rbac" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/db" 26 "tangled.org/core/spindle/engine" 27 "tangled.org/core/spindle/engines/dummy" 28 "tangled.org/core/spindle/engines/nixery" 29 "tangled.org/core/spindle/models" 30 "tangled.org/core/spindle/queue" 31 "tangled.org/core/spindle/secrets" 32 "tangled.org/core/spindle/xrpc" 33 "tangled.org/core/xrpc/serviceauth" 34) 35 36//go:embed motd 37var defaultMotd []byte 38 39const ( 40 rbacDomain = "thisserver" 41) 42 43type Spindle struct { 44 jc *jetstream.JetstreamClient 45 tap *Tap 46 embedTap *embeddedTap 47 db *db.DB 48 e *rbac.Enforcer 49 l *slog.Logger 50 n *notifier.Notifier 51 engs map[string]models.Engine 52 jq *queue.Queue 53 cfg *config.Config 54 ks *eventconsumer.Consumer 55 res *idresolver.Resolver 56 vault secrets.Manager 57 motd []byte 58 motdMu sync.RWMutex 59 workflowSem chan struct{} 60 rootCtx context.Context 61} 62 63// New creates a new Spindle server with the provided configuration and engines. 64func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 65 logger := log.FromContext(ctx) 66 67 d, err := db.Make(ctx, cfg.Server.DBPath) 68 if err != nil { 69 return nil, fmt.Errorf("failed to setup db: %w", err) 70 } 71 72 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 73 if err != nil { 74 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 75 } 76 e.E.EnableAutoSave(true) 77 78 n := notifier.New() 79 80 var vault secrets.Manager 81 switch cfg.Server.Secrets.Provider { 82 case "openbao": 83 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 84 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 85 } 86 vault, err = secrets.NewOpenBaoManager( 87 cfg.Server.Secrets.OpenBao.ProxyAddr, 88 logger, 89 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 90 ) 91 if err != nil { 92 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 93 } 94 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 95 case "sqlite", "": 96 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 97 if err != nil { 98 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 99 } 100 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 101 default: 102 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 103 } 104 105 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil { 106 return nil, fmt.Errorf("failed to run startup migrations: %w", err) 107 } 108 109 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 110 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 111 112 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows) 113 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows) 114 115 collections := []string{ 116 tangled.SpindleMemberNSID, 117 tangled.RepoNSID, 118 tangled.RepoCollaboratorNSID, 119 } 120 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 121 if err != nil { 122 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 123 } 124 jc.AddDid(cfg.Server.Owner) 125 126 // Check if the spindle knows about any Dids; 127 dids, err := d.GetAllDids() 128 if err != nil { 129 return nil, fmt.Errorf("failed to get all dids: %w", err) 130 } 131 for _, d := range dids { 132 jc.AddDid(d) 133 } 134 135 knownRepos, err := d.AllRepos() 136 if err != nil { 137 return nil, fmt.Errorf("failed to get known repos: %w", err) 138 } 139 for _, r := range knownRepos { 140 if r.Owner != "" { 141 jc.AddDid(r.Owner.String()) 142 } 143 } 144 145 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 146 147 spindle := &Spindle{ 148 jc: jc, 149 e: e, 150 db: d, 151 l: logger, 152 n: &n, 153 engs: engines, 154 jq: jq, 155 cfg: cfg, 156 res: resolver, 157 vault: vault, 158 motd: defaultMotd, 159 workflowSem: workflowSem, 160 rootCtx: ctx, 161 } 162 163 err = e.AddSpindle(rbacDomain) 164 if err != nil { 165 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 166 } 167 err = spindle.configureOwner() 168 if err != nil { 169 return nil, err 170 } 171 logger.Info("owner set", "did", cfg.Server.Owner) 172 173 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 174 if err != nil { 175 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 176 } 177 178 err = jc.StartJetstream(ctx, spindle.ingest()) 179 if err != nil { 180 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 181 } 182 183 // for each incoming sh.tangled.pipeline, we execute 184 // spindle.processPipeline, which in turn enqueues the pipeline 185 // job in the above registered queue. 186 ccfg := eventconsumer.NewConsumerConfig() 187 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 188 ccfg.URLFunc = eventconsumer.DefaultURL(cfg.Server.Dev) 189 ccfg.ProcessFunc = spindle.processPipeline 190 ccfg.CursorStore = cursorStore 191 knownKnots, err := d.Knots() 192 if err != nil { 193 return nil, err 194 } 195 for _, knot := range knownKnots { 196 logger.Info("adding source start", "knot", knot) 197 src := eventconsumer.NewKnotSource(knot) 198 eventconsumer.MigrateLegacyCursor(cursorStore, src) 199 ccfg.Sources[src] = struct{}{} 200 } 201 spindle.ks = eventconsumer.NewConsumer(*ccfg) 202 203 if cfg.Server.Tap.Embed { 204 pw, err := randomAdminPassword() 205 if err != nil { 206 return nil, err 207 } 208 cfg.Server.Tap.AdminPassword = pw 209 logger.Info("embedded tap: using random admin password") 210 } 211 spindle.tap = NewTapClient(spindle) 212 213 return spindle, nil 214} 215 216// DB returns the database instance. 217func (s *Spindle) DB() *db.DB { 218 return s.db 219} 220 221// Queue returns the job queue instance. 222func (s *Spindle) Queue() *queue.Queue { 223 return s.jq 224} 225 226// Engines returns the map of available engines. 227func (s *Spindle) Engines() map[string]models.Engine { 228 return s.engs 229} 230 231// Vault returns the secrets manager instance. 232func (s *Spindle) Vault() secrets.Manager { 233 return s.vault 234} 235 236// Notifier returns the notifier instance. 237func (s *Spindle) Notifier() *notifier.Notifier { 238 return s.n 239} 240 241// Enforcer returns the RBAC enforcer instance. 242func (s *Spindle) Enforcer() *rbac.Enforcer { 243 return s.e 244} 245 246// SetMotdContent sets custom MOTD content, replacing the embedded default. 247func (s *Spindle) SetMotdContent(content []byte) { 248 s.motdMu.Lock() 249 defer s.motdMu.Unlock() 250 s.motd = content 251} 252 253// GetMotdContent returns the current MOTD content. 254func (s *Spindle) GetMotdContent() []byte { 255 s.motdMu.RLock() 256 defer s.motdMu.RUnlock() 257 return s.motd 258} 259 260// Start starts the Spindle server (blocking). 261func (s *Spindle) Start(ctx context.Context) error { 262 // starts a job queue runner in the background 263 s.jq.Start() 264 defer s.jq.Stop() 265 266 // Stop vault token renewal if it implements Stopper 267 if stopper, ok := s.vault.(secrets.Stopper); ok { 268 defer stopper.Stop() 269 } 270 271 tapCtx, tapCancel := context.WithCancel(ctx) 272 273 if s.cfg.Server.Tap.Embed { 274 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap")) 275 if err != nil { 276 tapCancel() 277 return fmt.Errorf("starting embedded tap: %w", err) 278 } 279 s.embedTap = emb 280 defer func() { 281 tapCancel() 282 s.embedTap.Shutdown() 283 }() 284 285 go s.watchTapDrain(tapCtx, tapCancel) 286 } else { 287 defer tapCancel() 288 } 289 290 go func() { 291 s.l.Info("starting knot event consumer") 292 s.ks.Start(ctx) 293 }() 294 295 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url) 296 s.tap.Start(tapCtx) 297 298 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 299 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 300} 301 302func (s *Spindle) declareTapInterest(ctx context.Context) { 303 repos, err := s.db.AllRepos() 304 if err != nil { 305 s.l.Warn("tap declare: failed to load known repos", "err", err) 306 return 307 } 308 seen := make(map[syntax.DID]struct{}, len(repos)) 309 dids := make([]syntax.DID, 0, len(repos)) 310 for _, r := range repos { 311 if r.Owner == "" { 312 continue 313 } 314 if _, ok := seen[r.Owner]; ok { 315 continue 316 } 317 seen[r.Owner] = struct{}{} 318 dids = append(dids, r.Owner) 319 } 320 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil { 321 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err) 322 return 323 } 324 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids)) 325} 326 327func Run(ctx context.Context) error { 328 cfg, err := config.Load(ctx) 329 if err != nil { 330 return fmt.Errorf("failed to load config: %w", err) 331 } 332 333 nixeryEng, err := nixery.New(ctx, cfg) 334 if err != nil { 335 return err 336 } 337 338 s, err := New(ctx, cfg, map[string]models.Engine{ 339 "nixery": nixeryEng, 340 "dummy": dummy.New(log.FromContext(ctx)), 341 }) 342 if err != nil { 343 return err 344 } 345 346 return s.Start(ctx) 347} 348 349func (s *Spindle) Router() http.Handler { 350 mux := chi.NewRouter() 351 352 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 353 w.Write(s.GetMotdContent()) 354 }) 355 mux.HandleFunc("/events", s.Events) 356 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 357 358 mux.Mount("/xrpc", s.XrpcRouter()) 359 return mux 360} 361 362func (s *Spindle) XrpcRouter() http.Handler { 363 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 364 365 l := log.SubLogger(s.l, "xrpc") 366 367 x := xrpc.Xrpc{ 368 Logger: l, 369 Db: s.db, 370 Enforcer: s.e, 371 Engines: s.engs, 372 Config: s.cfg, 373 Resolver: s.res, 374 Vault: s.vault, 375 Notifier: s.Notifier(), 376 ServiceAuth: serviceAuth, 377 } 378 379 return x.Router() 380} 381 382func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error { 383 if msg.Nsid == tangled.PipelineNSID { 384 tpl := tangled.Pipeline{} 385 err := json.Unmarshal(msg.EventJson, &tpl) 386 if err != nil { 387 s.l.Error("failed to unmarshal pipeline event", "err", err) 388 return err 389 } 390 391 if tpl.TriggerMetadata == nil { 392 return fmt.Errorf("no trigger metadata found") 393 } 394 395 if tpl.TriggerMetadata.Repo == nil { 396 return fmt.Errorf("no repo data found") 397 } 398 399 if src.Host != tpl.TriggerMetadata.Repo.Knot { 400 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot) 401 } 402 403 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo) 404 if err != nil { 405 return err 406 } 407 408 pipelineId := models.PipelineId{ 409 Knot: src.Host, 410 Rkey: msg.Rkey, 411 } 412 413 workflows := make(map[models.Engine][]models.Workflow) 414 415 // Build pipeline environment variables once for all workflows 416 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 417 418 for _, w := range tpl.Workflows { 419 if w != nil { 420 if _, ok := s.engs[w.Engine]; !ok { 421 err = s.db.StatusFailed(models.WorkflowId{ 422 PipelineId: pipelineId, 423 Name: w.Name, 424 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 425 if err != nil { 426 return fmt.Errorf("db.StatusFailed: %w", err) 427 } 428 429 continue 430 } 431 432 eng := s.engs[w.Engine] 433 434 if _, ok := workflows[eng]; !ok { 435 workflows[eng] = []models.Workflow{} 436 } 437 438 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 439 if err != nil { 440 err = s.db.StatusFailed(models.WorkflowId{ 441 PipelineId: pipelineId, 442 Name: w.Name, 443 }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 444 if err != nil { 445 return fmt.Errorf("db.StatusFailed: %w", err) 446 } 447 448 continue 449 } 450 451 // inject TANGLED_* env vars after InitWorkflow 452 // This prevents user-defined env vars from overriding them 453 if ewf.Environment == nil { 454 ewf.Environment = make(map[string]string) 455 } 456 maps.Copy(ewf.Environment, pipelineEnv) 457 458 workflows[eng] = append(workflows[eng], *ewf) 459 460 err = s.db.StatusPending(models.WorkflowId{ 461 PipelineId: pipelineId, 462 Name: w.Name, 463 }, s.n) 464 if err != nil { 465 return fmt.Errorf("db.StatusPending: %w", err) 466 } 467 } 468 } 469 470 ok := s.jq.Enqueue(queue.Job{ 471 Run: func() error { 472 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{ 473 RepoDid: repoDid, 474 Workflows: workflows, 475 }, pipelineId) 476 return nil 477 }, 478 OnFail: func(jobError error) { 479 s.l.Error("pipeline run failed", "error", jobError) 480 }, 481 }) 482 if ok { 483 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 484 } else { 485 s.l.Error("failed to enqueue pipeline: queue is full") 486 } 487 } 488 489 return nil 490} 491 492func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) { 493 if repo.RepoDid == nil || *repo.RepoDid == "" { 494 return "", fmt.Errorf("pipeline trigger missing repoDid") 495 } 496 repoDid, err := syntax.ParseDID(*repo.RepoDid) 497 if err != nil { 498 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err) 499 } 500 if _, err := s.db.GetRepoByDid(repoDid); err != nil { 501 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 502 } 503 return repoDid, nil 504} 505 506func (s *Spindle) configureOwner() error { 507 cfgOwner := s.cfg.Server.Owner 508 509 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 510 if err != nil { 511 return err 512 } 513 514 switch len(existing) { 515 case 0: 516 // no owner configured, continue 517 case 1: 518 // find existing owner 519 existingOwner := existing[0] 520 521 // no ownership change, this is okay 522 if existingOwner == s.cfg.Server.Owner { 523 break 524 } 525 526 // remove existing owner 527 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 528 if err != nil { 529 return nil 530 } 531 default: 532 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 533 } 534 535 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 536}