Monorepo for Tangled
tangled.org
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
33
// defaultMotd is the message of the day compiled into the binary from the
// "motd" file in this package's directory. It is what "/" serves until
// SetMotdContent replaces it.
//
//go:embed motd
var defaultMotd []byte
36
// rbacDomain is the RBAC domain this spindle registers itself under
// (Enforcer.AddSpindle) and in which the "server:owner" role is managed.
const rbacDomain = "thisserver"
40
// Spindle is the spindle server. New wires its dependencies together and
// Start runs it.
//
// Fields:
//   - jc: jetstream client, created in New and started there with spindle.ingest().
//   - tap: tap client built by NewTapClient in New; started in Start.
//   - embedTap: embedded tap, set by Start only when cfg.Server.Tap.Embed is true.
//   - db: spindle database, also handed to engine.StartWorkflows.
//   - e: RBAC enforcer; this spindle's policies live in the rbacDomain domain.
//   - l: logger from which component sub-loggers are derived.
//   - n: notifier passed to every workflow status update.
//   - engs: workflow engines, looked up by a workflow's Engine name.
//   - jq: job queue onto which processPipeline enqueues pipeline runs.
//   - cfg: server configuration.
//   - ks: knot event consumer whose ProcessFunc is processPipeline.
//   - res: identity resolver, used by the xrpc service auth.
//   - vault: secrets manager selected by cfg.Server.Secrets.Provider.
//   - motd / motdMu: message of the day served at "/"; motd is guarded by motdMu.
//   - workflowSem: channel with capacity cfg.Server.MaxConcurrentWorkflows,
//     handed to engine.StartWorkflows (presumably a concurrency limiter —
//     confirm in the engine package).
//   - rootCtx: the ctx passed to New. NOTE(review): not read in this file;
//     storing a context in a struct is discouraged — confirm where it is used.
type Spindle struct {
	jc          *jetstream.JetstreamClient
	tap         *Tap
	embedTap    *embeddedTap
	db          *db.DB
	e           *rbac.Enforcer
	l           *slog.Logger
	n           *notifier.Notifier
	engs        map[string]models.Engine
	jq          *queue.Queue
	cfg         *config.Config
	ks          *eventconsumer.Consumer
	res         *idresolver.Resolver
	vault       secrets.Manager
	motd        []byte
	motdMu      sync.RWMutex
	workflowSem chan struct{}
	rootCtx     context.Context
}
60
61// New creates a new Spindle server with the provided configuration and engines.
62func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
63 logger := log.FromContext(ctx)
64
65 d, err := db.Make(ctx, cfg.Server.DBPath)
66 if err != nil {
67 return nil, fmt.Errorf("failed to setup db: %w", err)
68 }
69
70 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
71 if err != nil {
72 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
73 }
74 e.E.EnableAutoSave(true)
75
76 n := notifier.New()
77
78 var vault secrets.Manager
79 switch cfg.Server.Secrets.Provider {
80 case "openbao":
81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
83 }
84 vault, err = secrets.NewOpenBaoManager(
85 cfg.Server.Secrets.OpenBao.ProxyAddr,
86 logger,
87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
88 )
89 if err != nil {
90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
91 }
92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
93 case "sqlite", "":
94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
95 if err != nil {
96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
97 }
98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
99 default:
100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
101 }
102
103 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil {
104 return nil, fmt.Errorf("failed to run startup migrations: %w", err)
105 }
106
107 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
108 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
109
110 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows)
111 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows)
112
113 collections := []string{
114 tangled.SpindleMemberNSID,
115 }
116 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
117 if err != nil {
118 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
119 }
120 jc.AddDid(cfg.Server.Owner)
121
122 // Check if the spindle knows about any Dids;
123 dids, err := d.GetAllDids()
124 if err != nil {
125 return nil, fmt.Errorf("failed to get all dids: %w", err)
126 }
127 for _, d := range dids {
128 jc.AddDid(d)
129 }
130
131 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
132
133 spindle := &Spindle{
134 jc: jc,
135 e: e,
136 db: d,
137 l: logger,
138 n: &n,
139 engs: engines,
140 jq: jq,
141 cfg: cfg,
142 res: resolver,
143 vault: vault,
144 motd: defaultMotd,
145 workflowSem: workflowSem,
146 rootCtx: ctx,
147 }
148
149 err = e.AddSpindle(rbacDomain)
150 if err != nil {
151 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
152 }
153 err = spindle.configureOwner()
154 if err != nil {
155 return nil, err
156 }
157 logger.Info("owner set", "did", cfg.Server.Owner)
158
159 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
160 if err != nil {
161 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
162 }
163
164 err = jc.StartJetstream(ctx, spindle.ingest())
165 if err != nil {
166 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
167 }
168
169 // for each incoming sh.tangled.pipeline, we execute
170 // spindle.processPipeline, which in turn enqueues the pipeline
171 // job in the above registered queue.
172 ccfg := eventconsumer.NewConsumerConfig()
173 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
174 ccfg.Dev = cfg.Server.Dev
175 ccfg.ProcessFunc = spindle.processPipeline
176 ccfg.CursorStore = cursorStore
177 knownKnots, err := d.Knots()
178 if err != nil {
179 return nil, err
180 }
181 for _, knot := range knownKnots {
182 logger.Info("adding source start", "knot", knot)
183 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
184 }
185 spindle.ks = eventconsumer.NewConsumer(*ccfg)
186
187 if cfg.Server.Tap.Embed {
188 pw, err := randomAdminPassword()
189 if err != nil {
190 return nil, err
191 }
192 cfg.Server.Tap.AdminPassword = pw
193 logger.Info("embedded tap: using random admin password")
194 }
195 spindle.tap = NewTapClient(spindle)
196
197 return spindle, nil
198}
199
200// DB returns the database instance.
201func (s *Spindle) DB() *db.DB {
202 return s.db
203}
204
205// Queue returns the job queue instance.
206func (s *Spindle) Queue() *queue.Queue {
207 return s.jq
208}
209
210// Engines returns the map of available engines.
211func (s *Spindle) Engines() map[string]models.Engine {
212 return s.engs
213}
214
215// Vault returns the secrets manager instance.
216func (s *Spindle) Vault() secrets.Manager {
217 return s.vault
218}
219
220// Notifier returns the notifier instance.
221func (s *Spindle) Notifier() *notifier.Notifier {
222 return s.n
223}
224
225// Enforcer returns the RBAC enforcer instance.
226func (s *Spindle) Enforcer() *rbac.Enforcer {
227 return s.e
228}
229
230// SetMotdContent sets custom MOTD content, replacing the embedded default.
231func (s *Spindle) SetMotdContent(content []byte) {
232 s.motdMu.Lock()
233 defer s.motdMu.Unlock()
234 s.motd = content
235}
236
237// GetMotdContent returns the current MOTD content.
238func (s *Spindle) GetMotdContent() []byte {
239 s.motdMu.RLock()
240 defer s.motdMu.RUnlock()
241 return s.motd
242}
243
244// Start starts the Spindle server (blocking).
245func (s *Spindle) Start(ctx context.Context) error {
246 // starts a job queue runner in the background
247 s.jq.Start()
248 defer s.jq.Stop()
249
250 // Stop vault token renewal if it implements Stopper
251 if stopper, ok := s.vault.(secrets.Stopper); ok {
252 defer stopper.Stop()
253 }
254
255 if s.cfg.Server.Tap.Embed {
256 emb, err := startEmbeddedTap(ctx, s.cfg, log.SubLogger(s.l, "embedtap"))
257 if err != nil {
258 return fmt.Errorf("starting embedded tap: %w", err)
259 }
260 s.embedTap = emb
261 defer s.embedTap.Shutdown()
262 }
263
264 go func() {
265 s.l.Info("starting knot event consumer")
266 s.ks.Start(ctx)
267 }()
268
269 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url)
270 s.tap.Start(ctx)
271
272 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
273 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
274}
275
276func (s *Spindle) declareTapInterest(ctx context.Context) {
277 repos, err := s.db.AllRepos()
278 if err != nil {
279 s.l.Warn("tap declare: failed to load known repos", "err", err)
280 return
281 }
282 seen := make(map[syntax.DID]struct{}, len(repos))
283 dids := make([]syntax.DID, 0, len(repos))
284 for _, r := range repos {
285 if r.Owner == "" {
286 continue
287 }
288 if _, ok := seen[r.Owner]; ok {
289 continue
290 }
291 seen[r.Owner] = struct{}{}
292 dids = append(dids, r.Owner)
293 }
294 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil {
295 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err)
296 return
297 }
298 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids))
299}
300
301func Run(ctx context.Context) error {
302 cfg, err := config.Load(ctx)
303 if err != nil {
304 return fmt.Errorf("failed to load config: %w", err)
305 }
306
307 nixeryEng, err := nixery.New(ctx, cfg)
308 if err != nil {
309 return err
310 }
311
312 s, err := New(ctx, cfg, map[string]models.Engine{
313 "nixery": nixeryEng,
314 })
315 if err != nil {
316 return err
317 }
318
319 return s.Start(ctx)
320}
321
322func (s *Spindle) Router() http.Handler {
323 mux := chi.NewRouter()
324
325 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
326 w.Write(s.GetMotdContent())
327 })
328 mux.HandleFunc("/events", s.Events)
329 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
330
331 mux.Mount("/xrpc", s.XrpcRouter())
332 return mux
333}
334
335func (s *Spindle) XrpcRouter() http.Handler {
336 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
337
338 l := log.SubLogger(s.l, "xrpc")
339
340 x := xrpc.Xrpc{
341 Logger: l,
342 Db: s.db,
343 Enforcer: s.e,
344 Engines: s.engs,
345 Config: s.cfg,
346 Resolver: s.res,
347 Vault: s.vault,
348 Notifier: s.Notifier(),
349 ServiceAuth: serviceAuth,
350 }
351
352 return x.Router()
353}
354
355func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
356 if msg.Nsid == tangled.PipelineNSID {
357 tpl := tangled.Pipeline{}
358 err := json.Unmarshal(msg.EventJson, &tpl)
359 if err != nil {
360 s.l.Error("failed to unmarshal pipeline event", "err", err)
361 return err
362 }
363
364 if tpl.TriggerMetadata == nil {
365 return fmt.Errorf("no trigger metadata found")
366 }
367
368 if tpl.TriggerMetadata.Repo == nil {
369 return fmt.Errorf("no repo data found")
370 }
371
372 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
373 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
374 }
375
376 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo)
377 if err != nil {
378 return err
379 }
380
381 pipelineId := models.PipelineId{
382 Knot: src.Key(),
383 Rkey: msg.Rkey,
384 }
385
386 workflows := make(map[models.Engine][]models.Workflow)
387
388 // Build pipeline environment variables once for all workflows
389 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
390
391 for _, w := range tpl.Workflows {
392 if w != nil {
393 if _, ok := s.engs[w.Engine]; !ok {
394 err = s.db.StatusFailed(models.WorkflowId{
395 PipelineId: pipelineId,
396 Name: w.Name,
397 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
398 if err != nil {
399 return fmt.Errorf("db.StatusFailed: %w", err)
400 }
401
402 continue
403 }
404
405 eng := s.engs[w.Engine]
406
407 if _, ok := workflows[eng]; !ok {
408 workflows[eng] = []models.Workflow{}
409 }
410
411 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
412 if err != nil {
413 err = s.db.StatusFailed(models.WorkflowId{
414 PipelineId: pipelineId,
415 Name: w.Name,
416 }, fmt.Sprintf("init workflow: %s", err), -1, s.n)
417 if err != nil {
418 return fmt.Errorf("db.StatusFailed: %w", err)
419 }
420
421 continue
422 }
423
424 // inject TANGLED_* env vars after InitWorkflow
425 // This prevents user-defined env vars from overriding them
426 if ewf.Environment == nil {
427 ewf.Environment = make(map[string]string)
428 }
429 maps.Copy(ewf.Environment, pipelineEnv)
430
431 workflows[eng] = append(workflows[eng], *ewf)
432
433 err = s.db.StatusPending(models.WorkflowId{
434 PipelineId: pipelineId,
435 Name: w.Name,
436 }, s.n)
437 if err != nil {
438 return fmt.Errorf("db.StatusPending: %w", err)
439 }
440 }
441 }
442
443 ok := s.jq.Enqueue(queue.Job{
444 Run: func() error {
445 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{
446 RepoDid: repoDid,
447 Workflows: workflows,
448 }, pipelineId)
449 return nil
450 },
451 OnFail: func(jobError error) {
452 s.l.Error("pipeline run failed", "error", jobError)
453 },
454 })
455 if ok {
456 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
457 } else {
458 s.l.Error("failed to enqueue pipeline: queue is full")
459 }
460 }
461
462 return nil
463}
464
465func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) {
466 if repo.RepoDid == nil || *repo.RepoDid == "" {
467 return "", fmt.Errorf("pipeline trigger missing repoDid")
468 }
469 repoDid, err := syntax.ParseDID(*repo.RepoDid)
470 if err != nil {
471 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err)
472 }
473 if _, err := s.db.GetRepoByDid(repoDid); err != nil {
474 s.l.Warn("accepting knot pipeline assertion for unknown repoDid", "repoDid", repoDid, "err", err)
475 }
476 return repoDid, nil
477}
478
479func (s *Spindle) configureOwner() error {
480 cfgOwner := s.cfg.Server.Owner
481
482 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
483 if err != nil {
484 return err
485 }
486
487 switch len(existing) {
488 case 0:
489 // no owner configured, continue
490 case 1:
491 // find existing owner
492 existingOwner := existing[0]
493
494 // no ownership change, this is okay
495 if existingOwner == s.cfg.Server.Owner {
496 break
497 }
498
499 // remove existing owner
500 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
501 if err != nil {
502 return nil
503 }
504 default:
505 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
506 }
507
508 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
509}