Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

appview: listen for pipeline events from spindlestream

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 20, 2026, 7:50 PM +0900) commit 3554ecbe parent b656f9e7 change-id nzxyxowu
+88 -82
-82
appview/state/knotstream.go
··· 27 27 "tangled.org/core/log" 28 28 "tangled.org/core/orm" 29 29 "tangled.org/core/rbac" 30 - "tangled.org/core/workflow" 31 30 32 31 "github.com/bluesky-social/indigo/atproto/syntax" 33 32 "github.com/go-git/go-git/v5/plumbing" ··· 80 79 switch msg.Nsid { 81 80 case tangled.GitRefUpdateNSID: 82 81 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 83 - case tangled.PipelineNSID: 84 - return ingestPipeline(d, source, msg) 85 82 case knotdb.RepoDIDAssignNSID: 86 83 return ingestDIDAssign(d, enforcer, source, msg, ctx) 87 84 case knotdb.KnotMemberUpdateNSID: ··· 386 383 } 387 384 388 385 return tx.Commit() 389 - } 390 - 391 - func ingestPipeline(d *db.DB, source ec.Source, msg eventstream.Event) error { 392 - var record tangled.Pipeline 393 - err := json.Unmarshal(msg.EventJson, &record) 394 - if err != nil { 395 - return err 396 - } 397 - 398 - if record.TriggerMetadata == nil { 399 - return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 400 - } 401 - 402 - if record.TriggerMetadata.Repo == nil { 403 - return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 404 - } 405 - 406 - repoName := "" 407 - if record.TriggerMetadata.Repo.Repo != nil { 408 - repoName = *record.TriggerMetadata.Repo.Repo 409 - } 410 - 411 - repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 412 - if lookupErr != nil { 413 - return fmt.Errorf("failed to look up repo: %w", lookupErr) 414 - } 415 - if repo.Spindle == "" { 416 - return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 417 - } 418 - 419 - // trigger info 420 - var trigger models.Trigger 421 - var sha string 422 - trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 423 - switch trigger.Kind { 424 - case workflow.TriggerKindPush: 425 - trigger.PushRef = &record.TriggerMetadata.Push.Ref 426 - trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 427 - trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 428 - sha = *trigger.PushNewSha 429 - case workflow.TriggerKindPullRequest: 430 - trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 431 - trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 432 - trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 433 - trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 434 - sha = *trigger.PRSourceSha 435 - } 436 - 437 - tx, err := d.Begin() 438 - if err != nil { 439 - return fmt.Errorf("failed to start txn: %w", err) 440 - } 441 - 442 - triggerId, err := db.AddTrigger(tx, trigger) 443 - if err != nil { 444 - return fmt.Errorf("failed to add trigger entry: %w", err) 445 - } 446 - 447 - pipeline := models.Pipeline{ 448 - Rkey: msg.Rkey, 449 - Knot: source.Host, 450 - RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 451 - RepoName: repoName, 452 - RepoDid: repo.RepoDid, 453 - TriggerId: int(triggerId), 454 - Sha: sha, 455 - } 456 - 457 - err = db.AddPipeline(tx, pipeline) 458 - if err != nil { 459 - return fmt.Errorf("failed to add pipeline: %w", err) 460 - } 461 - 462 - err = tx.Commit() 463 - if err != nil { 464 - return fmt.Errorf("failed to commit txn: %w", err) 465 - } 466 - 467 - return nil 468 386 } 469 387 470 388 func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error {
+88
appview/state/spindlestream.go
··· 15 15 "tangled.org/core/appview/pipelines" 16 16 ec "tangled.org/core/eventconsumer" 17 17 "tangled.org/core/eventstream" 18 + "tangled.org/core/log" 18 19 "tangled.org/core/orm" 19 20 "tangled.org/core/rbac" 20 21 spindle "tangled.org/core/spindle/models" 22 + "tangled.org/core/workflow" 21 23 ) 22 24 23 25 func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, pn *pipelines.StatusNotifier) (*ec.Consumer, error) { ··· 41 43 func spindleIngester(d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc { 42 44 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 43 45 switch msg.Nsid { 46 + case tangled.PipelineNSID: 47 + return ingestPipeline(ctx, d, source, msg) 44 48 case tangled.PipelineStatusNSID: 45 49 return ingestPipelineStatus(ctx, d, pn, source, msg) 46 50 } 47 51 return nil 48 52 } 53 + } 54 + 55 + func ingestPipeline(ctx context.Context, d *db.DB, source ec.Source, msg eventstream.Event) error { 56 + l := log.FromContext(ctx) 57 + 58 + var record tangled.Pipeline 59 + if err := json.Unmarshal(msg.EventJson, &record); err != nil { 60 + return fmt.Errorf("unmarshal pipeline: %w", err) 61 + } 62 + 63 + if record.TriggerMetadata == nil { 64 + return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 65 + } 66 + 67 + if record.TriggerMetadata.Repo == nil { 68 + return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 69 + } 70 + 71 + repoName := "" 72 + if record.TriggerMetadata.Repo.Repo != nil { 73 + repoName = *record.TriggerMetadata.Repo.Repo 74 + } 75 + 76 + repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 77 + if lookupErr != nil { 78 + return fmt.Errorf("failed to look up repo: %w", lookupErr) 79 + } 80 + if repo.Spindle == "" { 81 + return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 82 + } 83 + 84 + // trigger info 85 + var trigger models.Trigger 86 + var sha string 87 + trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 88 + switch trigger.Kind { 89 + case workflow.TriggerKindPush: 90 + trigger.PushRef = &record.TriggerMetadata.Push.Ref 91 + trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 92 + trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 93 + sha = *trigger.PushNewSha 94 + case workflow.TriggerKindPullRequest: 95 + trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 96 + trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 97 + trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 98 + trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 99 + sha = *trigger.PRSourceSha 100 + } 101 + 102 + tx, err := d.Begin() 103 + if err != nil { 104 + return fmt.Errorf("failed to start txn: %w", err) 105 + } 106 + 107 + triggerId, err := db.AddTrigger(tx, trigger) 108 + if err != nil { 109 + return fmt.Errorf("failed to add trigger entry: %w", err) 110 + } 111 + 112 + // TODO: we shouldn't even use knot to identify pipelines 113 + knot := record.TriggerMetadata.Repo.Knot 114 + pipeline := models.Pipeline{ 115 + Rkey: msg.Rkey, 116 + Knot: knot, 117 + RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 118 + RepoName: repoName, 119 + RepoDid: repo.RepoDid, 120 + TriggerId: int(triggerId), 121 + Sha: sha, 122 + } 123 + 124 + err = db.AddPipeline(tx, pipeline) 125 + if err != nil { 126 + return fmt.Errorf("failed to add pipeline: %w", err) 127 + } 128 + 129 + err = tx.Commit() 130 + if err != nil { 131 + return fmt.Errorf("failed to commit txn: %w", err) 132 + } 133 + 134 + l.Info("added pipeline", "pipeline", pipeline) 135 + 136 + return nil 49 137 } 50 138 51 139 func ingestPipelineStatus(ctx context.Context, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg eventstream.Event) error {