Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "tangled.org/core/api/tangled"
12 "tangled.org/core/appview/config"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/models"
15 "tangled.org/core/appview/pipelines"
16 ec "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventstream"
18 "tangled.org/core/log"
19 "tangled.org/core/orm"
20 "tangled.org/core/rbac"
21 spindle "tangled.org/core/spindle/models"
22 "tangled.org/core/workflow"
23)
24
25func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, pn *pipelines.StatusNotifier) (*ec.Consumer, error) {
26 spindles, err := db.GetSpindles(ctx, d, orm.FilterIsNot("verified", "null"))
27 if err != nil {
28 return nil, err
29 }
30
31 hosts := make([]string, len(spindles))
32 for i, s := range spindles {
33 hosts[i] = s.Instance
34 }
35
36 return bootstrapStream(
37 ctx, "spindlestream", ec.KindSpindle, hosts, c.Redis.Addr,
38 c.Spindlestream,
39 spindleIngester(d, pn),
40 ), nil
41}
42
43func spindleIngester(d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc {
44 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error {
45 switch msg.Nsid {
46 case tangled.PipelineNSID:
47 return ingestPipeline(ctx, d, source, msg)
48 case tangled.PipelineStatusNSID:
49 return ingestPipelineStatus(ctx, d, pn, source, msg)
50 }
51 return nil
52 }
53}
54
55func ingestPipeline(ctx context.Context, d *db.DB, source ec.Source, msg eventstream.Event) error {
56 l := log.FromContext(ctx)
57
58 var record tangled.Pipeline
59 if err := json.Unmarshal(msg.EventJson, &record); err != nil {
60 return fmt.Errorf("unmarshal pipeline: %w", err)
61 }
62
63 if record.TriggerMetadata == nil {
64 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
65 }
66
67 if record.TriggerMetadata.Repo == nil {
68 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
69 }
70
71 repoName := ""
72 if record.TriggerMetadata.Repo.Repo != nil {
73 repoName = *record.TriggerMetadata.Repo.Repo
74 }
75
76 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName)
77 if lookupErr != nil {
78 return fmt.Errorf("failed to look up repo: %w", lookupErr)
79 }
80 if repo.Spindle == "" {
81 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
82 }
83
84 // trigger info
85 var trigger models.Trigger
86 var sha string
87 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
88 switch trigger.Kind {
89 case workflow.TriggerKindPush:
90 trigger.PushRef = &record.TriggerMetadata.Push.Ref
91 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
92 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
93 sha = *trigger.PushNewSha
94 case workflow.TriggerKindPullRequest:
95 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
96 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
97 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
98 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
99 sha = *trigger.PRSourceSha
100 }
101
102 tx, err := d.Begin()
103 if err != nil {
104 return fmt.Errorf("failed to start txn: %w", err)
105 }
106
107 triggerId, err := db.AddTrigger(tx, trigger)
108 if err != nil {
109 return fmt.Errorf("failed to add trigger entry: %w", err)
110 }
111
112 // TODO: we shouldn't even use knot to identify pipelines
113 knot := record.TriggerMetadata.Repo.Knot
114 pipeline := models.Pipeline{
115 Rkey: msg.Rkey,
116 Knot: knot,
117 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
118 RepoName: repoName,
119 RepoDid: repo.RepoDid,
120 TriggerId: int(triggerId),
121 Sha: sha,
122 }
123
124 err = db.AddPipeline(tx, pipeline)
125 if err != nil {
126 return fmt.Errorf("failed to add pipeline: %w", err)
127 }
128
129 err = tx.Commit()
130 if err != nil {
131 return fmt.Errorf("failed to commit txn: %w", err)
132 }
133
134 l.Info("added pipeline", "pipeline", pipeline)
135
136 return nil
137}
138
139func ingestPipelineStatus(ctx context.Context, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg eventstream.Event) error {
140 var record tangled.PipelineStatus
141 err := json.Unmarshal(msg.EventJson, &record)
142 if err != nil {
143 return err
144 }
145
146 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
147 if err != nil {
148 return err
149 }
150
151 exitCode := 0
152 if record.ExitCode != nil {
153 exitCode = int(*record.ExitCode)
154 }
155
156 // pick the record creation time if possible, or use time.Now
157 created := time.Now()
158 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
159 created = t
160 }
161
162 status := models.PipelineStatus{
163 Spindle: source.Host,
164 Rkey: msg.Rkey,
165 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
166 PipelineRkey: pipelineUri.RecordKey().String(),
167 Created: created,
168 Workflow: record.Workflow,
169 Status: spindle.StatusKind(record.Status),
170 Error: record.Error,
171 ExitCode: exitCode,
172 }
173
174 err = db.AddPipelineStatus(ctx, d, status)
175 if err != nil {
176 return fmt.Errorf("failed to add pipeline status: %w", err)
177 }
178
179 pn.Publish(pipelineUri)
180
181 return nil
182}