Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/eventstream"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/dummy"
	"tangled.org/core/spindle/engines/microvm"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the message of the day embedded from the motd file at build
// time. New installs it as the initial MOTD of every Spindle.
//
//go:embed motd
var defaultMotd []byte

const (
	// rbacDomain is the domain name passed to every rbac.Enforcer call made
	// by this package.
	rbacDomain = "thisserver"
)

// Spindle bundles the long-lived dependencies of a spindle server. Instances
// are built by New and run by Start.
type Spindle struct {
	// jc is the jetstream client; New creates it, registers DIDs on it and
	// starts it.
	jc *jetstream.JetstreamClient
	// tap is the tap client; New creates it and Start starts it.
	tap *Tap
	// embedTap is only set by Start, and only when cfg.Server.Tap.Embed is
	// true; it is nil otherwise.
	embedTap *embeddedTap
	// db is the database handle supplied to New.
	db *db.DB
	// e is the RBAC enforcer.
	e *rbac.Enforcer
	// l is the logger taken from the context passed to New.
	l *slog.Logger
	// n is the notifier handed to the db status updates and to the engines.
	n *notifier.Notifier
	// engs maps an engine name (as referenced by a workflow's Engine field)
	// to its implementation.
	engs map[string]models.Engine
	// jq is the job queue that pipeline runs are enqueued on.
	jq *queue.Queue
	// cfg is the configuration supplied to New.
	cfg *config.Config
	// ks is the knot event consumer; its ProcessFunc is processPipeline.
	ks *eventconsumer.Consumer
	// res is the identity resolver.
	res *idresolver.Resolver
	// vault is the secrets manager selected by cfg.Server.Secrets.Provider.
	vault secrets.Manager
	// motd is the current message of the day; guarded by motdMu.
	motd []byte
	// motdMu guards motd.
	motdMu sync.RWMutex
	// rootCtx is the context that was passed to New.
	rootCtx context.Context
}

// New creates a new Spindle server with the provided configuration and engines.
//
// Besides assembling the struct it performs startup work with side effects: it
// runs the startup migrations, registers the configured owner with the RBAC
// enforcer, and starts the jetstream consumer. The HTTP server, job queue,
// knot event consumer and tap client are not started here; Start does that.
65func New(ctx context.Context, cfg *config.Config, d *db.DB, engines map[string]models.Engine) (*Spindle, error) { 66 logger := log.FromContext(ctx) 67 68 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 69 if err != nil { 70 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 71 } 72 e.E.EnableAutoSave(true) 73 74 n := notifier.New() 75 76 var vault secrets.Manager 77 switch cfg.Server.Secrets.Provider { 78 case "openbao": 79 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 80 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 81 } 82 vault, err = secrets.NewOpenBaoManager( 83 cfg.Server.Secrets.OpenBao.ProxyAddr, 84 logger, 85 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 86 ) 87 if err != nil { 88 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 89 } 90 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 91 case "sqlite", "": 92 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 93 if err != nil { 94 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 95 } 96 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 97 default: 98 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 99 } 100 101 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil { 102 return nil, fmt.Errorf("failed to run startup migrations: %w", err) 103 } 104 105 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 106 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 107 108 collections := []string{ 109 tangled.SpindleMemberNSID, 110 tangled.RepoNSID, 111 tangled.RepoCollaboratorNSID, 112 } 113 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", 
collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 114 if err != nil { 115 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 116 } 117 jc.AddDid(cfg.Server.Owner) 118 119 // Check if the spindle knows about any Dids; 120 dids, err := d.GetAllDids() 121 if err != nil { 122 return nil, fmt.Errorf("failed to get all dids: %w", err) 123 } 124 for _, d := range dids { 125 jc.AddDid(d) 126 } 127 128 knownRepos, err := d.AllRepos() 129 if err != nil { 130 return nil, fmt.Errorf("failed to get known repos: %w", err) 131 } 132 for _, r := range knownRepos { 133 if r.Owner != "" { 134 jc.AddDid(r.Owner.String()) 135 } 136 } 137 138 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 139 140 spindle := &Spindle{ 141 jc: jc, 142 e: e, 143 db: d, 144 l: logger, 145 n: &n, 146 engs: engines, 147 jq: jq, 148 cfg: cfg, 149 res: resolver, 150 vault: vault, 151 motd: defaultMotd, 152 rootCtx: ctx, 153 } 154 155 err = e.AddSpindle(rbacDomain) 156 if err != nil { 157 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 158 } 159 err = spindle.configureOwner() 160 if err != nil { 161 return nil, err 162 } 163 logger.Info("owner set", "did", cfg.Server.Owner) 164 165 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 166 if err != nil { 167 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 168 } 169 170 err = jc.StartJetstream(ctx, spindle.ingest()) 171 if err != nil { 172 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 173 } 174 175 // for each incoming sh.tangled.pipeline, we execute 176 // spindle.processPipeline, which in turn enqueues the pipeline 177 // job in the above registered queue. 
178 ccfg := eventconsumer.NewConsumerConfig() 179 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 180 ccfg.ProcessFunc = spindle.processPipeline 181 ccfg.CursorStore = cursorStore 182 if cfg.Server.Dev { 183 ccfg.RetryInterval = 5 * time.Second 184 ccfg.MaxRetryInterval = 10 * time.Second 185 } else { 186 ccfg.RetryInterval = 1 * time.Minute 187 ccfg.MaxRetryInterval = 10 * time.Minute 188 } 189 knownKnots, err := d.Knots() 190 if err != nil { 191 return nil, err 192 } 193 for _, knot := range knownKnots { 194 logger.Info("adding source start", "knot", knot) 195 src := eventconsumer.NewKnotSource(knot) 196 eventconsumer.MigrateLegacyCursor(cursorStore, src) 197 ccfg.Sources[src] = struct{}{} 198 } 199 spindle.ks = eventconsumer.NewConsumer(*ccfg) 200 201 if cfg.Server.Tap.Embed { 202 pw, err := randomAdminPassword() 203 if err != nil { 204 return nil, err 205 } 206 cfg.Server.Tap.AdminPassword = pw 207 logger.Info("embedded tap: using random admin password") 208 } 209 spindle.tap = NewTapClient(spindle) 210 211 return spindle, nil 212} 213 214// DB returns the database instance. 215func (s *Spindle) DB() *db.DB { 216 return s.db 217} 218 219// Queue returns the job queue instance. 220func (s *Spindle) Queue() *queue.Queue { 221 return s.jq 222} 223 224// Engines returns the map of available engines. 225func (s *Spindle) Engines() map[string]models.Engine { 226 return s.engs 227} 228 229// Vault returns the secrets manager instance. 230func (s *Spindle) Vault() secrets.Manager { 231 return s.vault 232} 233 234// Notifier returns the notifier instance. 235func (s *Spindle) Notifier() *notifier.Notifier { 236 return s.n 237} 238 239// Enforcer returns the RBAC enforcer instance. 240func (s *Spindle) Enforcer() *rbac.Enforcer { 241 return s.e 242} 243 244// SetMotdContent sets custom MOTD content, replacing the embedded default. 
245func (s *Spindle) SetMotdContent(content []byte) { 246 s.motdMu.Lock() 247 defer s.motdMu.Unlock() 248 s.motd = content 249} 250 251// GetMotdContent returns the current MOTD content. 252func (s *Spindle) GetMotdContent() []byte { 253 s.motdMu.RLock() 254 defer s.motdMu.RUnlock() 255 return s.motd 256} 257 258// Start starts the Spindle server (blocking). 259func (s *Spindle) Start(ctx context.Context) error { 260 // starts a job queue runner in the background 261 s.jq.Start() 262 defer s.jq.Stop() 263 264 // Stop vault token renewal if it implements Stopper 265 if stopper, ok := s.vault.(secrets.Stopper); ok { 266 defer stopper.Stop() 267 } 268 269 tapCtx, tapCancel := context.WithCancel(ctx) 270 271 if s.cfg.Server.Tap.Embed { 272 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap")) 273 if err != nil { 274 tapCancel() 275 return fmt.Errorf("starting embedded tap: %w", err) 276 } 277 s.embedTap = emb 278 defer func() { 279 tapCancel() 280 s.embedTap.Shutdown() 281 }() 282 283 go s.watchTapDrain(tapCtx, tapCancel) 284 } else { 285 defer tapCancel() 286 } 287 288 go func() { 289 s.l.Info("starting knot event consumer") 290 s.ks.Start(ctx) 291 }() 292 293 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url) 294 s.tap.Start(tapCtx) 295 296 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 297 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 298} 299 300func (s *Spindle) declareTapInterest(ctx context.Context) { 301 repos, err := s.db.AllRepos() 302 if err != nil { 303 s.l.Warn("tap declare: failed to load known repos", "err", err) 304 return 305 } 306 seen := make(map[syntax.DID]struct{}, len(repos)) 307 dids := make([]syntax.DID, 0, len(repos)) 308 for _, r := range repos { 309 if r.Owner == "" { 310 continue 311 } 312 if _, ok := seen[r.Owner]; ok { 313 continue 314 } 315 seen[r.Owner] = struct{}{} 316 dids = append(dids, r.Owner) 317 } 318 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil { 
319 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err) 320 return 321 } 322 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids)) 323} 324 325func Run(ctx context.Context) error { 326 cfg, err := config.Load(ctx) 327 if err != nil { 328 return fmt.Errorf("failed to load config: %w", err) 329 } 330 331 d, err := db.Make(ctx, cfg.Server.DBPath) 332 if err != nil { 333 return fmt.Errorf("failed to setup db: %w", err) 334 } 335 336 nixeryEng, err := nixery.New(ctx, cfg) 337 if err != nil { 338 return err 339 } 340 341 microvmEng, err := microvm.New(ctx, cfg, d) 342 if err != nil { 343 return err 344 } 345 346 s, err := New(ctx, cfg, d, map[string]models.Engine{ 347 "nixery": nixeryEng, 348 "microvm": microvmEng, 349 "dummy": dummy.New(log.FromContext(ctx)), 350 }) 351 if err != nil { 352 return err 353 } 354 355 return s.Start(ctx) 356} 357 358func (s *Spindle) Router() http.Handler { 359 mux := chi.NewRouter() 360 361 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 362 w.Write(s.GetMotdContent()) 363 }) 364 mux.HandleFunc("/events", s.Events) 365 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 366 367 mux.Mount("/xrpc", s.XrpcRouter()) 368 return mux 369} 370 371func (s *Spindle) XrpcRouter() http.Handler { 372 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res.Directory(), s.cfg.Server.Did().String()) 373 374 l := log.SubLogger(s.l, "xrpc") 375 376 x := xrpc.Xrpc{ 377 Logger: l, 378 Db: s.db, 379 Enforcer: s.e, 380 Engines: s.engs, 381 Config: s.cfg, 382 Resolver: s.res, 383 Vault: s.vault, 384 Notifier: s.Notifier(), 385 ServiceAuth: serviceAuth, 386 } 387 388 return x.Router() 389} 390 391func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error { 392 if msg.Nsid == tangled.PipelineNSID { 393 tpl := tangled.Pipeline{} 394 err := json.Unmarshal(msg.EventJson, &tpl) 395 if err != nil { 396 s.l.Error("failed to unmarshal pipeline event", "err", err) 
397 return err 398 } 399 400 if tpl.TriggerMetadata == nil { 401 return fmt.Errorf("no trigger metadata found") 402 } 403 404 if tpl.TriggerMetadata.Repo == nil { 405 return fmt.Errorf("no repo data found") 406 } 407 408 if src.Host != tpl.TriggerMetadata.Repo.Knot { 409 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot) 410 } 411 412 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo) 413 if err != nil { 414 return err 415 } 416 417 pipelineId := models.PipelineId{ 418 Knot: src.Host, 419 Rkey: msg.Rkey, 420 } 421 422 workflows := make(map[models.Engine][]models.Workflow) 423 424 // Build pipeline environment variables once for all workflows 425 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId) 426 427 for _, w := range tpl.Workflows { 428 if w != nil { 429 if _, ok := s.engs[w.Engine]; !ok { 430 err = s.db.StatusFailed(models.WorkflowId{ 431 PipelineId: pipelineId, 432 Name: w.Name, 433 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 434 if err != nil { 435 return fmt.Errorf("db.StatusFailed: %w", err) 436 } 437 438 continue 439 } 440 441 eng := s.engs[w.Engine] 442 443 if _, ok := workflows[eng]; !ok { 444 workflows[eng] = []models.Workflow{} 445 } 446 447 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 448 if err != nil { 449 err = s.db.StatusFailed(models.WorkflowId{ 450 PipelineId: pipelineId, 451 Name: w.Name, 452 }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 453 if err != nil { 454 return fmt.Errorf("db.StatusFailed: %w", err) 455 } 456 457 continue 458 } 459 460 // inject TANGLED_* env vars after InitWorkflow 461 // This prevents user-defined env vars from overriding them 462 if ewf.Environment == nil { 463 ewf.Environment = make(map[string]string) 464 } 465 maps.Copy(ewf.Environment, pipelineEnv) 466 467 workflows[eng] = append(workflows[eng], *ewf) 468 469 err = s.db.StatusPending(models.WorkflowId{ 470 PipelineId: pipelineId, 471 Name: w.Name, 
472 }, s.n) 473 if err != nil { 474 return fmt.Errorf("db.StatusPending: %w", err) 475 } 476 } 477 } 478 479 ok := s.jq.Enqueue(repoDid, queue.Job{ 480 Run: func() error { 481 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 482 RepoDid: repoDid, 483 Workflows: workflows, 484 }, pipelineId) 485 return nil 486 }, 487 OnFail: func(jobError error) { 488 s.l.Error("pipeline run failed", "error", jobError) 489 }, 490 }) 491 if ok { 492 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 493 } else { 494 s.l.Error("failed to enqueue pipeline: queue is full") 495 } 496 } 497 498 return nil 499} 500 501func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) { 502 if repo.RepoDid == nil || *repo.RepoDid == "" { 503 return "", fmt.Errorf("pipeline trigger missing repoDid") 504 } 505 repoDid, err := syntax.ParseDID(*repo.RepoDid) 506 if err != nil { 507 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err) 508 } 509 if _, err := s.db.GetRepoByDid(repoDid); err != nil { 510 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 511 } 512 return repoDid, nil 513} 514 515func (s *Spindle) configureOwner() error { 516 cfgOwner := s.cfg.Server.Owner 517 518 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 519 if err != nil { 520 return err 521 } 522 523 switch len(existing) { 524 case 0: 525 // no owner configured, continue 526 case 1: 527 // find existing owner 528 existingOwner := existing[0] 529 530 // no ownership change, this is okay 531 if existingOwner == s.cfg.Server.Owner { 532 break 533 } 534 535 // remove existing owner 536 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 537 if err != nil { 538 return nil 539 } 540 default: 541 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 542 } 543 544 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 545}