Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "tangled.org/core/api/tangled"
12 "tangled.org/core/appview/config"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/models"
15 "tangled.org/core/appview/pipelines"
16 ec "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventstream"
18 "tangled.org/core/orm"
19 "tangled.org/core/rbac"
20 spindle "tangled.org/core/spindle/models"
21)
22
23func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, pn *pipelines.StatusNotifier) (*ec.Consumer, error) {
24 spindles, err := db.GetSpindles(ctx, d, orm.FilterIsNot("verified", "null"))
25 if err != nil {
26 return nil, err
27 }
28
29 hosts := make([]string, len(spindles))
30 for i, s := range spindles {
31 hosts[i] = s.Instance
32 }
33
34 return bootstrapStream(
35 ctx, "spindlestream", ec.KindSpindle, hosts, c.Redis.Addr,
36 c.Spindlestream,
37 spindleIngester(d, pn),
38 ), nil
39}
40
41func spindleIngester(d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc {
42 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error {
43 switch msg.Nsid {
44 case tangled.PipelineStatusNSID:
45 return ingestPipelineStatus(ctx, d, pn, source, msg)
46 }
47 return nil
48 }
49}
50
51func ingestPipelineStatus(ctx context.Context, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg eventstream.Event) error {
52 var record tangled.PipelineStatus
53 err := json.Unmarshal(msg.EventJson, &record)
54 if err != nil {
55 return err
56 }
57
58 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
59 if err != nil {
60 return err
61 }
62
63 exitCode := 0
64 if record.ExitCode != nil {
65 exitCode = int(*record.ExitCode)
66 }
67
68 // pick the record creation time if possible, or use time.Now
69 created := time.Now()
70 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
71 created = t
72 }
73
74 status := models.PipelineStatus{
75 Spindle: source.Host,
76 Rkey: msg.Rkey,
77 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
78 PipelineRkey: pipelineUri.RecordKey().String(),
79 Created: created,
80 Workflow: record.Workflow,
81 Status: spindle.StatusKind(record.Status),
82 Error: record.Error,
83 ExitCode: exitCode,
84 }
85
86 err = db.AddPipelineStatus(ctx, d, status)
87 if err != nil {
88 return fmt.Errorf("failed to add pipeline status: %w", err)
89 }
90
91 pn.Publish(pipelineUri)
92
93 return nil
94}