Monorepo for Tangled
tangled.org
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
33
34//go:embed motd
35var defaultMotd []byte
36
// rbacDomain is the domain name passed to the RBAC enforcer when registering
// this spindle and when looking up, adding or removing its owner.
const rbacDomain = "thisserver"
40
41type Spindle struct {
42 jc *jetstream.JetstreamClient
43 tap *Tap
44 embedTap *embeddedTap
45 db *db.DB
46 e *rbac.Enforcer
47 l *slog.Logger
48 n *notifier.Notifier
49 engs map[string]models.Engine
50 jq *queue.Queue
51 cfg *config.Config
52 ks *eventconsumer.Consumer
53 res *idresolver.Resolver
54 vault secrets.Manager
55 motd []byte
56 motdMu sync.RWMutex
57 workflowSem chan struct{}
58 rootCtx context.Context
59}
60
61// New creates a new Spindle server with the provided configuration and engines.
62func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
63 logger := log.FromContext(ctx)
64
65 d, err := db.Make(ctx, cfg.Server.DBPath)
66 if err != nil {
67 return nil, fmt.Errorf("failed to setup db: %w", err)
68 }
69
70 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
71 if err != nil {
72 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
73 }
74 e.E.EnableAutoSave(true)
75
76 n := notifier.New()
77
78 var vault secrets.Manager
79 switch cfg.Server.Secrets.Provider {
80 case "openbao":
81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
83 }
84 vault, err = secrets.NewOpenBaoManager(
85 cfg.Server.Secrets.OpenBao.ProxyAddr,
86 logger,
87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
88 )
89 if err != nil {
90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
91 }
92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
93 case "sqlite", "":
94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
95 if err != nil {
96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
97 }
98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
99 default:
100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
101 }
102
103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
104 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
105
106 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows)
107 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows)
108
109 collections := []string{
110 tangled.SpindleMemberNSID,
111 }
112 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
113 if err != nil {
114 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
115 }
116 jc.AddDid(cfg.Server.Owner)
117
118 // Check if the spindle knows about any Dids;
119 dids, err := d.GetAllDids()
120 if err != nil {
121 return nil, fmt.Errorf("failed to get all dids: %w", err)
122 }
123 for _, d := range dids {
124 jc.AddDid(d)
125 }
126
127 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
128
129 spindle := &Spindle{
130 jc: jc,
131 e: e,
132 db: d,
133 l: logger,
134 n: &n,
135 engs: engines,
136 jq: jq,
137 cfg: cfg,
138 res: resolver,
139 vault: vault,
140 motd: defaultMotd,
141 workflowSem: workflowSem,
142 rootCtx: ctx,
143 }
144
145 err = e.AddSpindle(rbacDomain)
146 if err != nil {
147 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
148 }
149 err = spindle.configureOwner()
150 if err != nil {
151 return nil, err
152 }
153 logger.Info("owner set", "did", cfg.Server.Owner)
154
155 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
156 if err != nil {
157 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
158 }
159
160 err = jc.StartJetstream(ctx, spindle.ingest())
161 if err != nil {
162 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
163 }
164
165 // for each incoming sh.tangled.pipeline, we execute
166 // spindle.processPipeline, which in turn enqueues the pipeline
167 // job in the above registered queue.
168 ccfg := eventconsumer.NewConsumerConfig()
169 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
170 ccfg.Dev = cfg.Server.Dev
171 ccfg.ProcessFunc = spindle.processPipeline
172 ccfg.CursorStore = cursorStore
173 knownKnots, err := d.Knots()
174 if err != nil {
175 return nil, err
176 }
177 for _, knot := range knownKnots {
178 logger.Info("adding source start", "knot", knot)
179 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
180 }
181 spindle.ks = eventconsumer.NewConsumer(*ccfg)
182
183 if cfg.Server.Tap.Embed {
184 pw, err := randomAdminPassword()
185 if err != nil {
186 return nil, err
187 }
188 cfg.Server.Tap.AdminPassword = pw
189 logger.Info("embedded tap: using random admin password")
190 }
191 spindle.tap = NewTapClient(spindle)
192
193 return spindle, nil
194}
195
196// DB returns the database instance.
197func (s *Spindle) DB() *db.DB {
198 return s.db
199}
200
201// Queue returns the job queue instance.
202func (s *Spindle) Queue() *queue.Queue {
203 return s.jq
204}
205
206// Engines returns the map of available engines.
207func (s *Spindle) Engines() map[string]models.Engine {
208 return s.engs
209}
210
211// Vault returns the secrets manager instance.
212func (s *Spindle) Vault() secrets.Manager {
213 return s.vault
214}
215
216// Notifier returns the notifier instance.
217func (s *Spindle) Notifier() *notifier.Notifier {
218 return s.n
219}
220
221// Enforcer returns the RBAC enforcer instance.
222func (s *Spindle) Enforcer() *rbac.Enforcer {
223 return s.e
224}
225
226// SetMotdContent sets custom MOTD content, replacing the embedded default.
227func (s *Spindle) SetMotdContent(content []byte) {
228 s.motdMu.Lock()
229 defer s.motdMu.Unlock()
230 s.motd = content
231}
232
233// GetMotdContent returns the current MOTD content.
234func (s *Spindle) GetMotdContent() []byte {
235 s.motdMu.RLock()
236 defer s.motdMu.RUnlock()
237 return s.motd
238}
239
240// Start starts the Spindle server (blocking).
241func (s *Spindle) Start(ctx context.Context) error {
242 // starts a job queue runner in the background
243 s.jq.Start()
244 defer s.jq.Stop()
245
246 // Stop vault token renewal if it implements Stopper
247 if stopper, ok := s.vault.(secrets.Stopper); ok {
248 defer stopper.Stop()
249 }
250
251 if s.cfg.Server.Tap.Embed {
252 emb, err := startEmbeddedTap(ctx, s.cfg, log.SubLogger(s.l, "embedtap"))
253 if err != nil {
254 return fmt.Errorf("starting embedded tap: %w", err)
255 }
256 s.embedTap = emb
257 defer s.embedTap.Shutdown()
258 }
259
260 go func() {
261 s.l.Info("starting knot event consumer")
262 s.ks.Start(ctx)
263 }()
264
265 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url)
266 s.tap.Start(ctx)
267
268 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
269 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
270}
271
272func (s *Spindle) declareTapInterest(ctx context.Context) {
273 repos, err := s.db.AllRepos()
274 if err != nil {
275 s.l.Warn("tap declare: failed to load known repos", "err", err)
276 return
277 }
278 seen := make(map[syntax.DID]struct{}, len(repos))
279 dids := make([]syntax.DID, 0, len(repos))
280 for _, r := range repos {
281 if r.Owner == "" {
282 continue
283 }
284 if _, ok := seen[r.Owner]; ok {
285 continue
286 }
287 seen[r.Owner] = struct{}{}
288 dids = append(dids, r.Owner)
289 }
290 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil {
291 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err)
292 return
293 }
294 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids))
295}
296
297func Run(ctx context.Context) error {
298 cfg, err := config.Load(ctx)
299 if err != nil {
300 return fmt.Errorf("failed to load config: %w", err)
301 }
302
303 nixeryEng, err := nixery.New(ctx, cfg)
304 if err != nil {
305 return err
306 }
307
308 s, err := New(ctx, cfg, map[string]models.Engine{
309 "nixery": nixeryEng,
310 })
311 if err != nil {
312 return err
313 }
314
315 return s.Start(ctx)
316}
317
318func (s *Spindle) Router() http.Handler {
319 mux := chi.NewRouter()
320
321 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
322 w.Write(s.GetMotdContent())
323 })
324 mux.HandleFunc("/events", s.Events)
325 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
326
327 mux.Mount("/xrpc", s.XrpcRouter())
328 return mux
329}
330
331func (s *Spindle) XrpcRouter() http.Handler {
332 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
333
334 l := log.SubLogger(s.l, "xrpc")
335
336 x := xrpc.Xrpc{
337 Logger: l,
338 Db: s.db,
339 Enforcer: s.e,
340 Engines: s.engs,
341 Config: s.cfg,
342 Resolver: s.res,
343 Vault: s.vault,
344 Notifier: s.Notifier(),
345 ServiceAuth: serviceAuth,
346 }
347
348 return x.Router()
349}
350
351func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
352 if msg.Nsid == tangled.PipelineNSID {
353 tpl := tangled.Pipeline{}
354 err := json.Unmarshal(msg.EventJson, &tpl)
355 if err != nil {
356 s.l.Error("failed to unmarshal pipeline event", "err", err)
357 return err
358 }
359
360 if tpl.TriggerMetadata == nil {
361 return fmt.Errorf("no trigger metadata found")
362 }
363
364 if tpl.TriggerMetadata.Repo == nil {
365 return fmt.Errorf("no repo data found")
366 }
367
368 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
369 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
370 }
371
372 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo)
373 if err != nil {
374 return err
375 }
376
377 pipelineId := models.PipelineId{
378 Knot: src.Key(),
379 Rkey: msg.Rkey,
380 }
381
382 workflows := make(map[models.Engine][]models.Workflow)
383
384 // Build pipeline environment variables once for all workflows
385 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
386
387 for _, w := range tpl.Workflows {
388 if w != nil {
389 if _, ok := s.engs[w.Engine]; !ok {
390 err = s.db.StatusFailed(models.WorkflowId{
391 PipelineId: pipelineId,
392 Name: w.Name,
393 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
394 if err != nil {
395 return fmt.Errorf("db.StatusFailed: %w", err)
396 }
397
398 continue
399 }
400
401 eng := s.engs[w.Engine]
402
403 if _, ok := workflows[eng]; !ok {
404 workflows[eng] = []models.Workflow{}
405 }
406
407 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
408 if err != nil {
409 err = s.db.StatusFailed(models.WorkflowId{
410 PipelineId: pipelineId,
411 Name: w.Name,
412 }, fmt.Sprintf("init workflow: %s", err), -1, s.n)
413 if err != nil {
414 return fmt.Errorf("db.StatusFailed: %w", err)
415 }
416
417 continue
418 }
419
420 // inject TANGLED_* env vars after InitWorkflow
421 // This prevents user-defined env vars from overriding them
422 if ewf.Environment == nil {
423 ewf.Environment = make(map[string]string)
424 }
425 maps.Copy(ewf.Environment, pipelineEnv)
426
427 workflows[eng] = append(workflows[eng], *ewf)
428
429 err = s.db.StatusPending(models.WorkflowId{
430 PipelineId: pipelineId,
431 Name: w.Name,
432 }, s.n)
433 if err != nil {
434 return fmt.Errorf("db.StatusPending: %w", err)
435 }
436 }
437 }
438
439 ok := s.jq.Enqueue(queue.Job{
440 Run: func() error {
441 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{
442 RepoDid: repoDid,
443 Workflows: workflows,
444 }, pipelineId)
445 return nil
446 },
447 OnFail: func(jobError error) {
448 s.l.Error("pipeline run failed", "error", jobError)
449 },
450 })
451 if ok {
452 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
453 } else {
454 s.l.Error("failed to enqueue pipeline: queue is full")
455 }
456 }
457
458 return nil
459}
460
461func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) {
462 if repo.RepoDid == nil || *repo.RepoDid == "" {
463 return "", fmt.Errorf("pipeline trigger missing repoDid")
464 }
465 repoDid, err := syntax.ParseDID(*repo.RepoDid)
466 if err != nil {
467 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err)
468 }
469 if _, err := s.db.GetRepoByDid(repoDid); err != nil {
470 s.l.Warn("accepting knot pipeline assertion for unknown repoDid", "repoDid", repoDid, "err", err)
471 }
472 return repoDid, nil
473}
474
475func (s *Spindle) configureOwner() error {
476 cfgOwner := s.cfg.Server.Owner
477
478 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
479 if err != nil {
480 return err
481 }
482
483 switch len(existing) {
484 case 0:
485 // no owner configured, continue
486 case 1:
487 // find existing owner
488 existingOwner := existing[0]
489
490 // no ownership change, this is okay
491 if existingOwner == s.cfg.Server.Owner {
492 break
493 }
494
495 // remove existing owner
496 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
497 if err != nil {
498 return nil
499 }
500 default:
501 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
502 }
503
504 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
505}