Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "sync" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/go-chi/chi/v5" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/jetstream" 20 "tangled.org/core/log" 21 "tangled.org/core/notifier" 22 "tangled.org/core/rbac" 23 "tangled.org/core/spindle/config" 24 "tangled.org/core/spindle/db" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/engines/nixery" 27 "tangled.org/core/spindle/models" 28 "tangled.org/core/spindle/queue" 29 "tangled.org/core/spindle/secrets" 30 "tangled.org/core/spindle/xrpc" 31 "tangled.org/core/xrpc/serviceauth" 32) 33 34//go:embed motd 35var defaultMotd []byte 36 37const ( 38 rbacDomain = "thisserver" 39) 40 41type Spindle struct { 42 jc *jetstream.JetstreamClient 43 tap *Tap 44 embedTap *embeddedTap 45 db *db.DB 46 e *rbac.Enforcer 47 l *slog.Logger 48 n *notifier.Notifier 49 engs map[string]models.Engine 50 jq *queue.Queue 51 cfg *config.Config 52 ks *eventconsumer.Consumer 53 res *idresolver.Resolver 54 vault secrets.Manager 55 motd []byte 56 motdMu sync.RWMutex 57 workflowSem chan struct{} 58 rootCtx context.Context 59} 60 61// New creates a new Spindle server with the provided configuration and engines. 62func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 63 logger := log.FromContext(ctx) 64 65 d, err := db.Make(ctx, cfg.Server.DBPath) 66 if err != nil { 67 return nil, fmt.Errorf("failed to setup db: %w", err) 68 } 69 70 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 71 if err != nil { 72 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 73 } 74 e.E.EnableAutoSave(true) 75 76 n := notifier.New() 77 78 var vault secrets.Manager 79 switch cfg.Server.Secrets.Provider { 80 case "openbao": 81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 83 } 84 vault, err = secrets.NewOpenBaoManager( 85 cfg.Server.Secrets.OpenBao.ProxyAddr, 86 logger, 87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 88 ) 89 if err != nil { 90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 91 } 92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 93 case "sqlite", "": 94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 95 if err != nil { 96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 97 } 98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 99 default: 100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 101 } 102 103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 104 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 105 106 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows) 107 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows) 108 109 collections := []string{ 110 tangled.SpindleMemberNSID, 111 } 112 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 113 if err != nil { 114 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 115 } 116 jc.AddDid(cfg.Server.Owner) 117 118 // Check if the spindle knows about any Dids; 119 dids, err := d.GetAllDids() 120 if err != nil { 121 return nil, fmt.Errorf("failed to get all dids: %w", err) 122 } 123 for _, d := range dids { 124 jc.AddDid(d) 125 } 126 127 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 128 129 spindle := &Spindle{ 130 jc: jc, 131 e: e, 132 db: d, 133 l: logger, 134 n: &n, 135 engs: engines, 136 jq: jq, 137 cfg: cfg, 138 res: resolver, 139 vault: vault, 140 motd: defaultMotd, 141 workflowSem: workflowSem, 142 rootCtx: ctx, 143 } 144 145 err = e.AddSpindle(rbacDomain) 146 if err != nil { 147 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 148 } 149 err = spindle.configureOwner() 150 if err != nil { 151 return nil, err 152 } 153 logger.Info("owner set", "did", cfg.Server.Owner) 154 155 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 156 if err != nil { 157 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 158 } 159 160 err = jc.StartJetstream(ctx, spindle.ingest()) 161 if err != nil { 162 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 163 } 164 165 // for each incoming sh.tangled.pipeline, we execute 166 // spindle.processPipeline, which in turn enqueues the pipeline 167 // job in the above registered queue. 168 ccfg := eventconsumer.NewConsumerConfig() 169 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 170 ccfg.Dev = cfg.Server.Dev 171 ccfg.ProcessFunc = spindle.processPipeline 172 ccfg.CursorStore = cursorStore 173 knownKnots, err := d.Knots() 174 if err != nil { 175 return nil, err 176 } 177 for _, knot := range knownKnots { 178 logger.Info("adding source start", "knot", knot) 179 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 180 } 181 spindle.ks = eventconsumer.NewConsumer(*ccfg) 182 183 if cfg.Server.Tap.Embed { 184 pw, err := randomAdminPassword() 185 if err != nil { 186 return nil, err 187 } 188 cfg.Server.Tap.AdminPassword = pw 189 logger.Info("embedded tap: using random admin password") 190 } 191 spindle.tap = NewTapClient(spindle) 192 193 return spindle, nil 194} 195 196// DB returns the database instance. 197func (s *Spindle) DB() *db.DB { 198 return s.db 199} 200 201// Queue returns the job queue instance. 202func (s *Spindle) Queue() *queue.Queue { 203 return s.jq 204} 205 206// Engines returns the map of available engines. 207func (s *Spindle) Engines() map[string]models.Engine { 208 return s.engs 209} 210 211// Vault returns the secrets manager instance. 212func (s *Spindle) Vault() secrets.Manager { 213 return s.vault 214} 215 216// Notifier returns the notifier instance. 217func (s *Spindle) Notifier() *notifier.Notifier { 218 return s.n 219} 220 221// Enforcer returns the RBAC enforcer instance. 222func (s *Spindle) Enforcer() *rbac.Enforcer { 223 return s.e 224} 225 226// SetMotdContent sets custom MOTD content, replacing the embedded default. 227func (s *Spindle) SetMotdContent(content []byte) { 228 s.motdMu.Lock() 229 defer s.motdMu.Unlock() 230 s.motd = content 231} 232 233// GetMotdContent returns the current MOTD content. 234func (s *Spindle) GetMotdContent() []byte { 235 s.motdMu.RLock() 236 defer s.motdMu.RUnlock() 237 return s.motd 238} 239 240// Start starts the Spindle server (blocking). 241func (s *Spindle) Start(ctx context.Context) error { 242 // starts a job queue runner in the background 243 s.jq.Start() 244 defer s.jq.Stop() 245 246 // Stop vault token renewal if it implements Stopper 247 if stopper, ok := s.vault.(secrets.Stopper); ok { 248 defer stopper.Stop() 249 } 250 251 if s.cfg.Server.Tap.Embed { 252 emb, err := startEmbeddedTap(ctx, s.cfg, log.SubLogger(s.l, "embedtap")) 253 if err != nil { 254 return fmt.Errorf("starting embedded tap: %w", err) 255 } 256 s.embedTap = emb 257 defer s.embedTap.Shutdown() 258 } 259 260 go func() { 261 s.l.Info("starting knot event consumer") 262 s.ks.Start(ctx) 263 }() 264 265 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url) 266 s.tap.Start(ctx) 267 268 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 269 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 270} 271 272func (s *Spindle) declareTapInterest(ctx context.Context) { 273 repos, err := s.db.AllRepos() 274 if err != nil { 275 s.l.Warn("tap declare: failed to load known repos", "err", err) 276 return 277 } 278 seen := make(map[syntax.DID]struct{}, len(repos)) 279 dids := make([]syntax.DID, 0, len(repos)) 280 for _, r := range repos { 281 if r.Owner == "" { 282 continue 283 } 284 if _, ok := seen[r.Owner]; ok { 285 continue 286 } 287 seen[r.Owner] = struct{}{} 288 dids = append(dids, r.Owner) 289 } 290 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil { 291 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err) 292 return 293 } 294 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids)) 295} 296 297func Run(ctx context.Context) error { 298 cfg, err := config.Load(ctx) 299 if err != nil { 300 return fmt.Errorf("failed to load config: %w", err) 301 } 302 303 nixeryEng, err := nixery.New(ctx, cfg) 304 if err != nil { 305 return err 306 } 307 308 s, err := New(ctx, cfg, map[string]models.Engine{ 309 "nixery": nixeryEng, 310 }) 311 if err != nil { 312 return err 313 } 314 315 return s.Start(ctx) 316} 317 318func (s *Spindle) Router() http.Handler { 319 mux := chi.NewRouter() 320 321 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 322 w.Write(s.GetMotdContent()) 323 }) 324 mux.HandleFunc("/events", s.Events) 325 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 326 327 mux.Mount("/xrpc", s.XrpcRouter()) 328 return mux 329} 330 331func (s *Spindle) XrpcRouter() http.Handler { 332 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 333 334 l := log.SubLogger(s.l, "xrpc") 335 336 x := xrpc.Xrpc{ 337 Logger: l, 338 Db: s.db, 339 Enforcer: s.e, 340 Engines: s.engs, 341 Config: s.cfg, 342 Resolver: s.res, 343 Vault: s.vault, 344 Notifier: s.Notifier(), 345 ServiceAuth: serviceAuth, 346 } 347 348 return x.Router() 349} 350 351func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 352 if msg.Nsid == tangled.PipelineNSID { 353 tpl := tangled.Pipeline{} 354 err := json.Unmarshal(msg.EventJson, &tpl) 355 if err != nil { 356 s.l.Error("failed to unmarshal pipeline event", "err", err) 357 return err 358 } 359 360 if tpl.TriggerMetadata == nil { 361 return fmt.Errorf("no trigger metadata found") 362 } 363 364 if tpl.TriggerMetadata.Repo == nil { 365 return fmt.Errorf("no repo data found") 366 } 367 368 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 369 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 370 } 371 372 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo) 373 if err != nil { 374 return err 375 } 376 377 pipelineId := models.PipelineId{ 378 Knot: src.Key(), 379 Rkey: msg.Rkey, 380 } 381 382 workflows := make(map[models.Engine][]models.Workflow) 383 384 // Build pipeline environment variables once for all workflows 385 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 386 387 for _, w := range tpl.Workflows { 388 if w != nil { 389 if _, ok := s.engs[w.Engine]; !ok { 390 err = s.db.StatusFailed(models.WorkflowId{ 391 PipelineId: pipelineId, 392 Name: w.Name, 393 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 394 if err != nil { 395 return fmt.Errorf("db.StatusFailed: %w", err) 396 } 397 398 continue 399 } 400 401 eng := s.engs[w.Engine] 402 403 if _, ok := workflows[eng]; !ok { 404 workflows[eng] = []models.Workflow{} 405 } 406 407 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 408 if err != nil { 409 err = s.db.StatusFailed(models.WorkflowId{ 410 PipelineId: pipelineId, 411 Name: w.Name, 412 }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 413 if err != nil { 414 return fmt.Errorf("db.StatusFailed: %w", err) 415 } 416 417 continue 418 } 419 420 // inject TANGLED_* env vars after InitWorkflow 421 // This prevents user-defined env vars from overriding them 422 if ewf.Environment == nil { 423 ewf.Environment = make(map[string]string) 424 } 425 maps.Copy(ewf.Environment, pipelineEnv) 426 427 workflows[eng] = append(workflows[eng], *ewf) 428 429 err = s.db.StatusPending(models.WorkflowId{ 430 PipelineId: pipelineId, 431 Name: w.Name, 432 }, s.n) 433 if err != nil { 434 return fmt.Errorf("db.StatusPending: %w", err) 435 } 436 } 437 } 438 439 ok := s.jq.Enqueue(queue.Job{ 440 Run: func() error { 441 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{ 442 RepoDid: repoDid, 443 Workflows: workflows, 444 }, pipelineId) 445 return nil 446 }, 447 OnFail: func(jobError error) { 448 s.l.Error("pipeline run failed", "error", jobError) 449 }, 450 }) 451 if ok { 452 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 453 } else { 454 s.l.Error("failed to enqueue pipeline: queue is full") 455 } 456 } 457 458 return nil 459} 460 461func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) { 462 if repo.RepoDid == nil || *repo.RepoDid == "" { 463 return "", fmt.Errorf("pipeline trigger missing repoDid") 464 } 465 repoDid, err := syntax.ParseDID(*repo.RepoDid) 466 if err != nil { 467 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err) 468 } 469 if _, err := s.db.GetRepoByDid(repoDid); err != nil { 470 s.l.Warn("accepting knot pipeline assertion for unknown repoDid", "repoDid", repoDid, "err", err) 471 } 472 return repoDid, nil 473} 474 475func (s *Spindle) configureOwner() error { 476 cfgOwner := s.cfg.Server.Owner 477 478 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 479 if err != nil { 480 return err 481 } 482 483 switch len(existing) { 484 case 0: 485 // no owner configured, continue 486 case 1: 487 // find existing owner 488 existingOwner := existing[0] 489 490 // no ownership change, this is okay 491 if existingOwner == s.cfg.Server.Owner { 492 break 493 } 494 495 // remove existing owner 496 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 497 if err != nil { 498 return nil 499 } 500 default: 501 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 502 } 503 504 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 505}