Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/go-chi/chi/v5"
16 "tangled.org/core/api/tangled"
17 "tangled.org/core/eventconsumer"
18 "tangled.org/core/eventconsumer/cursor"
19 "tangled.org/core/eventstream"
20 "tangled.org/core/idresolver"
21 "tangled.org/core/jetstream"
22 "tangled.org/core/log"
23 "tangled.org/core/notifier"
24 "tangled.org/core/rbac"
25 "tangled.org/core/spindle/config"
26 "tangled.org/core/spindle/db"
27 "tangled.org/core/spindle/engine"
28 "tangled.org/core/spindle/engines/dummy"
29 "tangled.org/core/spindle/engines/microvm"
30 "tangled.org/core/spindle/engines/nixery"
31 "tangled.org/core/spindle/models"
32 "tangled.org/core/spindle/queue"
33 "tangled.org/core/spindle/secrets"
34 "tangled.org/core/spindle/xrpc"
35 "tangled.org/core/xrpc/serviceauth"
36)
37
// defaultMotd holds the contents of the embedded file named "motd". New
// installs it as the initial MOTD of every Spindle; SetMotdContent can
// replace it afterwards.
//
//go:embed motd
var defaultMotd []byte
40
// rbacDomain is the domain name this file passes to the RBAC enforcer when
// registering the spindle and when querying or changing its owner.
const rbacDomain = "thisserver"
44
// Spindle bundles the long-lived components of a spindle server. Values are
// constructed by New and driven by Start.
type Spindle struct {
	// jc is the jetstream client created in New and started with s.ingest().
	jc *jetstream.JetstreamClient
	// tap is the tap client created by NewTapClient in New and started in Start.
	tap *Tap
	// embedTap is only set by Start when cfg.Server.Tap.Embed is true.
	embedTap *embeddedTap
	// db is the database handle passed to New.
	db *db.DB
	// e is the RBAC enforcer created in New.
	e *rbac.Enforcer
	// l is the logger taken from the context given to New.
	l *slog.Logger
	// n is the notifier handed to status updates and to the XRPC router.
	n *notifier.Notifier
	// engs maps an engine name (as referenced by a workflow) to its engine.
	engs map[string]models.Engine
	// jq is the job queue pipelines are enqueued on.
	jq *queue.Queue
	// cfg is the configuration passed to New.
	cfg *config.Config
	// ks is the event consumer built in New from the knots known to the db.
	ks *eventconsumer.Consumer
	// res is the identity resolver built from cfg.Server.PlcUrl.
	res *idresolver.Resolver
	// vault is the secrets manager selected by cfg.Server.Secrets.Provider.
	vault secrets.Manager
	// motd is the current MOTD content; guarded by motdMu.
	motd []byte
	// motdMu protects motd.
	motdMu sync.RWMutex
	// rootCtx is the context that was passed to New.
	rootCtx context.Context
}
63
64// New creates a new Spindle server with the provided configuration and engines.
65func New(ctx context.Context, cfg *config.Config, d *db.DB, engines map[string]models.Engine) (*Spindle, error) {
66 logger := log.FromContext(ctx)
67
68 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
69 if err != nil {
70 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
71 }
72 e.E.EnableAutoSave(true)
73
74 n := notifier.New()
75
76 var vault secrets.Manager
77 switch cfg.Server.Secrets.Provider {
78 case "openbao":
79 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
80 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
81 }
82 vault, err = secrets.NewOpenBaoManager(
83 cfg.Server.Secrets.OpenBao.ProxyAddr,
84 logger,
85 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
86 )
87 if err != nil {
88 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
89 }
90 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
91 case "sqlite", "":
92 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
93 if err != nil {
94 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
95 }
96 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
97 default:
98 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
99 }
100
101 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil {
102 return nil, fmt.Errorf("failed to run startup migrations: %w", err)
103 }
104
105 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
106 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
107
108 collections := []string{
109 tangled.SpindleMemberNSID,
110 tangled.RepoNSID,
111 tangled.RepoCollaboratorNSID,
112 }
113 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
114 if err != nil {
115 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
116 }
117 jc.AddDid(cfg.Server.Owner)
118
119 // Check if the spindle knows about any Dids;
120 dids, err := d.GetAllDids()
121 if err != nil {
122 return nil, fmt.Errorf("failed to get all dids: %w", err)
123 }
124 for _, d := range dids {
125 jc.AddDid(d)
126 }
127
128 knownRepos, err := d.AllRepos()
129 if err != nil {
130 return nil, fmt.Errorf("failed to get known repos: %w", err)
131 }
132 for _, r := range knownRepos {
133 if r.Owner != "" {
134 jc.AddDid(r.Owner.String())
135 }
136 }
137
138 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
139
140 spindle := &Spindle{
141 jc: jc,
142 e: e,
143 db: d,
144 l: logger,
145 n: &n,
146 engs: engines,
147 jq: jq,
148 cfg: cfg,
149 res: resolver,
150 vault: vault,
151 motd: defaultMotd,
152 rootCtx: ctx,
153 }
154
155 err = e.AddSpindle(rbacDomain)
156 if err != nil {
157 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
158 }
159 err = spindle.configureOwner()
160 if err != nil {
161 return nil, err
162 }
163 logger.Info("owner set", "did", cfg.Server.Owner)
164
165 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
166 if err != nil {
167 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
168 }
169
170 err = jc.StartJetstream(ctx, spindle.ingest())
171 if err != nil {
172 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
173 }
174
175 // for each incoming sh.tangled.pipeline, we execute
176 // spindle.processPipeline, which in turn enqueues the pipeline
177 // job in the above registered queue.
178 ccfg := eventconsumer.NewConsumerConfig()
179 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
180 ccfg.ProcessFunc = spindle.processPipeline
181 ccfg.CursorStore = cursorStore
182 if cfg.Server.Dev {
183 ccfg.RetryInterval = 5 * time.Second
184 ccfg.MaxRetryInterval = 10 * time.Second
185 } else {
186 ccfg.RetryInterval = 1 * time.Minute
187 ccfg.MaxRetryInterval = 10 * time.Minute
188 }
189 knownKnots, err := d.Knots()
190 if err != nil {
191 return nil, err
192 }
193 for _, knot := range knownKnots {
194 logger.Info("adding source start", "knot", knot)
195 src := eventconsumer.NewKnotSource(knot)
196 eventconsumer.MigrateLegacyCursor(cursorStore, src)
197 ccfg.Sources[src] = struct{}{}
198 }
199 spindle.ks = eventconsumer.NewConsumer(*ccfg)
200
201 if cfg.Server.Tap.Embed {
202 pw, err := randomAdminPassword()
203 if err != nil {
204 return nil, err
205 }
206 cfg.Server.Tap.AdminPassword = pw
207 logger.Info("embedded tap: using random admin password")
208 }
209 spindle.tap = NewTapClient(spindle)
210
211 return spindle, nil
212}
213
214// DB returns the database instance.
215func (s *Spindle) DB() *db.DB {
216 return s.db
217}
218
219// Queue returns the job queue instance.
220func (s *Spindle) Queue() *queue.Queue {
221 return s.jq
222}
223
224// Engines returns the map of available engines.
225func (s *Spindle) Engines() map[string]models.Engine {
226 return s.engs
227}
228
229// Vault returns the secrets manager instance.
230func (s *Spindle) Vault() secrets.Manager {
231 return s.vault
232}
233
234// Notifier returns the notifier instance.
235func (s *Spindle) Notifier() *notifier.Notifier {
236 return s.n
237}
238
239// Enforcer returns the RBAC enforcer instance.
240func (s *Spindle) Enforcer() *rbac.Enforcer {
241 return s.e
242}
243
244// SetMotdContent sets custom MOTD content, replacing the embedded default.
245func (s *Spindle) SetMotdContent(content []byte) {
246 s.motdMu.Lock()
247 defer s.motdMu.Unlock()
248 s.motd = content
249}
250
251// GetMotdContent returns the current MOTD content.
252func (s *Spindle) GetMotdContent() []byte {
253 s.motdMu.RLock()
254 defer s.motdMu.RUnlock()
255 return s.motd
256}
257
258// Start starts the Spindle server (blocking).
259func (s *Spindle) Start(ctx context.Context) error {
260 // starts a job queue runner in the background
261 s.jq.Start()
262 defer s.jq.Stop()
263
264 // Stop vault token renewal if it implements Stopper
265 if stopper, ok := s.vault.(secrets.Stopper); ok {
266 defer stopper.Stop()
267 }
268
269 tapCtx, tapCancel := context.WithCancel(ctx)
270
271 if s.cfg.Server.Tap.Embed {
272 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap"))
273 if err != nil {
274 tapCancel()
275 return fmt.Errorf("starting embedded tap: %w", err)
276 }
277 s.embedTap = emb
278 defer func() {
279 tapCancel()
280 s.embedTap.Shutdown()
281 }()
282
283 go s.watchTapDrain(tapCtx, tapCancel)
284 } else {
285 defer tapCancel()
286 }
287
288 go func() {
289 s.l.Info("starting knot event consumer")
290 s.ks.Start(ctx)
291 }()
292
293 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url)
294 s.tap.Start(tapCtx)
295
296 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
297 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
298}
299
300func (s *Spindle) declareTapInterest(ctx context.Context) {
301 repos, err := s.db.AllRepos()
302 if err != nil {
303 s.l.Warn("tap declare: failed to load known repos", "err", err)
304 return
305 }
306 seen := make(map[syntax.DID]struct{}, len(repos))
307 dids := make([]syntax.DID, 0, len(repos))
308 for _, r := range repos {
309 if r.Owner == "" {
310 continue
311 }
312 if _, ok := seen[r.Owner]; ok {
313 continue
314 }
315 seen[r.Owner] = struct{}{}
316 dids = append(dids, r.Owner)
317 }
318 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil {
319 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err)
320 return
321 }
322 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids))
323}
324
325func Run(ctx context.Context) error {
326 cfg, err := config.Load(ctx)
327 if err != nil {
328 return fmt.Errorf("failed to load config: %w", err)
329 }
330
331 d, err := db.Make(ctx, cfg.Server.DBPath)
332 if err != nil {
333 return fmt.Errorf("failed to setup db: %w", err)
334 }
335
336 nixeryEng, err := nixery.New(ctx, cfg)
337 if err != nil {
338 return err
339 }
340
341 microvmEng, err := microvm.New(ctx, cfg, d)
342 if err != nil {
343 return err
344 }
345
346 s, err := New(ctx, cfg, d, map[string]models.Engine{
347 "nixery": nixeryEng,
348 "microvm": microvmEng,
349 "dummy": dummy.New(log.FromContext(ctx)),
350 })
351 if err != nil {
352 return err
353 }
354
355 return s.Start(ctx)
356}
357
358func (s *Spindle) Router() http.Handler {
359 mux := chi.NewRouter()
360
361 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
362 w.Write(s.GetMotdContent())
363 })
364 mux.HandleFunc("/events", s.Events)
365 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
366
367 mux.Mount("/xrpc", s.XrpcRouter())
368 return mux
369}
370
371func (s *Spindle) XrpcRouter() http.Handler {
372 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res.Directory(), s.cfg.Server.Did().String())
373
374 l := log.SubLogger(s.l, "xrpc")
375
376 x := xrpc.Xrpc{
377 Logger: l,
378 Db: s.db,
379 Enforcer: s.e,
380 Engines: s.engs,
381 Config: s.cfg,
382 Resolver: s.res,
383 Vault: s.vault,
384 Notifier: s.Notifier(),
385 ServiceAuth: serviceAuth,
386 }
387
388 return x.Router()
389}
390
391func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error {
392 if msg.Nsid == tangled.PipelineNSID {
393 tpl := tangled.Pipeline{}
394 err := json.Unmarshal(msg.EventJson, &tpl)
395 if err != nil {
396 s.l.Error("failed to unmarshal pipeline event", "err", err)
397 return err
398 }
399
400 if tpl.TriggerMetadata == nil {
401 return fmt.Errorf("no trigger metadata found")
402 }
403
404 if tpl.TriggerMetadata.Repo == nil {
405 return fmt.Errorf("no repo data found")
406 }
407
408 if src.Host != tpl.TriggerMetadata.Repo.Knot {
409 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot)
410 }
411
412 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo)
413 if err != nil {
414 return err
415 }
416
417 pipelineId := models.PipelineId{
418 Knot: src.Host,
419 Rkey: msg.Rkey,
420 }
421
422 workflows := make(map[models.Engine][]models.Workflow)
423
424 // Build pipeline environment variables once for all workflows
425 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId)
426
427 for _, w := range tpl.Workflows {
428 if w != nil {
429 if _, ok := s.engs[w.Engine]; !ok {
430 err = s.db.StatusFailed(models.WorkflowId{
431 PipelineId: pipelineId,
432 Name: w.Name,
433 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
434 if err != nil {
435 return fmt.Errorf("db.StatusFailed: %w", err)
436 }
437
438 continue
439 }
440
441 eng := s.engs[w.Engine]
442
443 if _, ok := workflows[eng]; !ok {
444 workflows[eng] = []models.Workflow{}
445 }
446
447 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
448 if err != nil {
449 err = s.db.StatusFailed(models.WorkflowId{
450 PipelineId: pipelineId,
451 Name: w.Name,
452 }, fmt.Sprintf("init workflow: %s", err), -1, s.n)
453 if err != nil {
454 return fmt.Errorf("db.StatusFailed: %w", err)
455 }
456
457 continue
458 }
459
460 // inject TANGLED_* env vars after InitWorkflow
461 // This prevents user-defined env vars from overriding them
462 if ewf.Environment == nil {
463 ewf.Environment = make(map[string]string)
464 }
465 maps.Copy(ewf.Environment, pipelineEnv)
466
467 workflows[eng] = append(workflows[eng], *ewf)
468
469 err = s.db.StatusPending(models.WorkflowId{
470 PipelineId: pipelineId,
471 Name: w.Name,
472 }, s.n)
473 if err != nil {
474 return fmt.Errorf("db.StatusPending: %w", err)
475 }
476 }
477 }
478
479 ok := s.jq.Enqueue(repoDid, queue.Job{
480 Run: func() error {
481 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
482 RepoDid: repoDid,
483 Workflows: workflows,
484 }, pipelineId)
485 return nil
486 },
487 OnFail: func(jobError error) {
488 s.l.Error("pipeline run failed", "error", jobError)
489 },
490 })
491 if ok {
492 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
493 } else {
494 s.l.Error("failed to enqueue pipeline: queue is full")
495 }
496 }
497
498 return nil
499}
500
501func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) {
502 if repo.RepoDid == nil || *repo.RepoDid == "" {
503 return "", fmt.Errorf("pipeline trigger missing repoDid")
504 }
505 repoDid, err := syntax.ParseDID(*repo.RepoDid)
506 if err != nil {
507 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err)
508 }
509 if _, err := s.db.GetRepoByDid(repoDid); err != nil {
510 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err)
511 }
512 return repoDid, nil
513}
514
515func (s *Spindle) configureOwner() error {
516 cfgOwner := s.cfg.Server.Owner
517
518 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
519 if err != nil {
520 return err
521 }
522
523 switch len(existing) {
524 case 0:
525 // no owner configured, continue
526 case 1:
527 // find existing owner
528 existingOwner := existing[0]
529
530 // no ownership change, this is okay
531 if existingOwner == s.cfg.Server.Owner {
532 break
533 }
534
535 // remove existing owner
536 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
537 if err != nil {
538 return nil
539 }
540 default:
541 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
542 }
543
544 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
545}