Monorepo for Tangled tangled.org
10

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/models" 15 "tangled.org/core/appview/pipelines" 16 ec "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventstream" 18 "tangled.org/core/log" 19 "tangled.org/core/orm" 20 "tangled.org/core/rbac" 21 spindle "tangled.org/core/spindle/models" 22 "tangled.org/core/workflow" 23) 24 25func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, pn *pipelines.StatusNotifier) (*ec.Consumer, error) { 26 spindles, err := db.GetSpindles(ctx, d, orm.FilterIsNot("verified", "null")) 27 if err != nil { 28 return nil, err 29 } 30 31 hosts := make([]string, len(spindles)) 32 for i, s := range spindles { 33 hosts[i] = s.Instance 34 } 35 36 return bootstrapStream( 37 ctx, "spindlestream", ec.KindSpindle, hosts, c.Redis.Addr, 38 c.Spindlestream, 39 spindleIngester(d, pn), 40 ), nil 41} 42 43func spindleIngester(d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc { 44 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 45 switch msg.Nsid { 46 case tangled.PipelineNSID: 47 return ingestPipeline(ctx, d, source, msg) 48 case tangled.PipelineStatusNSID: 49 return ingestPipelineStatus(ctx, d, pn, source, msg) 50 } 51 return nil 52 } 53} 54 55func ingestPipeline(ctx context.Context, d *db.DB, source ec.Source, msg eventstream.Event) error { 56 l := log.FromContext(ctx) 57 58 var record tangled.Pipeline 59 if err := json.Unmarshal(msg.EventJson, &record); err != nil { 60 return fmt.Errorf("unmarshal pipeline: %w", err) 61 } 62 63 if record.TriggerMetadata == nil { 64 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 65 } 66 67 if record.TriggerMetadata.Repo == nil { 68 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 69 } 70 71 repoName := "" 72 if record.TriggerMetadata.Repo.Repo != nil { 73 repoName = *record.TriggerMetadata.Repo.Repo 74 } 75 76 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 77 if lookupErr != nil { 78 return fmt.Errorf("failed to look up repo: %w", lookupErr) 79 } 80 if repo.Spindle == "" { 81 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 82 } 83 84 // trigger info 85 var trigger models.Trigger 86 var sha string 87 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 88 switch trigger.Kind { 89 case workflow.TriggerKindPush: 90 trigger.PushRef = &record.TriggerMetadata.Push.Ref 91 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 92 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 93 sha = *trigger.PushNewSha 94 case workflow.TriggerKindPullRequest: 95 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 96 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 97 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 98 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 99 sha = *trigger.PRSourceSha 100 } 101 102 tx, err := d.Begin() 103 if err != nil { 104 return fmt.Errorf("failed to start txn: %w", err) 105 } 106 107 triggerId, err := db.AddTrigger(tx, trigger) 108 if err != nil { 109 return fmt.Errorf("failed to add trigger entry: %w", err) 110 } 111 112 // TODO: we shouldn't even use knot to identify pipelines 113 knot := record.TriggerMetadata.Repo.Knot 114 pipeline := models.Pipeline{ 115 Rkey: msg.Rkey, 116 Knot: knot, 117 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 118 RepoName: repoName, 119 RepoDid: repo.RepoDid, 120 TriggerId: int(triggerId), 121 Sha: sha, 122 } 123 124 err = db.AddPipeline(tx, pipeline) 125 if err != nil { 126 return fmt.Errorf("failed to add pipeline: %w", err) 127 } 128 129 err = tx.Commit() 130 if err != nil { 131 return fmt.Errorf("failed to commit txn: %w", err) 132 } 133 134 l.Info("added pipeline", "pipeline", pipeline) 135 136 return nil 137} 138 139func ingestPipelineStatus(ctx context.Context, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg eventstream.Event) error { 140 var record tangled.PipelineStatus 141 err := json.Unmarshal(msg.EventJson, &record) 142 if err != nil { 143 return err 144 } 145 146 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 147 if err != nil { 148 return err 149 } 150 151 exitCode := 0 152 if record.ExitCode != nil { 153 exitCode = int(*record.ExitCode) 154 } 155 156 // pick the record creation time if possible, or use time.Now 157 created := time.Now() 158 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 159 created = t 160 } 161 162 status := models.PipelineStatus{ 163 Spindle: source.Host, 164 Rkey: msg.Rkey, 165 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 166 PipelineRkey: pipelineUri.RecordKey().String(), 167 Created: created, 168 Workflow: record.Workflow, 169 Status: spindle.StatusKind(record.Status), 170 Error: record.Error, 171 ExitCode: exitCode, 172 } 173 174 err = db.AddPipelineStatus(ctx, d, status) 175 if err != nil { 176 return fmt.Errorf("failed to add pipeline status: %w", err) 177 } 178 179 pn.Publish(pipelineUri) 180 181 return nil 182}