// Monorepo for Tangled (tangled.org)

1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/eventstream"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/dummy"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
35
// defaultMotd holds the contents of the "motd" file embedded at build time.
// New installs it as the initial message-of-the-day of every Spindle.
//
//go:embed motd
var defaultMotd []byte

const (
	// rbacDomain is the identifier handed to the RBAC enforcer whenever this
	// server registers itself or looks up, adds, or removes its owner.
	rbacDomain = "thisserver"
)
42
// Spindle bundles everything a spindle server needs at runtime: storage,
// access control, event sources, workflow engines and the job queue that
// executes pipelines. Construct it with New and run it with Start.
type Spindle struct {
	// jc is the jetstream client; New registers the known DIDs on it and
	// starts it with the ingest handler.
	jc *jetstream.JetstreamClient
	// tap is the tap client created in New and started in Start.
	tap *Tap
	// embedTap is only set by Start when cfg.Server.Tap.Embed is enabled.
	embedTap *embeddedTap
	// db is the database opened from cfg.Server.DBPath.
	db *db.DB
	// e is the RBAC enforcer used for owner management.
	e *rbac.Enforcer
	// l is the logger taken from the context passed to New.
	l *slog.Logger
	// n is the notifier handed to status updates and to the engine.
	n *notifier.Notifier
	// engs maps an engine name (as referenced by a workflow) to its engine.
	engs map[string]models.Engine
	// jq is the job queue that pipeline runs are enqueued on.
	jq *queue.Queue
	// cfg is the server configuration supplied to New.
	cfg *config.Config
	// ks is the event consumer for the known knots; it invokes
	// processPipeline for each event.
	ks *eventconsumer.Consumer
	// res is the identity resolver shared with the XRPC layer.
	res *idresolver.Resolver
	// vault is the secrets manager selected by cfg.Server.Secrets.Provider.
	vault secrets.Manager
	// motd is the current message-of-the-day; guarded by motdMu.
	motd []byte
	// motdMu protects motd.
	motdMu sync.RWMutex
	// workflowSem is a channel with capacity
	// cfg.Server.MaxConcurrentWorkflows that is passed to
	// engine.StartWorkflows.
	workflowSem chan struct{}
	// rootCtx is the context New was called with.
	rootCtx context.Context
}
62
63// New creates a new Spindle server with the provided configuration and engines.
64func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
65 logger := log.FromContext(ctx)
66
67 d, err := db.Make(ctx, cfg.Server.DBPath)
68 if err != nil {
69 return nil, fmt.Errorf("failed to setup db: %w", err)
70 }
71
72 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
73 if err != nil {
74 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
75 }
76 e.E.EnableAutoSave(true)
77
78 n := notifier.New()
79
80 var vault secrets.Manager
81 switch cfg.Server.Secrets.Provider {
82 case "openbao":
83 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
84 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
85 }
86 vault, err = secrets.NewOpenBaoManager(
87 cfg.Server.Secrets.OpenBao.ProxyAddr,
88 logger,
89 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
90 )
91 if err != nil {
92 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
93 }
94 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
95 case "sqlite", "":
96 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
97 if err != nil {
98 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
99 }
100 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
101 default:
102 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
103 }
104
105 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil {
106 return nil, fmt.Errorf("failed to run startup migrations: %w", err)
107 }
108
109 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
110 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
111
112 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows)
113 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows)
114
115 collections := []string{
116 tangled.SpindleMemberNSID,
117 tangled.RepoNSID,
118 tangled.RepoCollaboratorNSID,
119 }
120 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
121 if err != nil {
122 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
123 }
124 jc.AddDid(cfg.Server.Owner)
125
126 // Check if the spindle knows about any Dids;
127 dids, err := d.GetAllDids()
128 if err != nil {
129 return nil, fmt.Errorf("failed to get all dids: %w", err)
130 }
131 for _, d := range dids {
132 jc.AddDid(d)
133 }
134
135 knownRepos, err := d.AllRepos()
136 if err != nil {
137 return nil, fmt.Errorf("failed to get known repos: %w", err)
138 }
139 for _, r := range knownRepos {
140 if r.Owner != "" {
141 jc.AddDid(r.Owner.String())
142 }
143 }
144
145 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
146
147 spindle := &Spindle{
148 jc: jc,
149 e: e,
150 db: d,
151 l: logger,
152 n: &n,
153 engs: engines,
154 jq: jq,
155 cfg: cfg,
156 res: resolver,
157 vault: vault,
158 motd: defaultMotd,
159 workflowSem: workflowSem,
160 rootCtx: ctx,
161 }
162
163 err = e.AddSpindle(rbacDomain)
164 if err != nil {
165 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
166 }
167 err = spindle.configureOwner()
168 if err != nil {
169 return nil, err
170 }
171 logger.Info("owner set", "did", cfg.Server.Owner)
172
173 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
174 if err != nil {
175 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
176 }
177
178 err = jc.StartJetstream(ctx, spindle.ingest())
179 if err != nil {
180 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
181 }
182
183 // for each incoming sh.tangled.pipeline, we execute
184 // spindle.processPipeline, which in turn enqueues the pipeline
185 // job in the above registered queue.
186 ccfg := eventconsumer.NewConsumerConfig()
187 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
188 ccfg.URLFunc = eventconsumer.DefaultURL(cfg.Server.Dev)
189 ccfg.ProcessFunc = spindle.processPipeline
190 ccfg.CursorStore = cursorStore
191 knownKnots, err := d.Knots()
192 if err != nil {
193 return nil, err
194 }
195 for _, knot := range knownKnots {
196 logger.Info("adding source start", "knot", knot)
197 src := eventconsumer.NewKnotSource(knot)
198 eventconsumer.MigrateLegacyCursor(cursorStore, src)
199 ccfg.Sources[src] = struct{}{}
200 }
201 spindle.ks = eventconsumer.NewConsumer(*ccfg)
202
203 if cfg.Server.Tap.Embed {
204 pw, err := randomAdminPassword()
205 if err != nil {
206 return nil, err
207 }
208 cfg.Server.Tap.AdminPassword = pw
209 logger.Info("embedded tap: using random admin password")
210 }
211 spindle.tap = NewTapClient(spindle)
212
213 return spindle, nil
214}
215
216// DB returns the database instance.
217func (s *Spindle) DB() *db.DB {
218 return s.db
219}
220
221// Queue returns the job queue instance.
222func (s *Spindle) Queue() *queue.Queue {
223 return s.jq
224}
225
226// Engines returns the map of available engines.
227func (s *Spindle) Engines() map[string]models.Engine {
228 return s.engs
229}
230
231// Vault returns the secrets manager instance.
232func (s *Spindle) Vault() secrets.Manager {
233 return s.vault
234}
235
236// Notifier returns the notifier instance.
237func (s *Spindle) Notifier() *notifier.Notifier {
238 return s.n
239}
240
241// Enforcer returns the RBAC enforcer instance.
242func (s *Spindle) Enforcer() *rbac.Enforcer {
243 return s.e
244}
245
246// SetMotdContent sets custom MOTD content, replacing the embedded default.
247func (s *Spindle) SetMotdContent(content []byte) {
248 s.motdMu.Lock()
249 defer s.motdMu.Unlock()
250 s.motd = content
251}
252
253// GetMotdContent returns the current MOTD content.
254func (s *Spindle) GetMotdContent() []byte {
255 s.motdMu.RLock()
256 defer s.motdMu.RUnlock()
257 return s.motd
258}
259
260// Start starts the Spindle server (blocking).
261func (s *Spindle) Start(ctx context.Context) error {
262 // starts a job queue runner in the background
263 s.jq.Start()
264 defer s.jq.Stop()
265
266 // Stop vault token renewal if it implements Stopper
267 if stopper, ok := s.vault.(secrets.Stopper); ok {
268 defer stopper.Stop()
269 }
270
271 tapCtx, tapCancel := context.WithCancel(ctx)
272
273 if s.cfg.Server.Tap.Embed {
274 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap"))
275 if err != nil {
276 tapCancel()
277 return fmt.Errorf("starting embedded tap: %w", err)
278 }
279 s.embedTap = emb
280 defer func() {
281 tapCancel()
282 s.embedTap.Shutdown()
283 }()
284
285 go s.watchTapDrain(tapCtx, tapCancel)
286 } else {
287 defer tapCancel()
288 }
289
290 go func() {
291 s.l.Info("starting knot event consumer")
292 s.ks.Start(ctx)
293 }()
294
295 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url)
296 s.tap.Start(tapCtx)
297
298 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
299 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
300}
301
302func (s *Spindle) declareTapInterest(ctx context.Context) {
303 repos, err := s.db.AllRepos()
304 if err != nil {
305 s.l.Warn("tap declare: failed to load known repos", "err", err)
306 return
307 }
308 seen := make(map[syntax.DID]struct{}, len(repos))
309 dids := make([]syntax.DID, 0, len(repos))
310 for _, r := range repos {
311 if r.Owner == "" {
312 continue
313 }
314 if _, ok := seen[r.Owner]; ok {
315 continue
316 }
317 seen[r.Owner] = struct{}{}
318 dids = append(dids, r.Owner)
319 }
320 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil {
321 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err)
322 return
323 }
324 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids))
325}
326
327func Run(ctx context.Context) error {
328 cfg, err := config.Load(ctx)
329 if err != nil {
330 return fmt.Errorf("failed to load config: %w", err)
331 }
332
333 nixeryEng, err := nixery.New(ctx, cfg)
334 if err != nil {
335 return err
336 }
337
338 s, err := New(ctx, cfg, map[string]models.Engine{
339 "nixery": nixeryEng,
340 "dummy": dummy.New(log.FromContext(ctx)),
341 })
342 if err != nil {
343 return err
344 }
345
346 return s.Start(ctx)
347}
348
349func (s *Spindle) Router() http.Handler {
350 mux := chi.NewRouter()
351
352 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
353 w.Write(s.GetMotdContent())
354 })
355 mux.HandleFunc("/events", s.Events)
356 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
357
358 mux.Mount("/xrpc", s.XrpcRouter())
359 return mux
360}
361
362func (s *Spindle) XrpcRouter() http.Handler {
363 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
364
365 l := log.SubLogger(s.l, "xrpc")
366
367 x := xrpc.Xrpc{
368 Logger: l,
369 Db: s.db,
370 Enforcer: s.e,
371 Engines: s.engs,
372 Config: s.cfg,
373 Resolver: s.res,
374 Vault: s.vault,
375 Notifier: s.Notifier(),
376 ServiceAuth: serviceAuth,
377 }
378
379 return x.Router()
380}
381
382func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error {
383 if msg.Nsid == tangled.PipelineNSID {
384 tpl := tangled.Pipeline{}
385 err := json.Unmarshal(msg.EventJson, &tpl)
386 if err != nil {
387 s.l.Error("failed to unmarshal pipeline event", "err", err)
388 return err
389 }
390
391 if tpl.TriggerMetadata == nil {
392 return fmt.Errorf("no trigger metadata found")
393 }
394
395 if tpl.TriggerMetadata.Repo == nil {
396 return fmt.Errorf("no repo data found")
397 }
398
399 if src.Host != tpl.TriggerMetadata.Repo.Knot {
400 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot)
401 }
402
403 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo)
404 if err != nil {
405 return err
406 }
407
408 pipelineId := models.PipelineId{
409 Knot: src.Host,
410 Rkey: msg.Rkey,
411 }
412
413 workflows := make(map[models.Engine][]models.Workflow)
414
415 // Build pipeline environment variables once for all workflows
416 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
417
418 for _, w := range tpl.Workflows {
419 if w != nil {
420 if _, ok := s.engs[w.Engine]; !ok {
421 err = s.db.StatusFailed(models.WorkflowId{
422 PipelineId: pipelineId,
423 Name: w.Name,
424 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
425 if err != nil {
426 return fmt.Errorf("db.StatusFailed: %w", err)
427 }
428
429 continue
430 }
431
432 eng := s.engs[w.Engine]
433
434 if _, ok := workflows[eng]; !ok {
435 workflows[eng] = []models.Workflow{}
436 }
437
438 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
439 if err != nil {
440 err = s.db.StatusFailed(models.WorkflowId{
441 PipelineId: pipelineId,
442 Name: w.Name,
443 }, fmt.Sprintf("init workflow: %s", err), -1, s.n)
444 if err != nil {
445 return fmt.Errorf("db.StatusFailed: %w", err)
446 }
447
448 continue
449 }
450
451 // inject TANGLED_* env vars after InitWorkflow
452 // This prevents user-defined env vars from overriding them
453 if ewf.Environment == nil {
454 ewf.Environment = make(map[string]string)
455 }
456 maps.Copy(ewf.Environment, pipelineEnv)
457
458 workflows[eng] = append(workflows[eng], *ewf)
459
460 err = s.db.StatusPending(models.WorkflowId{
461 PipelineId: pipelineId,
462 Name: w.Name,
463 }, s.n)
464 if err != nil {
465 return fmt.Errorf("db.StatusPending: %w", err)
466 }
467 }
468 }
469
470 ok := s.jq.Enqueue(queue.Job{
471 Run: func() error {
472 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{
473 RepoDid: repoDid,
474 Workflows: workflows,
475 }, pipelineId)
476 return nil
477 },
478 OnFail: func(jobError error) {
479 s.l.Error("pipeline run failed", "error", jobError)
480 },
481 })
482 if ok {
483 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
484 } else {
485 s.l.Error("failed to enqueue pipeline: queue is full")
486 }
487 }
488
489 return nil
490}
491
492func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) {
493 if repo.RepoDid == nil || *repo.RepoDid == "" {
494 return "", fmt.Errorf("pipeline trigger missing repoDid")
495 }
496 repoDid, err := syntax.ParseDID(*repo.RepoDid)
497 if err != nil {
498 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err)
499 }
500 if _, err := s.db.GetRepoByDid(repoDid); err != nil {
501 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err)
502 }
503 return repoDid, nil
504}
505
506func (s *Spindle) configureOwner() error {
507 cfgOwner := s.cfg.Server.Owner
508
509 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
510 if err != nil {
511 return err
512 }
513
514 switch len(existing) {
515 case 0:
516 // no owner configured, continue
517 case 1:
518 // find existing owner
519 existingOwner := existing[0]
520
521 // no ownership change, this is okay
522 if existingOwner == s.cfg.Server.Owner {
523 break
524 }
525
526 // remove existing owner
527 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
528 if err != nil {
529 return nil
530 }
531 default:
532 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
533 }
534
535 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
536}