// Monorepo for Tangled (tangled.org)
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "sync"
12
13 "github.com/go-chi/chi/v5"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/eventconsumer/cursor"
17 "tangled.org/core/idresolver"
18 "tangled.org/core/jetstream"
19 "tangled.org/core/log"
20 "tangled.org/core/notifier"
21 "tangled.org/core/rbac"
22 "tangled.org/core/spindle/config"
23 "tangled.org/core/spindle/db"
24 "tangled.org/core/spindle/engine"
25 "tangled.org/core/spindle/engines/nixery"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/queue"
28 "tangled.org/core/spindle/secrets"
29 "tangled.org/core/spindle/xrpc"
30 "tangled.org/core/xrpc/serviceauth"
31)
32
33//go:embed motd
34var defaultMotd []byte
35
// rbacDomain is the domain passed to the RBAC enforcer calls made in this
// file.
const rbacDomain = "thisserver"
39
40type Spindle struct {
41 jc *jetstream.JetstreamClient
42 db *db.DB
43 e *rbac.Enforcer
44 l *slog.Logger
45 n *notifier.Notifier
46 engs map[string]models.Engine
47 jq *queue.Queue
48 cfg *config.Config
49 ks *eventconsumer.Consumer
50 res *idresolver.Resolver
51 vault secrets.Manager
52 motd []byte
53 motdMu sync.RWMutex
54 workflowSem chan struct{}
55}
56
57// New creates a new Spindle server with the provided configuration and engines.
58func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
59 logger := log.FromContext(ctx)
60
61 d, err := db.Make(cfg.Server.DBPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to setup db: %w", err)
64 }
65
66 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
67 if err != nil {
68 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
69 }
70 e.E.EnableAutoSave(true)
71
72 n := notifier.New()
73
74 var vault secrets.Manager
75 switch cfg.Server.Secrets.Provider {
76 case "openbao":
77 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
78 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
79 }
80 vault, err = secrets.NewOpenBaoManager(
81 cfg.Server.Secrets.OpenBao.ProxyAddr,
82 logger,
83 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
84 )
85 if err != nil {
86 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
87 }
88 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
89 case "sqlite", "":
90 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
91 if err != nil {
92 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
93 }
94 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
95 default:
96 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
97 }
98
99 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
100 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
101
102 workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows)
103 logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows)
104
105 collections := []string{
106 tangled.SpindleMemberNSID,
107 tangled.RepoNSID,
108 tangled.RepoCollaboratorNSID,
109 }
110 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
111 if err != nil {
112 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
113 }
114 jc.AddDid(cfg.Server.Owner)
115
116 // Check if the spindle knows about any Dids;
117 dids, err := d.GetAllDids()
118 if err != nil {
119 return nil, fmt.Errorf("failed to get all dids: %w", err)
120 }
121 for _, d := range dids {
122 jc.AddDid(d)
123 }
124
125 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
126
127 spindle := &Spindle{
128 jc: jc,
129 e: e,
130 db: d,
131 l: logger,
132 n: &n,
133 engs: engines,
134 jq: jq,
135 cfg: cfg,
136 res: resolver,
137 vault: vault,
138 motd: defaultMotd,
139 workflowSem: workflowSem,
140 }
141
142 err = e.AddSpindle(rbacDomain)
143 if err != nil {
144 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
145 }
146 err = spindle.configureOwner()
147 if err != nil {
148 return nil, err
149 }
150 logger.Info("owner set", "did", cfg.Server.Owner)
151
152 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
153 if err != nil {
154 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
155 }
156
157 err = jc.StartJetstream(ctx, spindle.ingest())
158 if err != nil {
159 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
160 }
161
162 // for each incoming sh.tangled.pipeline, we execute
163 // spindle.processPipeline, which in turn enqueues the pipeline
164 // job in the above registered queue.
165 ccfg := eventconsumer.NewConsumerConfig()
166 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
167 ccfg.Dev = cfg.Server.Dev
168 ccfg.ProcessFunc = spindle.processPipeline
169 ccfg.CursorStore = cursorStore
170 knownKnots, err := d.Knots()
171 if err != nil {
172 return nil, err
173 }
174 for _, knot := range knownKnots {
175 logger.Info("adding source start", "knot", knot)
176 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
177 }
178 spindle.ks = eventconsumer.NewConsumer(*ccfg)
179
180 return spindle, nil
181}
182
183// DB returns the database instance.
184func (s *Spindle) DB() *db.DB {
185 return s.db
186}
187
188// Queue returns the job queue instance.
189func (s *Spindle) Queue() *queue.Queue {
190 return s.jq
191}
192
193// Engines returns the map of available engines.
194func (s *Spindle) Engines() map[string]models.Engine {
195 return s.engs
196}
197
198// Vault returns the secrets manager instance.
199func (s *Spindle) Vault() secrets.Manager {
200 return s.vault
201}
202
203// Notifier returns the notifier instance.
204func (s *Spindle) Notifier() *notifier.Notifier {
205 return s.n
206}
207
208// Enforcer returns the RBAC enforcer instance.
209func (s *Spindle) Enforcer() *rbac.Enforcer {
210 return s.e
211}
212
213// SetMotdContent sets custom MOTD content, replacing the embedded default.
214func (s *Spindle) SetMotdContent(content []byte) {
215 s.motdMu.Lock()
216 defer s.motdMu.Unlock()
217 s.motd = content
218}
219
220// GetMotdContent returns the current MOTD content.
221func (s *Spindle) GetMotdContent() []byte {
222 s.motdMu.RLock()
223 defer s.motdMu.RUnlock()
224 return s.motd
225}
226
227// Start starts the Spindle server (blocking).
228func (s *Spindle) Start(ctx context.Context) error {
229 // starts a job queue runner in the background
230 s.jq.Start()
231 defer s.jq.Stop()
232
233 // Stop vault token renewal if it implements Stopper
234 if stopper, ok := s.vault.(secrets.Stopper); ok {
235 defer stopper.Stop()
236 }
237
238 go func() {
239 s.l.Info("starting knot event consumer")
240 s.ks.Start(ctx)
241 }()
242
243 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
244 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
245}
246
247func Run(ctx context.Context) error {
248 cfg, err := config.Load(ctx)
249 if err != nil {
250 return fmt.Errorf("failed to load config: %w", err)
251 }
252
253 nixeryEng, err := nixery.New(ctx, cfg)
254 if err != nil {
255 return err
256 }
257
258 s, err := New(ctx, cfg, map[string]models.Engine{
259 "nixery": nixeryEng,
260 })
261 if err != nil {
262 return err
263 }
264
265 return s.Start(ctx)
266}
267
268func (s *Spindle) Router() http.Handler {
269 mux := chi.NewRouter()
270
271 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
272 w.Write(s.GetMotdContent())
273 })
274 mux.HandleFunc("/events", s.Events)
275 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
276
277 mux.Mount("/xrpc", s.XrpcRouter())
278 return mux
279}
280
281func (s *Spindle) XrpcRouter() http.Handler {
282 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
283
284 l := log.SubLogger(s.l, "xrpc")
285
286 x := xrpc.Xrpc{
287 Logger: l,
288 Db: s.db,
289 Enforcer: s.e,
290 Engines: s.engs,
291 Config: s.cfg,
292 Resolver: s.res,
293 Vault: s.vault,
294 Notifier: s.Notifier(),
295 ServiceAuth: serviceAuth,
296 }
297
298 return x.Router()
299}
300
301func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
302 if msg.Nsid == tangled.PipelineNSID {
303 tpl := tangled.Pipeline{}
304 err := json.Unmarshal(msg.EventJson, &tpl)
305 if err != nil {
306 s.l.Error("failed to unmarshal pipeline event", "err", err)
307 return err
308 }
309
310 if tpl.TriggerMetadata == nil {
311 return fmt.Errorf("no trigger metadata found")
312 }
313
314 if tpl.TriggerMetadata.Repo == nil {
315 return fmt.Errorf("no repo data found")
316 }
317
318 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
319 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
320 }
321
322 // filter by repos
323 repoName := ""
324 if tpl.TriggerMetadata.Repo.Repo != nil {
325 repoName = *tpl.TriggerMetadata.Repo.Repo
326 }
327
328 _, err = s.db.GetRepo(
329 tpl.TriggerMetadata.Repo.Knot,
330 tpl.TriggerMetadata.Repo.Did,
331 repoName,
332 )
333 if err != nil {
334 return fmt.Errorf("failed to get repo: %w", err)
335 }
336
337 pipelineId := models.PipelineId{
338 Knot: src.Key(),
339 Rkey: msg.Rkey,
340 }
341
342 workflows := make(map[models.Engine][]models.Workflow)
343
344 // Build pipeline environment variables once for all workflows
345 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
346
347 for _, w := range tpl.Workflows {
348 if w != nil {
349 if _, ok := s.engs[w.Engine]; !ok {
350 err = s.db.StatusFailed(models.WorkflowId{
351 PipelineId: pipelineId,
352 Name: w.Name,
353 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
354 if err != nil {
355 return fmt.Errorf("db.StatusFailed: %w", err)
356 }
357
358 continue
359 }
360
361 eng := s.engs[w.Engine]
362
363 if _, ok := workflows[eng]; !ok {
364 workflows[eng] = []models.Workflow{}
365 }
366
367 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
368 if err != nil {
369 err = s.db.StatusFailed(models.WorkflowId{
370 PipelineId: pipelineId,
371 Name: w.Name,
372 }, fmt.Sprintf("init workflow: %s", err), -1, s.n)
373 if err != nil {
374 return fmt.Errorf("db.StatusFailed: %w", err)
375 }
376
377 continue
378 }
379
380 // inject TANGLED_* env vars after InitWorkflow
381 // This prevents user-defined env vars from overriding them
382 if ewf.Environment == nil {
383 ewf.Environment = make(map[string]string)
384 }
385 maps.Copy(ewf.Environment, pipelineEnv)
386
387 workflows[eng] = append(workflows[eng], *ewf)
388
389 err = s.db.StatusPending(models.WorkflowId{
390 PipelineId: pipelineId,
391 Name: w.Name,
392 }, s.n)
393 if err != nil {
394 return fmt.Errorf("db.StatusPending: %w", err)
395 }
396 }
397 }
398
399 ok := s.jq.Enqueue(queue.Job{
400 Run: func() error {
401 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{
402 RepoOwner: tpl.TriggerMetadata.Repo.Did,
403 RepoName: repoName,
404 Workflows: workflows,
405 }, pipelineId)
406 return nil
407 },
408 OnFail: func(jobError error) {
409 s.l.Error("pipeline run failed", "error", jobError)
410 },
411 })
412 if ok {
413 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
414 } else {
415 s.l.Error("failed to enqueue pipeline: queue is full")
416 }
417 }
418
419 return nil
420}
421
422func (s *Spindle) configureOwner() error {
423 cfgOwner := s.cfg.Server.Owner
424
425 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
426 if err != nil {
427 return err
428 }
429
430 switch len(existing) {
431 case 0:
432 // no owner configured, continue
433 case 1:
434 // find existing owner
435 existingOwner := existing[0]
436
437 // no ownership change, this is okay
438 if existingOwner == s.cfg.Server.Owner {
439 break
440 }
441
442 // remove existing owner
443 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
444 if err != nil {
445 return nil
446 }
447 default:
448 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
449 }
450
451 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
452}