Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

// Package spindle implements the Spindle server: it consumes pipeline events
// from knots, queues them as jobs and hands their workflows to the configured
// engines, and exposes HTTP/XRPC endpoints.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd holds the contents of the "motd" file embedded at build time.
// New installs it as the initial MOTD of every Spindle.
//
//go:embed motd
var defaultMotd []byte

const (
	// rbacDomain is the domain name passed to every rbac enforcer call made
	// from this file.
	rbacDomain = "thisserver"
)

// Spindle bundles the long-lived components of a spindle server. Instances are
// built by New and run by Start.
type Spindle struct {
	// jc is the jetstream client created and started in New.
	jc *jetstream.JetstreamClient
	// db is the database opened from cfg.Server.DBPath.
	db *db.DB
	// e is the rbac enforcer used for owner management and handed to XRPC.
	e *rbac.Enforcer
	// l is the logger taken from the context passed to New.
	l *slog.Logger
	// n is the notifier passed along with workflow status updates.
	n *notifier.Notifier
	// engs maps an engine name (as referenced by a workflow) to its engine.
	engs map[string]models.Engine
	// jq is the job queue that pipeline runs are enqueued on.
	jq *queue.Queue
	// cfg is the configuration the server was created with.
	cfg *config.Config
	// ks is the knot event consumer; New registers one source per known knot
	// and Start runs it.
	ks *eventconsumer.Consumer
	// res is the identity resolver built from cfg.Server.PlcUrl.
	res *idresolver.Resolver
	// vault is the secrets manager selected by cfg.Server.Secrets.Provider.
	vault secrets.Manager
	// motd is the content served at "/"; guarded by motdMu.
	motd []byte
	// motdMu guards motd.
	motdMu sync.RWMutex
	// workflowSem is a channel with capacity cfg.Server.MaxConcurrentWorkflows
	// that is passed to engine.StartWorkflows.
	workflowSem chan struct{}
}

// New creates a new Spindle server with the provided configuration and engines.
58func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 59 logger := log.FromContext(ctx) 60 61 d, err := db.Make(cfg.Server.DBPath) 62 if err != nil { 63 return nil, fmt.Errorf("failed to setup db: %w", err) 64 } 65 66 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 69 } 70 e.E.EnableAutoSave(true) 71 72 n := notifier.New() 73 74 var vault secrets.Manager 75 switch cfg.Server.Secrets.Provider { 76 case "openbao": 77 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 78 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 79 } 80 vault, err = secrets.NewOpenBaoManager( 81 cfg.Server.Secrets.OpenBao.ProxyAddr, 82 logger, 83 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 84 ) 85 if err != nil { 86 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 87 } 88 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 89 case "sqlite", "": 90 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 91 if err != nil { 92 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 93 } 94 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 95 default: 96 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 97 } 98 99 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 100 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 101 102 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows) 103 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows) 104 105 collections := []string{ 106 tangled.SpindleMemberNSID, 107 tangled.RepoNSID, 108 
tangled.RepoCollaboratorNSID, 109 } 110 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 111 if err != nil { 112 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 113 } 114 jc.AddDid(cfg.Server.Owner) 115 116 // Check if the spindle knows about any Dids; 117 dids, err := d.GetAllDids() 118 if err != nil { 119 return nil, fmt.Errorf("failed to get all dids: %w", err) 120 } 121 for _, d := range dids { 122 jc.AddDid(d) 123 } 124 125 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 126 127 spindle := &Spindle{ 128 jc: jc, 129 e: e, 130 db: d, 131 l: logger, 132 n: &n, 133 engs: engines, 134 jq: jq, 135 cfg: cfg, 136 res: resolver, 137 vault: vault, 138 motd: defaultMotd, 139 workflowSem: workflowSem, 140 } 141 142 err = e.AddSpindle(rbacDomain) 143 if err != nil { 144 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 145 } 146 err = spindle.configureOwner() 147 if err != nil { 148 return nil, err 149 } 150 logger.Info("owner set", "did", cfg.Server.Owner) 151 152 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 153 if err != nil { 154 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 155 } 156 157 err = jc.StartJetstream(ctx, spindle.ingest()) 158 if err != nil { 159 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 160 } 161 162 // for each incoming sh.tangled.pipeline, we execute 163 // spindle.processPipeline, which in turn enqueues the pipeline 164 // job in the above registered queue. 
165 ccfg := eventconsumer.NewConsumerConfig() 166 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 167 ccfg.Dev = cfg.Server.Dev 168 ccfg.ProcessFunc = spindle.processPipeline 169 ccfg.CursorStore = cursorStore 170 knownKnots, err := d.Knots() 171 if err != nil { 172 return nil, err 173 } 174 for _, knot := range knownKnots { 175 logger.Info("adding source start", "knot", knot) 176 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 177 } 178 spindle.ks = eventconsumer.NewConsumer(*ccfg) 179 180 return spindle, nil 181} 182 183// DB returns the database instance. 184func (s *Spindle) DB() *db.DB { 185 return s.db 186} 187 188// Queue returns the job queue instance. 189func (s *Spindle) Queue() *queue.Queue { 190 return s.jq 191} 192 193// Engines returns the map of available engines. 194func (s *Spindle) Engines() map[string]models.Engine { 195 return s.engs 196} 197 198// Vault returns the secrets manager instance. 199func (s *Spindle) Vault() secrets.Manager { 200 return s.vault 201} 202 203// Notifier returns the notifier instance. 204func (s *Spindle) Notifier() *notifier.Notifier { 205 return s.n 206} 207 208// Enforcer returns the RBAC enforcer instance. 209func (s *Spindle) Enforcer() *rbac.Enforcer { 210 return s.e 211} 212 213// SetMotdContent sets custom MOTD content, replacing the embedded default. 214func (s *Spindle) SetMotdContent(content []byte) { 215 s.motdMu.Lock() 216 defer s.motdMu.Unlock() 217 s.motd = content 218} 219 220// GetMotdContent returns the current MOTD content. 221func (s *Spindle) GetMotdContent() []byte { 222 s.motdMu.RLock() 223 defer s.motdMu.RUnlock() 224 return s.motd 225} 226 227// Start starts the Spindle server (blocking). 
228func (s *Spindle) Start(ctx context.Context) error { 229 // starts a job queue runner in the background 230 s.jq.Start() 231 defer s.jq.Stop() 232 233 // Stop vault token renewal if it implements Stopper 234 if stopper, ok := s.vault.(secrets.Stopper); ok { 235 defer stopper.Stop() 236 } 237 238 go func() { 239 s.l.Info("starting knot event consumer") 240 s.ks.Start(ctx) 241 }() 242 243 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 244 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 245} 246 247func Run(ctx context.Context) error { 248 cfg, err := config.Load(ctx) 249 if err != nil { 250 return fmt.Errorf("failed to load config: %w", err) 251 } 252 253 nixeryEng, err := nixery.New(ctx, cfg) 254 if err != nil { 255 return err 256 } 257 258 s, err := New(ctx, cfg, map[string]models.Engine{ 259 "nixery": nixeryEng, 260 }) 261 if err != nil { 262 return err 263 } 264 265 return s.Start(ctx) 266} 267 268func (s *Spindle) Router() http.Handler { 269 mux := chi.NewRouter() 270 271 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 272 w.Write(s.GetMotdContent()) 273 }) 274 mux.HandleFunc("/events", s.Events) 275 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 276 277 mux.Mount("/xrpc", s.XrpcRouter()) 278 return mux 279} 280 281func (s *Spindle) XrpcRouter() http.Handler { 282 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 283 284 l := log.SubLogger(s.l, "xrpc") 285 286 x := xrpc.Xrpc{ 287 Logger: l, 288 Db: s.db, 289 Enforcer: s.e, 290 Engines: s.engs, 291 Config: s.cfg, 292 Resolver: s.res, 293 Vault: s.vault, 294 Notifier: s.Notifier(), 295 ServiceAuth: serviceAuth, 296 } 297 298 return x.Router() 299} 300 301func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 302 if msg.Nsid == tangled.PipelineNSID { 303 tpl := tangled.Pipeline{} 304 err := json.Unmarshal(msg.EventJson, &tpl) 305 if err != nil { 306 
s.l.Error("failed to unmarshal pipeline event", "err", err) 307 return err 308 } 309 310 if tpl.TriggerMetadata == nil { 311 return fmt.Errorf("no trigger metadata found") 312 } 313 314 if tpl.TriggerMetadata.Repo == nil { 315 return fmt.Errorf("no repo data found") 316 } 317 318 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 319 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 320 } 321 322 // filter by repos 323 repoName := "" 324 if tpl.TriggerMetadata.Repo.Repo != nil { 325 repoName = *tpl.TriggerMetadata.Repo.Repo 326 } 327 328 _, err = s.db.GetRepo( 329 tpl.TriggerMetadata.Repo.Knot, 330 tpl.TriggerMetadata.Repo.Did, 331 repoName, 332 ) 333 if err != nil { 334 return fmt.Errorf("failed to get repo: %w", err) 335 } 336 337 pipelineId := models.PipelineId{ 338 Knot: src.Key(), 339 Rkey: msg.Rkey, 340 } 341 342 workflows := make(map[models.Engine][]models.Workflow) 343 344 // Build pipeline environment variables once for all workflows 345 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 346 347 for _, w := range tpl.Workflows { 348 if w != nil { 349 if _, ok := s.engs[w.Engine]; !ok { 350 err = s.db.StatusFailed(models.WorkflowId{ 351 PipelineId: pipelineId, 352 Name: w.Name, 353 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 354 if err != nil { 355 return fmt.Errorf("db.StatusFailed: %w", err) 356 } 357 358 continue 359 } 360 361 eng := s.engs[w.Engine] 362 363 if _, ok := workflows[eng]; !ok { 364 workflows[eng] = []models.Workflow{} 365 } 366 367 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 368 if err != nil { 369 err = s.db.StatusFailed(models.WorkflowId{ 370 PipelineId: pipelineId, 371 Name: w.Name, 372 }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 373 if err != nil { 374 return fmt.Errorf("db.StatusFailed: %w", err) 375 } 376 377 continue 378 } 379 380 // inject TANGLED_* env vars after InitWorkflow 381 // This prevents user-defined 
env vars from overriding them 382 if ewf.Environment == nil { 383 ewf.Environment = make(map[string]string) 384 } 385 maps.Copy(ewf.Environment, pipelineEnv) 386 387 workflows[eng] = append(workflows[eng], *ewf) 388 389 err = s.db.StatusPending(models.WorkflowId{ 390 PipelineId: pipelineId, 391 Name: w.Name, 392 }, s.n) 393 if err != nil { 394 return fmt.Errorf("db.StatusPending: %w", err) 395 } 396 } 397 } 398 399 ok := s.jq.Enqueue(queue.Job{ 400 Run: func() error { 401 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{ 402 RepoOwner: tpl.TriggerMetadata.Repo.Did, 403 RepoName: repoName, 404 Workflows: workflows, 405 }, pipelineId) 406 return nil 407 }, 408 OnFail: func(jobError error) { 409 s.l.Error("pipeline run failed", "error", jobError) 410 }, 411 }) 412 if ok { 413 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 414 } else { 415 s.l.Error("failed to enqueue pipeline: queue is full") 416 } 417 } 418 419 return nil 420} 421 422func (s *Spindle) configureOwner() error { 423 cfgOwner := s.cfg.Server.Owner 424 425 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 426 if err != nil { 427 return err 428 } 429 430 switch len(existing) { 431 case 0: 432 // no owner configured, continue 433 case 1: 434 // find existing owner 435 existingOwner := existing[0] 436 437 // no ownership change, this is okay 438 if existingOwner == s.cfg.Server.Owner { 439 break 440 } 441 442 // remove existing owner 443 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 444 if err != nil { 445 return nil 446 } 447 default: 448 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 449 } 450 451 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 452}