Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "sync" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/go-chi/chi/v5" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/jetstream" 20 "tangled.org/core/log" 21 "tangled.org/core/notifier" 22 "tangled.org/core/rbac" 23 "tangled.org/core/spindle/config" 24 "tangled.org/core/spindle/db" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/engines/nixery" 27 "tangled.org/core/spindle/models" 28 "tangled.org/core/spindle/queue" 29 "tangled.org/core/spindle/secrets" 30 "tangled.org/core/spindle/xrpc" 31 "tangled.org/core/xrpc/serviceauth" 32) 33 34//go:embed motd 35var defaultMotd []byte 36 37const ( 38 rbacDomain = "thisserver" 39) 40 41type Spindle struct { 42 jc *jetstream.JetstreamClient 43 tap *Tap 44 embedTap *embeddedTap 45 db *db.DB 46 e *rbac.Enforcer 47 l *slog.Logger 48 n *notifier.Notifier 49 engs map[string]models.Engine 50 jq *queue.Queue 51 cfg *config.Config 52 ks *eventconsumer.Consumer 53 res *idresolver.Resolver 54 vault secrets.Manager 55 motd []byte 56 motdMu sync.RWMutex 57 workflowSem chan struct{} 58 rootCtx context.Context 59} 60 61// New creates a new Spindle server with the provided configuration and engines. 62func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 63 logger := log.FromContext(ctx) 64 65 d, err := db.Make(ctx, cfg.Server.DBPath) 66 if err != nil { 67 return nil, fmt.Errorf("failed to setup db: %w", err) 68 } 69 70 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 71 if err != nil { 72 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 73 } 74 e.E.EnableAutoSave(true) 75 76 n := notifier.New() 77 78 var vault secrets.Manager 79 switch cfg.Server.Secrets.Provider { 80 case "openbao": 81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 83 } 84 vault, err = secrets.NewOpenBaoManager( 85 cfg.Server.Secrets.OpenBao.ProxyAddr, 86 logger, 87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 88 ) 89 if err != nil { 90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 91 } 92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 93 case "sqlite", "": 94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 95 if err != nil { 96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 97 } 98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 99 default: 100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 101 } 102 103 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil { 104 return nil, fmt.Errorf("failed to run startup migrations: %w", err) 105 } 106 107 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 108 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 109 110 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows) 111 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows) 112 113 collections := []string{ 114 tangled.SpindleMemberNSID, 115 tangled.RepoNSID, 116 tangled.RepoCollaboratorNSID, 117 } 118 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 119 if err != nil { 120 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 121 } 122 jc.AddDid(cfg.Server.Owner) 123 124 // Check if the spindle knows about any Dids; 125 dids, err := d.GetAllDids() 126 if err != nil { 127 return nil, fmt.Errorf("failed to get all dids: %w", err) 128 } 129 for _, d := range dids { 130 jc.AddDid(d) 131 } 132 133 knownRepos, err := d.AllRepos() 134 if err != nil { 135 return nil, fmt.Errorf("failed to get known repos: %w", err) 136 } 137 for _, r := range knownRepos { 138 if r.Owner != "" { 139 jc.AddDid(r.Owner.String()) 140 } 141 } 142 143 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 144 145 spindle := &Spindle{ 146 jc: jc, 147 e: e, 148 db: d, 149 l: logger, 150 n: &n, 151 engs: engines, 152 jq: jq, 153 cfg: cfg, 154 res: resolver, 155 vault: vault, 156 motd: defaultMotd, 157 workflowSem: workflowSem, 158 rootCtx: ctx, 159 } 160 161 err = e.AddSpindle(rbacDomain) 162 if err != nil { 163 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 164 } 165 err = spindle.configureOwner() 166 if err != nil { 167 return nil, err 168 } 169 logger.Info("owner set", "did", cfg.Server.Owner) 170 171 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 172 if err != nil { 173 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 174 } 175 176 err = jc.StartJetstream(ctx, spindle.ingest()) 177 if err != nil { 178 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 179 } 180 181 // for each incoming sh.tangled.pipeline, we execute 182 // spindle.processPipeline, which in turn enqueues the pipeline 183 // job in the above registered queue. 184 ccfg := eventconsumer.NewConsumerConfig() 185 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 186 ccfg.Dev = cfg.Server.Dev 187 ccfg.ProcessFunc = spindle.processPipeline 188 ccfg.CursorStore = cursorStore 189 knownKnots, err := d.Knots() 190 if err != nil { 191 return nil, err 192 } 193 for _, knot := range knownKnots { 194 logger.Info("adding source start", "knot", knot) 195 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 196 } 197 spindle.ks = eventconsumer.NewConsumer(*ccfg) 198 199 if cfg.Server.Tap.Embed { 200 pw, err := randomAdminPassword() 201 if err != nil { 202 return nil, err 203 } 204 cfg.Server.Tap.AdminPassword = pw 205 logger.Info("embedded tap: using random admin password") 206 } 207 spindle.tap = NewTapClient(spindle) 208 209 return spindle, nil 210} 211 212// DB returns the database instance. 213func (s *Spindle) DB() *db.DB { 214 return s.db 215} 216 217// Queue returns the job queue instance. 218func (s *Spindle) Queue() *queue.Queue { 219 return s.jq 220} 221 222// Engines returns the map of available engines. 223func (s *Spindle) Engines() map[string]models.Engine { 224 return s.engs 225} 226 227// Vault returns the secrets manager instance. 228func (s *Spindle) Vault() secrets.Manager { 229 return s.vault 230} 231 232// Notifier returns the notifier instance. 233func (s *Spindle) Notifier() *notifier.Notifier { 234 return s.n 235} 236 237// Enforcer returns the RBAC enforcer instance. 238func (s *Spindle) Enforcer() *rbac.Enforcer { 239 return s.e 240} 241 242// SetMotdContent sets custom MOTD content, replacing the embedded default. 243func (s *Spindle) SetMotdContent(content []byte) { 244 s.motdMu.Lock() 245 defer s.motdMu.Unlock() 246 s.motd = content 247} 248 249// GetMotdContent returns the current MOTD content. 250func (s *Spindle) GetMotdContent() []byte { 251 s.motdMu.RLock() 252 defer s.motdMu.RUnlock() 253 return s.motd 254} 255 256// Start starts the Spindle server (blocking). 257func (s *Spindle) Start(ctx context.Context) error { 258 // starts a job queue runner in the background 259 s.jq.Start() 260 defer s.jq.Stop() 261 262 // Stop vault token renewal if it implements Stopper 263 if stopper, ok := s.vault.(secrets.Stopper); ok { 264 defer stopper.Stop() 265 } 266 267 tapCtx, tapCancel := context.WithCancel(ctx) 268 269 if s.cfg.Server.Tap.Embed { 270 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap")) 271 if err != nil { 272 tapCancel() 273 return fmt.Errorf("starting embedded tap: %w", err) 274 } 275 s.embedTap = emb 276 defer func() { 277 tapCancel() 278 s.embedTap.Shutdown() 279 }() 280 281 go s.watchTapDrain(tapCtx, tapCancel) 282 } else { 283 defer tapCancel() 284 } 285 286 go func() { 287 s.l.Info("starting knot event consumer") 288 s.ks.Start(ctx) 289 }() 290 291 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url) 292 s.tap.Start(tapCtx) 293 294 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 295 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 296} 297 298func (s *Spindle) declareTapInterest(ctx context.Context) { 299 repos, err := s.db.AllRepos() 300 if err != nil { 301 s.l.Warn("tap declare: failed to load known repos", "err", err) 302 return 303 } 304 seen := make(map[syntax.DID]struct{}, len(repos)) 305 dids := make([]syntax.DID, 0, len(repos)) 306 for _, r := range repos { 307 if r.Owner == "" { 308 continue 309 } 310 if _, ok := seen[r.Owner]; ok { 311 continue 312 } 313 seen[r.Owner] = struct{}{} 314 dids = append(dids, r.Owner) 315 } 316 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil { 317 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err) 318 return 319 } 320 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids)) 321} 322 323func Run(ctx context.Context) error { 324 cfg, err := config.Load(ctx) 325 if err != nil { 326 return fmt.Errorf("failed to load config: %w", err) 327 } 328 329 nixeryEng, err := nixery.New(ctx, cfg) 330 if err != nil { 331 return err 332 } 333 334 s, err := New(ctx, cfg, map[string]models.Engine{ 335 "nixery": nixeryEng, 336 }) 337 if err != nil { 338 return err 339 } 340 341 return s.Start(ctx) 342} 343 344func (s *Spindle) Router() http.Handler { 345 mux := chi.NewRouter() 346 347 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 348 w.Write(s.GetMotdContent()) 349 }) 350 mux.HandleFunc("/events", s.Events) 351 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 352 353 mux.Mount("/xrpc", s.XrpcRouter()) 354 return mux 355} 356 357func (s *Spindle) XrpcRouter() http.Handler { 358 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 359 360 l := log.SubLogger(s.l, "xrpc") 361 362 x := xrpc.Xrpc{ 363 Logger: l, 364 Db: s.db, 365 Enforcer: s.e, 366 Engines: s.engs, 367 Config: s.cfg, 368 Resolver: s.res, 369 Vault: s.vault, 370 Notifier: s.Notifier(), 371 ServiceAuth: serviceAuth, 372 } 373 374 return x.Router() 375} 376 377func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 378 if msg.Nsid == tangled.PipelineNSID { 379 tpl := tangled.Pipeline{} 380 err := json.Unmarshal(msg.EventJson, &tpl) 381 if err != nil { 382 s.l.Error("failed to unmarshal pipeline event", "err", err) 383 return err 384 } 385 386 if tpl.TriggerMetadata == nil { 387 return fmt.Errorf("no trigger metadata found") 388 } 389 390 if tpl.TriggerMetadata.Repo == nil { 391 return fmt.Errorf("no repo data found") 392 } 393 394 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 395 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 396 } 397 398 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo) 399 if err != nil { 400 return err 401 } 402 403 pipelineId := models.PipelineId{ 404 Knot: src.Key(), 405 Rkey: msg.Rkey, 406 } 407 408 workflows := make(map[models.Engine][]models.Workflow) 409 410 // Build pipeline environment variables once for all workflows 411 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 412 413 for _, w := range tpl.Workflows { 414 if w != nil { 415 if _, ok := s.engs[w.Engine]; !ok { 416 err = s.db.StatusFailed(models.WorkflowId{ 417 PipelineId: pipelineId, 418 Name: w.Name, 419 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 420 if err != nil { 421 return fmt.Errorf("db.StatusFailed: %w", err) 422 } 423 424 continue 425 } 426 427 eng := s.engs[w.Engine] 428 429 if _, ok := workflows[eng]; !ok { 430 workflows[eng] = []models.Workflow{} 431 } 432 433 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 434 if err != nil { 435 err = s.db.StatusFailed(models.WorkflowId{ 436 PipelineId: pipelineId, 437 Name: w.Name, 438 }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 439 if err != nil { 440 return fmt.Errorf("db.StatusFailed: %w", err) 441 } 442 443 continue 444 } 445 446 // inject TANGLED_* env vars after InitWorkflow 447 // This prevents user-defined env vars from overriding them 448 if ewf.Environment == nil { 449 ewf.Environment = make(map[string]string) 450 } 451 maps.Copy(ewf.Environment, pipelineEnv) 452 453 workflows[eng] = append(workflows[eng], *ewf) 454 455 err = s.db.StatusPending(models.WorkflowId{ 456 PipelineId: pipelineId, 457 Name: w.Name, 458 }, s.n) 459 if err != nil { 460 return fmt.Errorf("db.StatusPending: %w", err) 461 } 462 } 463 } 464 465 ok := s.jq.Enqueue(queue.Job{ 466 Run: func() error { 467 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{ 468 RepoDid: repoDid, 469 Workflows: workflows, 470 }, pipelineId) 471 return nil 472 }, 473 OnFail: func(jobError error) { 474 s.l.Error("pipeline run failed", "error", jobError) 475 }, 476 }) 477 if ok { 478 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 479 } else { 480 s.l.Error("failed to enqueue pipeline: queue is full") 481 } 482 } 483 484 return nil 485} 486 487func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) { 488 if repo.RepoDid == nil || *repo.RepoDid == "" { 489 return "", fmt.Errorf("pipeline trigger missing repoDid") 490 } 491 repoDid, err := syntax.ParseDID(*repo.RepoDid) 492 if err != nil { 493 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err) 494 } 495 if _, err := s.db.GetRepoByDid(repoDid); err != nil { 496 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 497 } 498 return repoDid, nil 499} 500 501func (s *Spindle) configureOwner() error { 502 cfgOwner := s.cfg.Server.Owner 503 504 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 505 if err != nil { 506 return err 507 } 508 509 switch len(existing) { 510 case 0: 511 // no owner configured, continue 512 case 1: 513 // find existing owner 514 existingOwner := existing[0] 515 516 // no ownership change, this is okay 517 if existingOwner == s.cfg.Server.Owner { 518 break 519 } 520 521 // remove existing owner 522 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 523 if err != nil { 524 return nil 525 } 526 default: 527 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 528 } 529 530 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 531}