Monorepo for Tangled tangled.org
10

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "path/filepath" 12 "sync" 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/go-chi/chi/v5" 17 "github.com/hashicorp/go-version" 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/eventconsumer" 20 "tangled.org/core/eventconsumer/cursor" 21 "tangled.org/core/eventstream" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/jetstream" 24 "tangled.org/core/log" 25 "tangled.org/core/notifier" 26 "tangled.org/core/rbac" 27 "tangled.org/core/spindle/config" 28 "tangled.org/core/spindle/db" 29 "tangled.org/core/spindle/engine" 30 "tangled.org/core/spindle/engines/dummy" 31 "tangled.org/core/spindle/engines/microvm" 32 "tangled.org/core/spindle/engines/nixery" 33 "tangled.org/core/spindle/git" 34 "tangled.org/core/spindle/models" 35 "tangled.org/core/spindle/queue" 36 "tangled.org/core/spindle/secrets" 37 "tangled.org/core/spindle/xrpc" 38 "tangled.org/core/xrpc/serviceauth" 39) 40 41//go:embed motd 42var defaultMotd []byte 43 44const ( 45 rbacDomain = "thisserver" 46) 47 48type Spindle struct { 49 jc *jetstream.JetstreamClient 50 tap *Tap 51 embedTap *embeddedTap 52 db *db.DB 53 e *rbac.Enforcer 54 l *slog.Logger 55 n *notifier.Notifier 56 engs map[string]models.Engine 57 jq *queue.Queue 58 cfg *config.Config 59 ks *eventconsumer.Consumer 60 res *idresolver.Resolver 61 vault secrets.Manager 62 motd []byte 63 motdMu sync.RWMutex 64 rootCtx context.Context 65} 66 67// New creates a new Spindle server with the provided configuration and engines. 68func New(ctx context.Context, cfg *config.Config, d *db.DB, engines map[string]models.Engine) (*Spindle, error) { 69 logger := log.FromContext(ctx) 70 71 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 72 if err != nil { 73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 74 } 75 e.E.EnableAutoSave(true) 76 77 n := notifier.New() 78 79 var vault secrets.Manager 80 switch cfg.Server.Secrets.Provider { 81 case "openbao": 82 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 83 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 84 } 85 vault, err = secrets.NewOpenBaoManager( 86 cfg.Server.Secrets.OpenBao.ProxyAddr, 87 logger, 88 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 89 ) 90 if err != nil { 91 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 92 } 93 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 94 case "sqlite", "": 95 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 96 if err != nil { 97 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 98 } 99 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 100 default: 101 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 102 } 103 104 if err := runStartupMigrations(ctx, d, cfg.Server.Tap.Embed, cfg.Server.Tap.DBPath, logger); err != nil { 105 return nil, fmt.Errorf("failed to run startup migrations: %w", err) 106 } 107 108 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 109 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 110 111 collections := []string{ 112 tangled.SpindleMemberNSID, 113 tangled.RepoNSID, 114 tangled.RepoCollaboratorNSID, 115 } 116 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 117 if err != nil { 118 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 119 } 120 jc.AddDid(cfg.Server.Owner) 121 122 // Check if the spindle knows about any Dids; 123 dids, err := d.GetAllDids() 124 if err != nil { 125 return nil, fmt.Errorf("failed to get all dids: %w", err) 126 } 127 for _, d := range dids { 128 jc.AddDid(d) 129 } 130 131 knownRepos, err := d.AllRepos() 132 if err != nil { 133 return nil, fmt.Errorf("failed to get known repos: %w", err) 134 } 135 for _, r := range knownRepos { 136 if r.Owner != "" { 137 jc.AddDid(r.Owner.String()) 138 } 139 } 140 141 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 142 143 spindle := &Spindle{ 144 jc: jc, 145 e: e, 146 db: d, 147 l: logger, 148 n: &n, 149 engs: engines, 150 jq: jq, 151 cfg: cfg, 152 res: resolver, 153 vault: vault, 154 motd: defaultMotd, 155 rootCtx: ctx, 156 } 157 158 err = e.AddSpindle(rbacDomain) 159 if err != nil { 160 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 161 } 162 err = spindle.configureOwner() 163 if err != nil { 164 return nil, err 165 } 166 logger.Info("owner set", "did", cfg.Server.Owner) 167 168 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 169 if err != nil { 170 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 171 } 172 173 err = jc.StartJetstream(ctx, spindle.ingest()) 174 if err != nil { 175 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 176 } 177 178 // for each incoming sh.tangled.pipeline, we execute 179 // spindle.processPipeline, which in turn enqueues the pipeline 180 // job in the above registered queue. 181 ccfg := eventconsumer.NewConsumerConfig() 182 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 183 ccfg.ProcessFunc = spindle.processPipeline 184 ccfg.CursorStore = cursorStore 185 if cfg.Server.Dev { 186 ccfg.RetryInterval = 5 * time.Second 187 ccfg.MaxRetryInterval = 10 * time.Second 188 } else { 189 ccfg.RetryInterval = 1 * time.Minute 190 ccfg.MaxRetryInterval = 10 * time.Minute 191 } 192 knownKnots, err := d.Knots() 193 if err != nil { 194 return nil, err 195 } 196 for _, knot := range knownKnots { 197 logger.Info("adding source start", "knot", knot) 198 src := eventconsumer.NewKnotSource(knot) 199 eventconsumer.MigrateLegacyCursor(cursorStore, src) 200 ccfg.Sources[src] = struct{}{} 201 } 202 spindle.ks = eventconsumer.NewConsumer(*ccfg) 203 204 if cfg.Server.Tap.Embed { 205 pw, err := randomAdminPassword() 206 if err != nil { 207 return nil, err 208 } 209 cfg.Server.Tap.AdminPassword = pw 210 logger.Info("embedded tap: using random admin password") 211 } 212 spindle.tap = NewTapClient(spindle) 213 214 return spindle, nil 215} 216 217// DB returns the database instance. 218func (s *Spindle) DB() *db.DB { 219 return s.db 220} 221 222// Queue returns the job queue instance. 223func (s *Spindle) Queue() *queue.Queue { 224 return s.jq 225} 226 227// Engines returns the map of available engines. 228func (s *Spindle) Engines() map[string]models.Engine { 229 return s.engs 230} 231 232// Vault returns the secrets manager instance. 233func (s *Spindle) Vault() secrets.Manager { 234 return s.vault 235} 236 237// Notifier returns the notifier instance. 238func (s *Spindle) Notifier() *notifier.Notifier { 239 return s.n 240} 241 242// Enforcer returns the RBAC enforcer instance. 243func (s *Spindle) Enforcer() *rbac.Enforcer { 244 return s.e 245} 246 247// SetMotdContent sets custom MOTD content, replacing the embedded default. 248func (s *Spindle) SetMotdContent(content []byte) { 249 s.motdMu.Lock() 250 defer s.motdMu.Unlock() 251 s.motd = content 252} 253 254// GetMotdContent returns the current MOTD content. 255func (s *Spindle) GetMotdContent() []byte { 256 s.motdMu.RLock() 257 defer s.motdMu.RUnlock() 258 return s.motd 259} 260 261// Start starts the Spindle server (blocking). 262func (s *Spindle) Start(ctx context.Context) error { 263 // starts a job queue runner in the background 264 s.jq.Start() 265 defer s.jq.Stop() 266 267 // Stop vault token renewal if it implements Stopper 268 if stopper, ok := s.vault.(secrets.Stopper); ok { 269 defer stopper.Stop() 270 } 271 272 tapCtx, tapCancel := context.WithCancel(ctx) 273 274 if s.cfg.Server.Tap.Embed { 275 emb, err := startEmbeddedTap(tapCtx, s.cfg, log.SubLogger(s.l, "embedtap")) 276 if err != nil { 277 tapCancel() 278 return fmt.Errorf("starting embedded tap: %w", err) 279 } 280 s.embedTap = emb 281 defer func() { 282 tapCancel() 283 s.embedTap.Shutdown() 284 }() 285 286 go s.watchTapDrain(tapCtx, tapCancel) 287 } else { 288 defer tapCancel() 289 } 290 291 go func() { 292 s.l.Info("starting knot event consumer") 293 s.ks.Start(ctx) 294 }() 295 296 s.l.Info("starting tap client", "url", s.cfg.Server.Tap.Url) 297 s.tap.Start(tapCtx) 298 299 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 300 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 301} 302 303func (s *Spindle) declareTapInterest(ctx context.Context) { 304 repos, err := s.db.AllRepos() 305 if err != nil { 306 s.l.Warn("tap declare: failed to load known repos", "err", err) 307 return 308 } 309 seen := make(map[syntax.DID]struct{}, len(repos)) 310 dids := make([]syntax.DID, 0, len(repos)) 311 for _, r := range repos { 312 if r.Owner == "" { 313 continue 314 } 315 if _, ok := seen[r.Owner]; ok { 316 continue 317 } 318 seen[r.Owner] = struct{}{} 319 dids = append(dids, r.Owner) 320 } 321 if err := s.tap.AddOwnerDIDs(ctx, dids); err != nil { 322 s.l.Warn("tap declare: AddRepos rejected", "count", len(dids), "err", err) 323 return 324 } 325 s.l.Info("tap declare: known owner DIDs registered", "count", len(dids)) 326} 327 328func Run(ctx context.Context) error { 329 cfg, err := config.Load(ctx) 330 if err != nil { 331 return fmt.Errorf("failed to load config: %w", err) 332 } 333 334 if err := ensureGitVersion(); err != nil { 335 return fmt.Errorf("ensuring git version: %w", err) 336 } 337 338 d, err := db.Make(ctx, cfg.Server.DBPath) 339 if err != nil { 340 return fmt.Errorf("failed to setup db: %w", err) 341 } 342 343 nixeryEng, err := nixery.New(ctx, cfg) 344 if err != nil { 345 return err 346 } 347 348 microvmEng, err := microvm.New(ctx, cfg, d) 349 if err != nil { 350 return err 351 } 352 353 s, err := New(ctx, cfg, d, map[string]models.Engine{ 354 "nixery": nixeryEng, 355 "microvm": microvmEng, 356 "dummy": dummy.New(log.FromContext(ctx)), 357 }) 358 if err != nil { 359 return err 360 } 361 362 return s.Start(ctx) 363} 364 365func (s *Spindle) Router() http.Handler { 366 mux := chi.NewRouter() 367 368 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 369 w.Write(s.GetMotdContent()) 370 }) 371 mux.HandleFunc("/events", s.Events) 372 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 373 374 mux.Mount("/xrpc", s.XrpcRouter()) 375 return mux 376} 377 378func (s *Spindle) XrpcRouter() http.Handler { 379 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res.Directory(), s.cfg.Server.Did().String()) 380 381 l := log.SubLogger(s.l, "xrpc") 382 383 x := xrpc.Xrpc{ 384 Logger: l, 385 Db: s.db, 386 Enforcer: s.e, 387 Engines: s.engs, 388 Config: s.cfg, 389 Resolver: s.res, 390 Vault: s.vault, 391 Notifier: s.Notifier(), 392 ServiceAuth: serviceAuth, 393 } 394 395 return x.Router() 396} 397 398func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error { 399 l := log.FromContext(ctx).With("handler", "processKnotStream") 400 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 401 if msg.Nsid == tangled.PipelineNSID { 402 return nil 403 tpl := tangled.Pipeline{} 404 err := json.Unmarshal(msg.EventJson, &tpl) 405 if err != nil { 406 s.l.Error("failed to unmarshal pipeline event", "err", err) 407 return err 408 } 409 410 if tpl.TriggerMetadata == nil { 411 return fmt.Errorf("no trigger metadata found") 412 } 413 414 if tpl.TriggerMetadata.Repo == nil { 415 return fmt.Errorf("no repo data found") 416 } 417 418 if src.Host != tpl.TriggerMetadata.Repo.Knot { 419 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot) 420 } 421 422 repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo) 423 if err != nil { 424 return err 425 } 426 427 pipelineId := models.PipelineId{ 428 Knot: src.Host, 429 Rkey: msg.Rkey, 430 } 431 432 workflows := make(map[models.Engine][]models.Workflow) 433 434 // Build pipeline environment variables once for all workflows 435 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId) 436 437 for _, w := range tpl.Workflows { 438 if w != nil { 439 if _, ok := s.engs[w.Engine]; !ok { 440 err = s.db.StatusFailed(models.WorkflowId{ 441 PipelineId: pipelineId, 442 Name: w.Name, 443 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 444 if err != nil { 445 return fmt.Errorf("db.StatusFailed: %w", err) 446 } 447 448 continue 449 } 450 451 eng := s.engs[w.Engine] 452 453 if _, ok := workflows[eng]; !ok { 454 workflows[eng] = []models.Workflow{} 455 } 456 457 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 458 if err != nil { 459 err = s.db.StatusFailed(models.WorkflowId{ 460 PipelineId: pipelineId, 461 Name: w.Name, 462 }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 463 if err != nil { 464 return fmt.Errorf("db.StatusFailed: %w", err) 465 } 466 467 continue 468 } 469 470 // inject TANGLED_* env vars after InitWorkflow 471 // This prevents user-defined env vars from overriding them 472 if ewf.Environment == nil { 473 ewf.Environment = make(map[string]string) 474 } 475 maps.Copy(ewf.Environment, pipelineEnv) 476 477 workflows[eng] = append(workflows[eng], *ewf) 478 479 err = s.db.StatusPending(models.WorkflowId{ 480 PipelineId: pipelineId, 481 Name: w.Name, 482 }, s.n) 483 if err != nil { 484 return fmt.Errorf("db.StatusPending: %w", err) 485 } 486 } 487 } 488 489 ok := s.jq.Enqueue(repoDid, queue.Job{ 490 Run: func() error { 491 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 492 RepoDid: repoDid, 493 Workflows: workflows, 494 }, pipelineId) 495 return nil 496 }, 497 OnFail: func(jobError error) { 498 s.l.Error("pipeline run failed", "error", jobError) 499 }, 500 }) 501 if ok { 502 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 503 } else { 504 s.l.Error("failed to enqueue pipeline: queue is full") 505 } 506 } else if msg.Nsid == tangled.GitRefUpdateNSID { 507 event := tangled.GitRefUpdate{} 508 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 509 l.Error("error unmarshalling", "err", err) 510 return err 511 } 512 l = l.With("repo", event.Repo, "ref", event.Ref, "newSha", event.NewSha) 513 l.Debug("debug") 514 515 repoDid := syntax.DID(event.Repo) 516 if _, err := s.db.GetRepoByDid(repoDid); err != nil { 517 return fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 518 } 519 520 // NOTE: we are blindly trusting the knot that it will return only repos it own 521 repoCloneUri := s.newRepoCloneUrl(src.Key(), syntax.DID(event.Repo)) 522 repoPath := s.newRepoPath(syntax.DID(event.Repo)) 523 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 524 return fmt.Errorf("sync git repo: %w", err) 525 } 526 l.Info("synced git repo") 527 528 // TODO: plan the pipeline 529 } 530 531 return nil 532} 533 534// newRepoPath creates a path to store repository by its did and rkey. 535// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 536func (s *Spindle) newRepoPath(repo syntax.DID) string { 537 return filepath.Join(s.cfg.Server.RepoDir, repo.String()) 538} 539 540func (s *Spindle) newRepoCloneUrl(knot string, did syntax.DID) string { 541 scheme := "https://" 542 if s.cfg.Server.Dev { 543 scheme = "http://" 544 } 545 return fmt.Sprintf("%s%s/%s", scheme, knot, did) 546} 547 548const RequiredVersion = "2.49.0" 549 550func ensureGitVersion() error { 551 v, err := git.Version() 552 if err != nil { 553 return fmt.Errorf("fetching git version: %w", err) 554 } 555 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 556 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 557 } 558 return nil 559} 560 561func (s *Spindle) resolvePipelineRepoDid(repo *tangled.Pipeline_TriggerRepo) (syntax.DID, error) { 562 if repo.RepoDid == nil || *repo.RepoDid == "" { 563 return "", fmt.Errorf("pipeline trigger missing repoDid") 564 } 565 repoDid, err := syntax.ParseDID(*repo.RepoDid) 566 if err != nil { 567 return "", fmt.Errorf("parse repoDid %s: %w", *repo.RepoDid, err) 568 } 569 if _, err := s.db.GetRepoByDid(repoDid); err != nil { 570 return "", fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 571 } 572 return repoDid, nil 573} 574 575func (s *Spindle) configureOwner() error { 576 cfgOwner := s.cfg.Server.Owner 577 578 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 579 if err != nil { 580 return err 581 } 582 583 switch len(existing) { 584 case 0: 585 // no owner configured, continue 586 case 1: 587 // find existing owner 588 existingOwner := existing[0] 589 590 // no ownership change, this is okay 591 if existingOwner == s.cfg.Server.Owner { 592 break 593 } 594 595 // remove existing owner 596 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 597 if err != nil { 598 return nil 599 } 600 default: 601 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 602 } 603 604 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 605}