Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "path/filepath"
12 "sync"
13 "time"
14
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/go-chi/chi/v5"
17 "github.com/hashicorp/go-version"
18 "tangled.org/core/api/tangled"
19 "tangled.org/core/eventconsumer"
20 "tangled.org/core/eventconsumer/cursor"
21 "tangled.org/core/eventstream"
22 "tangled.org/core/idresolver"
23 "tangled.org/core/jetstream"
24 "tangled.org/core/log"
25 "tangled.org/core/notifier"
26 "tangled.org/core/rbac"
27 "tangled.org/core/spindle/config"
28 "tangled.org/core/spindle/db"
29 "tangled.org/core/spindle/engine"
30 "tangled.org/core/spindle/engines/dummy"
31 "tangled.org/core/spindle/engines/microvm"
32 "tangled.org/core/spindle/engines/nixery"
33 "tangled.org/core/spindle/git"
34 "tangled.org/core/spindle/models"
35 "tangled.org/core/spindle/queue"
36 "tangled.org/core/spindle/secrets"
37 "tangled.org/core/spindle/xrpc"
38 "tangled.org/core/xrpc/serviceauth"
39)
40
41//go:embed motd
42var defaultMotd []byte
43
44const (
45 rbacDomain = "thisserver"
46)
47
48type Spindle struct {
49 jc *jetstream.JetstreamClient
50 tap *Tap
51 embedTap *embeddedTap
52 db *db.DB
53 e *rbac.Enforcer
54 l *slog.Logger
55 n *notifier.Notifier
56 engs map[string]models.Engine
57 jq *queue.Queue
58 cfg *config.Config
59 ks *eventconsumer.Consumer
60 res *idresolver.Resolver
61 vault secrets.Manager
62 motd []byte
63 motdMu sync.RWMutex
64 rootCtx context.Context
65}
66
67// New creates a new Spindle server with the provided configuration and engines.
68func New(ctx context.Context, cfg *config.Config, d *db.DB, engines map[string]models.Engine) (*Spindle, error) {
69 logger := log.FromContext(ctx)
70
71 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
72 if err != nil {
73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
74 }
75 e.E.EnableAutoSave(true)
76
77 n := notifier.New()
78
79 var vault secrets.Manager
80 switch cfg.Server.Secrets.Provider {
81 case "openbao":
82 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
83 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
84 }
85 vault, err = secrets.NewOpenBaoManager(
86 cfg.Server.Secrets.OpenBao.ProxyAddr,
87 logger,
88 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
89 )
90 if err != nil {
91 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
92 }
93 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
94 case "sqlite", "":
95 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
96 if err != nil {
97 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
98 }
99 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
100 default:
101 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
102 }
103
104 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil {
105 return nil, fmt.Errorf("failed to run startup migrations: %w", err)
106 }
107
108 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
109 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
110
111 collections := []string{
112 tangled.SpindleMemberNSID,
113 tangled.RepoNSID,
114 tangled.RepoCollaboratorNSID,
115 }
116 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
117 if err != nil {
118 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
119 }
120 jc.AddDid(cfg.Server.Owner)
121
122 // Check if the spindle knows about any Dids;
123 dids, err := d.GetAllDids()
124 if err != nil {
125 return nil, fmt.Errorf("failed to get all dids: %w", err)
126 }
127 for _, d := range dids {
128 jc.AddDid(d)
129 }
130
131 knownRepos, err := d.AllRepos()
132 if err != nil {
133 return nil, fmt.Errorf("failed to get known repos: %w", err)
134 }
135 for _, r := range knownRepos {
136 if r.Owner != "" {
137 jc.AddDid(r.Owner.String())
138 }
139 }
140
141 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
142
143 spindle := &Spindle{
144 jc: jc,
145 e: e,
146 db: d,
147 l: logger,
148 n: &n,
149 engs: engines,
150 jq: jq,
151 cfg: cfg,
152 res: resolver,
153 vault: vault,
154 motd: defaultMotd,
155 rootCtx: ctx,
156 }
157
158 err = e.AddSpindle(rbacDomain)
159 if err != nil {
160 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
161 }
162 err = spindle.configureOwner()
163 if err != nil {
164 return nil, err
165 }
166 logger.Info("owner set", "did", cfg.Server.Owner)
167
168 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
169 if err != nil {
170 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
171 }
172
173 err = jc.StartJetstream(ctx, spindle.ingest())
174 if err != nil {
175 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
176 }
177
178 // for each incoming sh.tangled.pipeline, we execute
179 // spindle.processPipeline, which in turn enqueues the pipeline
180 // job in the above registered queue.
181 ccfg := eventconsumer.NewConsumerConfig()
182 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
183 ccfg.ProcessFunc = spindle.processPipeline
184 ccfg.CursorStore = cursorStore
185 if cfg.Server.Dev {
186 ccfg.RetryInterval = 5 * time.Second
187 ccfg.MaxRetryInterval = 10 * time.Second
188 } else {
189 ccfg.RetryInterval = 1 * time.Minute
190 ccfg.MaxRetryInterval = 10 * time.Minute
191 }
192 knownKnots, err := d.Knots()
193 if err != nil {
194 return nil, err
195 }
196 for _, knot := range knownKnots {
197 logger.Info("adding source start", "knot", knot)
198 src := eventconsumer.NewKnotSource(knot)
199 eventconsumer.MigrateLegacyCursor(cursorStore, src)
200 ccfg.Sources[src] = struct{}{}
201 }
202 spindle.ks = eventconsumer.NewConsumer(*ccfg)
203
204 if cfg.Server.Tap.Embed {
205 pw, err := randomAdminPassword()
206 if err != nil {
207 return nil, err
208 }
209 cfg.Server.Tap.AdminPassword = pw
210 logger.Info("embedded tap: using random admin password")
211 }
212 spindle.tap = NewTapClient(spindle)
213
214 return spindle, nil
215}
216
217// DB returns the database instance.
218func (s *Spindle) DB() *db.DB {
219 return s.db
220}
221
222// Queue returns the job queue instance.
223func (s *Spindle) Queue() *queue.Queue {
224 return s.jq
225}
226
227// Engines returns the map of available engines.
228func (s *Spindle) Engines() map[string]models.Engine {
229 return s.engs
230}
231
232// Vault returns the secrets manager instance.
233func (s *Spindle) Vault() secrets.Manager {
234 return s.vault
235}
236
237// Notifier returns the notifier instance.
238func (s *Spindle) Notifier() *notifier.Notifier {
239 return s.n
240}
241
242// Enforcer returns the RBAC enforcer instance.
243func (s *Spindle) Enforcer() *rbac.Enforcer {
244 return s.e
245}
246
247// SetMotdContent sets custom MOTD content, replacing the embedded default.
248func (s *Spindle) SetMotdContent(content []byte) {
249 s.motdMu.Lock()
250 defer s.motdMu.Unlock()
251 s.motd = content
252}
253
254// GetMotdContent returns the current MOTD content.
255func (s *Spindle) GetMotdContent() []byte {
256 s.motdMu.RLock()
257 defer s.motdMu.RUnlock()
258 return s.motd
259}
260
261// Start starts the Spindle server (blocking).
262func (s *Spindle) Start(ctx context.Context) error {
263 // starts a job queue runner in the background
264 s.jq.Start()
265 defer s.jq.Stop()
266
267 // Stop vault token renewal if it implements Stopper
268 if stopper, ok := s.vault.(secrets.Stopper); ok {
269 defer stopper.Stop()
270 }
271
272 tapCtx, tapCancel := context.WithCancel(ctx)
273
274 if s.cfg.Server.Tap.Embed {
275 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap"))
276 if err != nil {
277 tapCancel()
278 return fmt.Errorf("starting embedded tap: %w", err)
279 }
280 s.embedTap = emb
281 defer func() {
282 tapCancel()
283 s.embedTap.Shutdown()
284 }()
285
286 go s.watchTapDrain(tapCtx, tapCancel)
287 } else {
288 defer tapCancel()
289 }
290
291 go func() {
292 s.l.Info("starting knot event consumer")
293 s.ks.Start(ctx)
294 }()
295
296 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url)
297 s.tap.Start(tapCtx)
298
299 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
300 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
301}
302
303func (s *Spindle) declareTapInterest(ctx context.Context) {
304 repos, err := s.db.AllRepos()
305 if err != nil {
306 s.l.Warn("tap declare: failed to load known repos", "err", err)
307 return
308 }
309 seen := make(map[syntax.DID]struct{}, len(repos))
310 dids := make([]syntax.DID, 0, len(repos))
311 for _, r := range repos {
312 if r.Owner == "" {
313 continue
314 }
315 if _, ok := seen[r.Owner]; ok {
316 continue
317 }
318 seen[r.Owner] = struct{}{}
319 dids = append(dids, r.Owner)
320 }
321 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil {
322 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err)
323 return
324 }
325 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids))
326}
327
328func Run(ctx context.Context) error {
329 cfg, err := config.Load(ctx)
330 if err != nil {
331 return fmt.Errorf("failed to load config: %w", err)
332 }
333
334 if err := ensureGitVersion(); err != nil {
335 return fmt.Errorf("ensuring git version: %w", err)
336 }
337
338 d, err := db.Make(ctx, cfg.Server.DBPath)
339 if err != nil {
340 return fmt.Errorf("failed to setup db: %w", err)
341 }
342
343 nixeryEng, err := nixery.New(ctx, cfg)
344 if err != nil {
345 return err
346 }
347
348 microvmEng, err := microvm.New(ctx, cfg, d)
349 if err != nil {
350 return err
351 }
352
353 s, err := New(ctx, cfg, d, map[string]models.Engine{
354 "nixery": nixeryEng,
355 "microvm": microvmEng,
356 "dummy": dummy.New(log.FromContext(ctx)),
357 })
358 if err != nil {
359 return err
360 }
361
362 return s.Start(ctx)
363}
364
365func (s *Spindle) Router() http.Handler {
366 mux := chi.NewRouter()
367
368 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
369 w.Write(s.GetMotdContent())
370 })
371 mux.HandleFunc("/events", s.Events)
372 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
373
374 mux.Mount("/xrpc", s.XrpcRouter())
375 return mux
376}
377
378func (s *Spindle) XrpcRouter() http.Handler {
379 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res.Directory(), s.cfg.Server.Did().String())
380
381 l := log.SubLogger(s.l, "xrpc")
382
383 x := xrpc.Xrpc{
384 Logger: l,
385 Db: s.db,
386 Enforcer: s.e,
387 Engines: s.engs,
388 Config: s.cfg,
389 Resolver: s.res,
390 Vault: s.vault,
391 Notifier: s.Notifier(),
392 ServiceAuth: serviceAuth,
393 }
394
395 return x.Router()
396}
397
398func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error {
399 l := log.FromContext(ctx).With("handler", "processKnotStream")
400 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
401 if msg.Nsid == tangled.PipelineNSID {
402 return nil
403 tpl := tangled.Pipeline{}
404 err := json.Unmarshal(msg.EventJson, &tpl)
405 if err != nil {
406 s.l.Error("failed to unmarshal pipeline event", "err", err)
407 return err
408 }
409
410 if tpl.TriggerMetadata == nil {
411 return fmt.Errorf("no trigger metadata found")
412 }
413
414 if tpl.TriggerMetadata.Repo == nil {
415 return fmt.Errorf("no repo data found")
416 }
417
418 if src.Host != tpl.TriggerMetadata.Repo.Knot {
419 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot)
420 }
421
422 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo)
423 if err != nil {
424 return err
425 }
426
427 pipelineId := models.PipelineId{
428 Knot: src.Host,
429 Rkey: msg.Rkey,
430 }
431
432 workflows := make(map[models.Engine][]models.Workflow)
433
434 // Build pipeline environment variables once for all workflows
435 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId)
436
437 for _, w := range tpl.Workflows {
438 if w != nil {
439 if _, ok := s.engs[w.Engine]; !ok {
440 err = s.db.StatusFailed(models.WorkflowId{
441 PipelineId: pipelineId,
442 Name: w.Name,
443 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
444 if err != nil {
445 return fmt.Errorf("db.StatusFailed: %w", err)
446 }
447
448 continue
449 }
450
451 eng := s.engs[w.Engine]
452
453 if _, ok := workflows[eng]; !ok {
454 workflows[eng] = []models.Workflow{}
455 }
456
457 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
458 if err != nil {
459 err = s.db.StatusFailed(models.WorkflowId{
460 PipelineId: pipelineId,
461 Name: w.Name,
462 }, fmt.Sprintf("init workflow: %s", err), -1, s.n)
463 if err != nil {
464 return fmt.Errorf("db.StatusFailed: %w", err)
465 }
466
467 continue
468 }
469
470 // inject TANGLED_* env vars after InitWorkflow
471 // This prevents user-defined env vars from overriding them
472 if ewf.Environment == nil {
473 ewf.Environment = make(map[string]string)
474 }
475 maps.Copy(ewf.Environment, pipelineEnv)
476
477 workflows[eng] = append(workflows[eng], *ewf)
478
479 err = s.db.StatusPending(models.WorkflowId{
480 PipelineId: pipelineId,
481 Name: w.Name,
482 }, s.n)
483 if err != nil {
484 return fmt.Errorf("db.StatusPending: %w", err)
485 }
486 }
487 }
488
489 ok := s.jq.Enqueue(repoDid, queue.Job{
490 Run: func() error {
491 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
492 RepoDid: repoDid,
493 Workflows: workflows,
494 }, pipelineId)
495 return nil
496 },
497 OnFail: func(jobError error) {
498 s.l.Error("pipeline run failed", "error", jobError)
499 },
500 })
501 if ok {
502 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
503 } else {
504 s.l.Error("failed to enqueue pipeline: queue is full")
505 }
506 } else if msg.Nsid == tangled.GitRefUpdateNSID {
507 event := tangled.GitRefUpdate{}
508 if err := json.Unmarshal(msg.EventJson, &event); err != nil {
509 l.Error("error unmarshalling", "err", err)
510 return err
511 }
512 l = l.With("repo", event.Repo, "ref", event.Ref, "newSha", event.NewSha)
513 l.Debug("debug")
514
515 repoDid := syntax.DID(event.Repo)
516 if _, err := s.db.GetRepoByDid(repoDid); err != nil {
517 return fmt.Errorf("unknown repoDid %s: %w", repoDid, err)
518 }
519
520 // NOTE: we are blindly trusting the knot that it will return only repos it own
521 repoCloneUri := s.newRepoCloneUrl(src.Key(), syntax.DID(event.Repo))
522 repoPath := s.newRepoPath(syntax.DID(event.Repo))
523 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
524 return fmt.Errorf("sync git repo: %w", err)
525 }
526 l.Info("synced git repo")
527
528 // TODO: plan the pipeline
529 }
530
531 return nil
532}
533
534// newRepoPath creates a path to store repository by its did and rkey.
535// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
536func (s *Spindle) newRepoPath(repo syntax.DID) string {
537 return filepath.Join(s.cfg.Server.RepoDir, repo.String())
538}
539
540func (s *Spindle) newRepoCloneUrl(knot string, did syntax.DID) string {
541 scheme := "https://"
542 if s.cfg.Server.Dev {
543 scheme = "http://"
544 }
545 return fmt.Sprintf("%s%s/%s", scheme, knot, did)
546}
547
548const RequiredVersion = "2.49.0"
549
550func ensureGitVersion() error {
551 v, err := git.Version()
552 if err != nil {
553 return fmt.Errorf("fetching git version: %w", err)
554 }
555 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
556 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
557 }
558 return nil
559}
560
561func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) {
562 if repo.RepoDid == nil || *repo.RepoDid == "" {
563 return "", fmt.Errorf("pipeline trigger missing repoDid")
564 }
565 repoDid, err := syntax.ParseDID(*repo.RepoDid)
566 if err != nil {
567 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err)
568 }
569 if _, err := s.db.GetRepoByDid(repoDid); err != nil {
570 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err)
571 }
572 return repoDid, nil
573}
574
575func (s *Spindle) configureOwner() error {
576 cfgOwner := s.cfg.Server.Owner
577
578 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
579 if err != nil {
580 return err
581 }
582
583 switch len(existing) {
584 case 0:
585 // no owner configured, continue
586 case 1:
587 // find existing owner
588 existingOwner := existing[0]
589
590 // no ownership change, this is okay
591 if existingOwner == s.cfg.Server.Owner {
592 break
593 }
594
595 // remove existing owner
596 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
597 if err != nil {
598 return nil
599 }
600 default:
601 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
602 }
603
604 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
605}