Monorepo for Tangled
tangled.org
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/eventstream"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
34
35//go:embed motd
36var defaultMotd []byte
37
38const (
39 rbacDomain = "thisserver"
40)
41
42type Spindle struct {
43 jc *jetstream.JetstreamClient
44 tap *Tap
45 embedTap *embeddedTap
46 db *db.DB
47 e *rbac.Enforcer
48 l *slog.Logger
49 n *notifier.Notifier
50 engs map[string]models.Engine
51 jq *queue.Queue
52 cfg *config.Config
53 ks *eventconsumer.Consumer
54 res *idresolver.Resolver
55 vault secrets.Manager
56 motd []byte
57 motdMu sync.RWMutex
58 workflowSem chan struct{}
59 rootCtx context.Context
60}
61
62// New creates a new Spindle server with the provided configuration and engines.
63func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
64 logger := log.FromContext(ctx)
65
66 d, err := db.Make(ctx, cfg.Server.DBPath)
67 if err != nil {
68 return nil, fmt.Errorf("failed to setup db: %w", err)
69 }
70
71 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
72 if err != nil {
73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
74 }
75 e.E.EnableAutoSave(true)
76
77 n := notifier.New()
78
79 var vault secrets.Manager
80 switch cfg.Server.Secrets.Provider {
81 case "openbao":
82 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
83 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
84 }
85 vault, err = secrets.NewOpenBaoManager(
86 cfg.Server.Secrets.OpenBao.ProxyAddr,
87 logger,
88 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
89 )
90 if err != nil {
91 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
92 }
93 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
94 case "sqlite", "":
95 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
96 if err != nil {
97 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
98 }
99 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
100 default:
101 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
102 }
103
104 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil {
105 return nil, fmt.Errorf("failed to run startup migrations: %w", err)
106 }
107
108 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
109 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
110
111 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows)
112 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows)
113
114 collections := []string{
115 tangled.SpindleMemberNSID,
116 tangled.RepoNSID,
117 tangled.RepoCollaboratorNSID,
118 }
119 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
120 if err != nil {
121 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
122 }
123 jc.AddDid(cfg.Server.Owner)
124
125 // Check if the spindle knows about any Dids;
126 dids, err := d.GetAllDids()
127 if err != nil {
128 return nil, fmt.Errorf("failed to get all dids: %w", err)
129 }
130 for _, d := range dids {
131 jc.AddDid(d)
132 }
133
134 knownRepos, err := d.AllRepos()
135 if err != nil {
136 return nil, fmt.Errorf("failed to get known repos: %w", err)
137 }
138 for _, r := range knownRepos {
139 if r.Owner != "" {
140 jc.AddDid(r.Owner.String())
141 }
142 }
143
144 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
145
146 spindle := &Spindle{
147 jc: jc,
148 e: e,
149 db: d,
150 l: logger,
151 n: &n,
152 engs: engines,
153 jq: jq,
154 cfg: cfg,
155 res: resolver,
156 vault: vault,
157 motd: defaultMotd,
158 workflowSem: workflowSem,
159 rootCtx: ctx,
160 }
161
162 err = e.AddSpindle(rbacDomain)
163 if err != nil {
164 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
165 }
166 err = spindle.configureOwner()
167 if err != nil {
168 return nil, err
169 }
170 logger.Info("owner set", "did", cfg.Server.Owner)
171
172 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
173 if err != nil {
174 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
175 }
176
177 err = jc.StartJetstream(ctx, spindle.ingest())
178 if err != nil {
179 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
180 }
181
182 // for each incoming sh.tangled.pipeline, we execute
183 // spindle.processPipeline, which in turn enqueues the pipeline
184 // job in the above registered queue.
185 ccfg := eventconsumer.NewConsumerConfig()
186 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
187 ccfg.URLFunc = eventconsumer.DefaultURL(cfg.Server.Dev)
188 ccfg.ProcessFunc = spindle.processPipeline
189 ccfg.CursorStore = cursorStore
190 knownKnots, err := d.Knots()
191 if err != nil {
192 return nil, err
193 }
194 for _, knot := range knownKnots {
195 logger.Info("adding source start", "knot", knot)
196 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
197 }
198 spindle.ks = eventconsumer.NewConsumer(*ccfg)
199
200 if cfg.Server.Tap.Embed {
201 pw, err := randomAdminPassword()
202 if err != nil {
203 return nil, err
204 }
205 cfg.Server.Tap.AdminPassword = pw
206 logger.Info("embedded tap: using random admin password")
207 }
208 spindle.tap = NewTapClient(spindle)
209
210 return spindle, nil
211}
212
213// DB returns the database instance.
214func (s *Spindle) DB() *db.DB {
215 return s.db
216}
217
218// Queue returns the job queue instance.
219func (s *Spindle) Queue() *queue.Queue {
220 return s.jq
221}
222
223// Engines returns the map of available engines.
224func (s *Spindle) Engines() map[string]models.Engine {
225 return s.engs
226}
227
228// Vault returns the secrets manager instance.
229func (s *Spindle) Vault() secrets.Manager {
230 return s.vault
231}
232
233// Notifier returns the notifier instance.
234func (s *Spindle) Notifier() *notifier.Notifier {
235 return s.n
236}
237
238// Enforcer returns the RBAC enforcer instance.
239func (s *Spindle) Enforcer() *rbac.Enforcer {
240 return s.e
241}
242
243// SetMotdContent sets custom MOTD content, replacing the embedded default.
244func (s *Spindle) SetMotdContent(content []byte) {
245 s.motdMu.Lock()
246 defer s.motdMu.Unlock()
247 s.motd = content
248}
249
250// GetMotdContent returns the current MOTD content.
251func (s *Spindle) GetMotdContent() []byte {
252 s.motdMu.RLock()
253 defer s.motdMu.RUnlock()
254 return s.motd
255}
256
257// Start starts the Spindle server (blocking).
258func (s *Spindle) Start(ctx context.Context) error {
259 // starts a job queue runner in the background
260 s.jq.Start()
261 defer s.jq.Stop()
262
263 // Stop vault token renewal if it implements Stopper
264 if stopper, ok := s.vault.(secrets.Stopper); ok {
265 defer stopper.Stop()
266 }
267
268 tapCtx, tapCancel := context.WithCancel(ctx)
269
270 if s.cfg.Server.Tap.Embed {
271 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap"))
272 if err != nil {
273 tapCancel()
274 return fmt.Errorf("starting embedded tap: %w", err)
275 }
276 s.embedTap = emb
277 defer func() {
278 tapCancel()
279 s.embedTap.Shutdown()
280 }()
281
282 go s.watchTapDrain(tapCtx, tapCancel)
283 } else {
284 defer tapCancel()
285 }
286
287 go func() {
288 s.l.Info("starting knot event consumer")
289 s.ks.Start(ctx)
290 }()
291
292 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url)
293 s.tap.Start(tapCtx)
294
295 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
296 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
297}
298
299func (s *Spindle) declareTapInterest(ctx context.Context) {
300 repos, err := s.db.AllRepos()
301 if err != nil {
302 s.l.Warn("tap declare: failed to load known repos", "err", err)
303 return
304 }
305 seen := make(map[syntax.DID]struct{}, len(repos))
306 dids := make([]syntax.DID, 0, len(repos))
307 for _, r := range repos {
308 if r.Owner == "" {
309 continue
310 }
311 if _, ok := seen[r.Owner]; ok {
312 continue
313 }
314 seen[r.Owner] = struct{}{}
315 dids = append(dids, r.Owner)
316 }
317 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil {
318 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err)
319 return
320 }
321 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids))
322}
323
324func Run(ctx context.Context) error {
325 cfg, err := config.Load(ctx)
326 if err != nil {
327 return fmt.Errorf("failed to load config: %w", err)
328 }
329
330 nixeryEng, err := nixery.New(ctx, cfg)
331 if err != nil {
332 return err
333 }
334
335 s, err := New(ctx, cfg, map[string]models.Engine{
336 "nixery": nixeryEng,
337 })
338 if err != nil {
339 return err
340 }
341
342 return s.Start(ctx)
343}
344
345func (s *Spindle) Router() http.Handler {
346 mux := chi.NewRouter()
347
348 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
349 w.Write(s.GetMotdContent())
350 })
351 mux.HandleFunc("/events", s.Events)
352 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
353
354 mux.Mount("/xrpc", s.XrpcRouter())
355 return mux
356}
357
358func (s *Spindle) XrpcRouter() http.Handler {
359 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
360
361 l := log.SubLogger(s.l, "xrpc")
362
363 x := xrpc.Xrpc{
364 Logger: l,
365 Db: s.db,
366 Enforcer: s.e,
367 Engines: s.engs,
368 Config: s.cfg,
369 Resolver: s.res,
370 Vault: s.vault,
371 Notifier: s.Notifier(),
372 ServiceAuth: serviceAuth,
373 }
374
375 return x.Router()
376}
377
378func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error {
379 if msg.Nsid == tangled.PipelineNSID {
380 tpl := tangled.Pipeline{}
381 err := json.Unmarshal(msg.EventJson, &tpl)
382 if err != nil {
383 s.l.Error("failed to unmarshal pipeline event", "err", err)
384 return err
385 }
386
387 if tpl.TriggerMetadata == nil {
388 return fmt.Errorf("no trigger metadata found")
389 }
390
391 if tpl.TriggerMetadata.Repo == nil {
392 return fmt.Errorf("no repo data found")
393 }
394
395 if src.Host != tpl.TriggerMetadata.Repo.Knot {
396 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot)
397 }
398
399 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo)
400 if err != nil {
401 return err
402 }
403
404 pipelineId := models.PipelineId{
405 Knot: src.Host,
406 Rkey: msg.Rkey,
407 }
408
409 workflows := make(map[models.Engine][]models.Workflow)
410
411 // Build pipeline environment variables once for all workflows
412 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
413
414 for _, w := range tpl.Workflows {
415 if w != nil {
416 if _, ok := s.engs[w.Engine]; !ok {
417 err = s.db.StatusFailed(models.WorkflowId{
418 PipelineId: pipelineId,
419 Name: w.Name,
420 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
421 if err != nil {
422 return fmt.Errorf("db.StatusFailed: %w", err)
423 }
424
425 continue
426 }
427
428 eng := s.engs[w.Engine]
429
430 if _, ok := workflows[eng]; !ok {
431 workflows[eng] = []models.Workflow{}
432 }
433
434 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
435 if err != nil {
436 err = s.db.StatusFailed(models.WorkflowId{
437 PipelineId: pipelineId,
438 Name: w.Name,
439 }, fmt.Sprintf("init workflow: %s", err), -1, s.n)
440 if err != nil {
441 return fmt.Errorf("db.StatusFailed: %w", err)
442 }
443
444 continue
445 }
446
447 // inject TANGLED_* env vars after InitWorkflow
448 // This prevents user-defined env vars from overriding them
449 if ewf.Environment == nil {
450 ewf.Environment = make(map[string]string)
451 }
452 maps.Copy(ewf.Environment, pipelineEnv)
453
454 workflows[eng] = append(workflows[eng], *ewf)
455
456 err = s.db.StatusPending(models.WorkflowId{
457 PipelineId: pipelineId,
458 Name: w.Name,
459 }, s.n)
460 if err != nil {
461 return fmt.Errorf("db.StatusPending: %w", err)
462 }
463 }
464 }
465
466 ok := s.jq.Enqueue(queue.Job{
467 Run: func() error {
468 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{
469 RepoDid: repoDid,
470 Workflows: workflows,
471 }, pipelineId)
472 return nil
473 },
474 OnFail: func(jobError error) {
475 s.l.Error("pipeline run failed", "error", jobError)
476 },
477 })
478 if ok {
479 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
480 } else {
481 s.l.Error("failed to enqueue pipeline: queue is full")
482 }
483 }
484
485 return nil
486}
487
488func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) {
489 if repo.RepoDid == nil || *repo.RepoDid == "" {
490 return "", fmt.Errorf("pipeline trigger missing repoDid")
491 }
492 repoDid, err := syntax.ParseDID(*repo.RepoDid)
493 if err != nil {
494 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err)
495 }
496 if _, err := s.db.GetRepoByDid(repoDid); err != nil {
497 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err)
498 }
499 return repoDid, nil
500}
501
502func (s *Spindle) configureOwner() error {
503 cfgOwner := s.cfg.Server.Owner
504
505 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
506 if err != nil {
507 return err
508 }
509
510 switch len(existing) {
511 case 0:
512 // no owner configured, continue
513 case 1:
514 // find existing owner
515 existingOwner := existing[0]
516
517 // no ownership change, this is okay
518 if existingOwner == s.cfg.Server.Owner {
519 break
520 }
521
522 // remove existing owner
523 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
524 if err != nil {
525 return nil
526 }
527 default:
528 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
529 }
530
531 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
532}