Monorepo for Tangled
tangled.org
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
33
// defaultMotd is the message of the day that New installs as the initial
// content served at "/", until SetMotdContent replaces it. Its bytes are the
// "motd" file embedded at build time.
//
//go:embed motd
var defaultMotd []byte
36
const (
	// rbacDomain is the RBAC domain this spindle registers itself under
	// (AddSpindle in New) and in which configureOwner looks up, removes and
	// adds the server owner.
	rbacDomain = "thisserver"
)
40
// Spindle is a spindle server instance. New builds one from a config and a set
// of engines; Start runs it.
type Spindle struct {
	// Event inputs: jc receives spindle member, repo and collaborator records
	// from Jetstream; tap is the tap client built by NewTapClient in New;
	// embedTap is the in-process tap that Start launches only when
	// cfg.Server.Tap.Embed is set.
	jc       *jetstream.JetstreamClient
	tap      *Tap
	embedTap *embeddedTap

	// db is the spindle database opened from cfg.Server.DBPath; e enforces
	// RBAC policies in rbacDomain; l is the server logger; n is handed to
	// workflow status updates, engine.StartWorkflows and the XRPC handlers.
	db *db.DB
	e  *rbac.Enforcer
	l  *slog.Logger
	n  *notifier.Notifier

	// engs maps the engine name a workflow references (its Engine field) to
	// the implementation; jq is the job queue pipeline runs are enqueued on;
	// cfg is the server configuration.
	engs map[string]models.Engine
	jq   *queue.Queue
	cfg  *config.Config

	// ks is the knot event consumer whose ProcessFunc is processPipeline; res
	// resolves identities (also given to service auth); vault stores secrets
	// (sqlite or openbao, chosen by cfg.Server.Secrets.Provider).
	ks    *eventconsumer.Consumer
	res   *idresolver.Resolver
	vault secrets.Manager

	// motd is the message of the day served at "/"; GetMotdContent and
	// SetMotdContent access it while holding motdMu.
	motd   []byte
	motdMu sync.RWMutex

	// workflowSem is a buffered channel with capacity
	// cfg.Server.MaxConcurrentWorkflows, passed to engine.StartWorkflows
	// (presumably a counting semaphore bounding concurrent workflows — confirm
	// in the engine package). rootCtx is the ctx given to New.
	// NOTE(review): rootCtx is not read in this file — confirm its users
	// elsewhere in the package.
	workflowSem chan struct{}
	rootCtx     context.Context
}
60
61// New creates a new Spindle server with the provided configuration and engines.
62func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
63 logger := log.FromContext(ctx)
64
65 d, err := db.Make(ctx, cfg.Server.DBPath)
66 if err != nil {
67 return nil, fmt.Errorf("failed to setup db: %w", err)
68 }
69
70 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
71 if err != nil {
72 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
73 }
74 e.E.EnableAutoSave(true)
75
76 n := notifier.New()
77
78 var vault secrets.Manager
79 switch cfg.Server.Secrets.Provider {
80 case "openbao":
81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
83 }
84 vault, err = secrets.NewOpenBaoManager(
85 cfg.Server.Secrets.OpenBao.ProxyAddr,
86 logger,
87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
88 )
89 if err != nil {
90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
91 }
92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
93 case "sqlite", "":
94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
95 if err != nil {
96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
97 }
98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
99 default:
100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
101 }
102
103 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil {
104 return nil, fmt.Errorf("failed to run startup migrations: %w", err)
105 }
106
107 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
108 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
109
110 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows)
111 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows)
112
113 collections := []string{
114 tangled.SpindleMemberNSID,
115 tangled.RepoNSID,
116 tangled.RepoCollaboratorNSID,
117 }
118 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
119 if err != nil {
120 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
121 }
122 jc.AddDid(cfg.Server.Owner)
123
124 // Check if the spindle knows about any Dids;
125 dids, err := d.GetAllDids()
126 if err != nil {
127 return nil, fmt.Errorf("failed to get all dids: %w", err)
128 }
129 for _, d := range dids {
130 jc.AddDid(d)
131 }
132
133 knownRepos, err := d.AllRepos()
134 if err != nil {
135 return nil, fmt.Errorf("failed to get known repos: %w", err)
136 }
137 for _, r := range knownRepos {
138 if r.Owner != "" {
139 jc.AddDid(r.Owner.String())
140 }
141 }
142
143 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
144
145 spindle := &Spindle{
146 jc: jc,
147 e: e,
148 db: d,
149 l: logger,
150 n: &n,
151 engs: engines,
152 jq: jq,
153 cfg: cfg,
154 res: resolver,
155 vault: vault,
156 motd: defaultMotd,
157 workflowSem: workflowSem,
158 rootCtx: ctx,
159 }
160
161 err = e.AddSpindle(rbacDomain)
162 if err != nil {
163 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
164 }
165 err = spindle.configureOwner()
166 if err != nil {
167 return nil, err
168 }
169 logger.Info("owner set", "did", cfg.Server.Owner)
170
171 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
172 if err != nil {
173 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
174 }
175
176 err = jc.StartJetstream(ctx, spindle.ingest())
177 if err != nil {
178 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
179 }
180
181 // for each incoming sh.tangled.pipeline, we execute
182 // spindle.processPipeline, which in turn enqueues the pipeline
183 // job in the above registered queue.
184 ccfg := eventconsumer.NewConsumerConfig()
185 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
186 ccfg.Dev = cfg.Server.Dev
187 ccfg.ProcessFunc = spindle.processPipeline
188 ccfg.CursorStore = cursorStore
189 knownKnots, err := d.Knots()
190 if err != nil {
191 return nil, err
192 }
193 for _, knot := range knownKnots {
194 logger.Info("adding source start", "knot", knot)
195 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
196 }
197 spindle.ks = eventconsumer.NewConsumer(*ccfg)
198
199 if cfg.Server.Tap.Embed {
200 pw, err := randomAdminPassword()
201 if err != nil {
202 return nil, err
203 }
204 cfg.Server.Tap.AdminPassword = pw
205 logger.Info("embedded tap: using random admin password")
206 }
207 spindle.tap = NewTapClient(spindle)
208
209 return spindle, nil
210}
211
212// DB returns the database instance.
213func (s *Spindle) DB() *db.DB {
214 return s.db
215}
216
217// Queue returns the job queue instance.
218func (s *Spindle) Queue() *queue.Queue {
219 return s.jq
220}
221
222// Engines returns the map of available engines.
223func (s *Spindle) Engines() map[string]models.Engine {
224 return s.engs
225}
226
227// Vault returns the secrets manager instance.
228func (s *Spindle) Vault() secrets.Manager {
229 return s.vault
230}
231
232// Notifier returns the notifier instance.
233func (s *Spindle) Notifier() *notifier.Notifier {
234 return s.n
235}
236
237// Enforcer returns the RBAC enforcer instance.
238func (s *Spindle) Enforcer() *rbac.Enforcer {
239 return s.e
240}
241
242// SetMotdContent sets custom MOTD content, replacing the embedded default.
243func (s *Spindle) SetMotdContent(content []byte) {
244 s.motdMu.Lock()
245 defer s.motdMu.Unlock()
246 s.motd = content
247}
248
249// GetMotdContent returns the current MOTD content.
250func (s *Spindle) GetMotdContent() []byte {
251 s.motdMu.RLock()
252 defer s.motdMu.RUnlock()
253 return s.motd
254}
255
256// Start starts the Spindle server (blocking).
257func (s *Spindle) Start(ctx context.Context) error {
258 // starts a job queue runner in the background
259 s.jq.Start()
260 defer s.jq.Stop()
261
262 // Stop vault token renewal if it implements Stopper
263 if stopper, ok := s.vault.(secrets.Stopper); ok {
264 defer stopper.Stop()
265 }
266
267 tapCtx, tapCancel := context.WithCancel(ctx)
268
269 if s.cfg.Server.Tap.Embed {
270 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap"))
271 if err != nil {
272 tapCancel()
273 return fmt.Errorf("starting embedded tap: %w", err)
274 }
275 s.embedTap = emb
276 defer func() {
277 tapCancel()
278 s.embedTap.Shutdown()
279 }()
280
281 go s.watchTapDrain(tapCtx, tapCancel)
282 } else {
283 defer tapCancel()
284 }
285
286 go func() {
287 s.l.Info("starting knot event consumer")
288 s.ks.Start(ctx)
289 }()
290
291 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url)
292 s.tap.Start(tapCtx)
293
294 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
295 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
296}
297
298func (s *Spindle) declareTapInterest(ctx context.Context) {
299 repos, err := s.db.AllRepos()
300 if err != nil {
301 s.l.Warn("tap declare: failed to load known repos", "err", err)
302 return
303 }
304 seen := make(map[syntax.DID]struct{}, len(repos))
305 dids := make([]syntax.DID, 0, len(repos))
306 for _, r := range repos {
307 if r.Owner == "" {
308 continue
309 }
310 if _, ok := seen[r.Owner]; ok {
311 continue
312 }
313 seen[r.Owner] = struct{}{}
314 dids = append(dids, r.Owner)
315 }
316 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil {
317 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err)
318 return
319 }
320 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids))
321}
322
323func Run(ctx context.Context) error {
324 cfg, err := config.Load(ctx)
325 if err != nil {
326 return fmt.Errorf("failed to load config: %w", err)
327 }
328
329 nixeryEng, err := nixery.New(ctx, cfg)
330 if err != nil {
331 return err
332 }
333
334 s, err := New(ctx, cfg, map[string]models.Engine{
335 "nixery": nixeryEng,
336 })
337 if err != nil {
338 return err
339 }
340
341 return s.Start(ctx)
342}
343
344func (s *Spindle) Router() http.Handler {
345 mux := chi.NewRouter()
346
347 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
348 w.Write(s.GetMotdContent())
349 })
350 mux.HandleFunc("/events", s.Events)
351 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
352
353 mux.Mount("/xrpc", s.XrpcRouter())
354 return mux
355}
356
357func (s *Spindle) XrpcRouter() http.Handler {
358 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
359
360 l := log.SubLogger(s.l, "xrpc")
361
362 x := xrpc.Xrpc{
363 Logger: l,
364 Db: s.db,
365 Enforcer: s.e,
366 Engines: s.engs,
367 Config: s.cfg,
368 Resolver: s.res,
369 Vault: s.vault,
370 Notifier: s.Notifier(),
371 ServiceAuth: serviceAuth,
372 }
373
374 return x.Router()
375}
376
377func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
378 if msg.Nsid == tangled.PipelineNSID {
379 tpl := tangled.Pipeline{}
380 err := json.Unmarshal(msg.EventJson, &tpl)
381 if err != nil {
382 s.l.Error("failed to unmarshal pipeline event", "err", err)
383 return err
384 }
385
386 if tpl.TriggerMetadata == nil {
387 return fmt.Errorf("no trigger metadata found")
388 }
389
390 if tpl.TriggerMetadata.Repo == nil {
391 return fmt.Errorf("no repo data found")
392 }
393
394 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
395 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
396 }
397
398 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo)
399 if err != nil {
400 return err
401 }
402
403 pipelineId := models.PipelineId{
404 Knot: src.Key(),
405 Rkey: msg.Rkey,
406 }
407
408 workflows := make(map[models.Engine][]models.Workflow)
409
410 // Build pipeline environment variables once for all workflows
411 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
412
413 for _, w := range tpl.Workflows {
414 if w != nil {
415 if _, ok := s.engs[w.Engine]; !ok {
416 err = s.db.StatusFailed(models.WorkflowId{
417 PipelineId: pipelineId,
418 Name: w.Name,
419 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
420 if err != nil {
421 return fmt.Errorf("db.StatusFailed: %w", err)
422 }
423
424 continue
425 }
426
427 eng := s.engs[w.Engine]
428
429 if _, ok := workflows[eng]; !ok {
430 workflows[eng] = []models.Workflow{}
431 }
432
433 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
434 if err != nil {
435 err = s.db.StatusFailed(models.WorkflowId{
436 PipelineId: pipelineId,
437 Name: w.Name,
438 }, fmt.Sprintf("init workflow: %s", err), -1, s.n)
439 if err != nil {
440 return fmt.Errorf("db.StatusFailed: %w", err)
441 }
442
443 continue
444 }
445
446 // inject TANGLED_* env vars after InitWorkflow
447 // This prevents user-defined env vars from overriding them
448 if ewf.Environment == nil {
449 ewf.Environment = make(map[string]string)
450 }
451 maps.Copy(ewf.Environment, pipelineEnv)
452
453 workflows[eng] = append(workflows[eng], *ewf)
454
455 err = s.db.StatusPending(models.WorkflowId{
456 PipelineId: pipelineId,
457 Name: w.Name,
458 }, s.n)
459 if err != nil {
460 return fmt.Errorf("db.StatusPending: %w", err)
461 }
462 }
463 }
464
465 ok := s.jq.Enqueue(queue.Job{
466 Run: func() error {
467 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{
468 RepoDid: repoDid,
469 Workflows: workflows,
470 }, pipelineId)
471 return nil
472 },
473 OnFail: func(jobError error) {
474 s.l.Error("pipeline run failed", "error", jobError)
475 },
476 })
477 if ok {
478 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
479 } else {
480 s.l.Error("failed to enqueue pipeline: queue is full")
481 }
482 }
483
484 return nil
485}
486
487func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) {
488 if repo.RepoDid == nil || *repo.RepoDid == "" {
489 return "", fmt.Errorf("pipeline trigger missing repoDid")
490 }
491 repoDid, err := syntax.ParseDID(*repo.RepoDid)
492 if err != nil {
493 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err)
494 }
495 if _, err := s.db.GetRepoByDid(repoDid); err != nil {
496 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err)
497 }
498 return repoDid, nil
499}
500
501func (s *Spindle) configureOwner() error {
502 cfgOwner := s.cfg.Server.Owner
503
504 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
505 if err != nil {
506 return err
507 }
508
509 switch len(existing) {
510 case 0:
511 // no owner configured, continue
512 case 1:
513 // find existing owner
514 existingOwner := existing[0]
515
516 // no ownership change, this is okay
517 if existingOwner == s.cfg.Server.Owner {
518 break
519 }
520
521 // remove existing owner
522 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
523 if err != nil {
524 return nil
525 }
526 default:
527 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
528 }
529
530 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
531}