Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/models" 15 "tangled.org/core/appview/pipelines" 16 ec "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventstream" 18 "tangled.org/core/orm" 19 "tangled.org/core/rbac" 20 spindle "tangled.org/core/spindle/models" 21) 22 23func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, pn *pipelines.StatusNotifier) (*ec.Consumer, error) { 24 spindles, err := db.GetSpindles(ctx, d, orm.FilterIsNot("verified", "null")) 25 if err != nil { 26 return nil, err 27 } 28 29 hosts := make([]string, len(spindles)) 30 for i, s := range spindles { 31 hosts[i] = s.Instance 32 } 33 34 return bootstrapStream( 35 ctx, "spindlestream", ec.KindSpindle, hosts, c.Redis.Addr, 36 c.Spindlestream, c.Core.Dev, 37 spindleIngester(d, pn), 38 ), nil 39} 40 41func spindleIngester(d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc { 42 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 43 switch msg.Nsid { 44 case tangled.PipelineStatusNSID: 45 return ingestPipelineStatus(ctx, d, pn, source, msg) 46 } 47 return nil 48 } 49} 50 51func ingestPipelineStatus(ctx context.Context, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg eventstream.Event) error { 52 var record tangled.PipelineStatus 53 err := json.Unmarshal(msg.EventJson, &record) 54 if err != nil { 55 return err 56 } 57 58 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 59 if err != nil { 60 return err 61 } 62 63 exitCode := 0 64 if record.ExitCode != nil { 65 exitCode = int(*record.ExitCode) 66 } 67 68 // pick the record creation time if possible, or use time.Now 69 created := time.Now() 70 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 71 created = t 72 } 73 74 status := models.PipelineStatus{ 75 Spindle: source.Host, 76 Rkey: msg.Rkey, 77 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 78 PipelineRkey: pipelineUri.RecordKey().String(), 79 Created: created, 80 Workflow: record.Workflow, 81 Status: spindle.StatusKind(record.Status), 82 Error: record.Error, 83 ExitCode: exitCode, 84 } 85 86 err = db.AddPipelineStatus(ctx, d, status) 87 if err != nil { 88 return fmt.Errorf("failed to add pipeline status: %w", err) 89 } 90 91 pn.Publish(pipelineUri) 92 93 return nil 94}