Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "encoding/json"
5 "time"
6
7 "tangled.org/core/api/tangled"
8 "tangled.org/core/eventstream"
9 "tangled.org/core/notifier"
10 "tangled.org/core/spindle/models"
11 "tangled.org/core/tid"
12)
13
14func (d *DB) insertEvent(event eventstream.Event, n *notifier.Notifier) error {
15 return eventstream.Insert(d, event, n)
16}
17
18func (d *DB) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) {
19 return eventstream.List(d, cursor, limit)
20}
21
22func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error {
23 eventJson, err := json.Marshal(pipeline)
24 if err != nil {
25 return err
26 }
27 event := eventstream.Event{
28 Rkey: rkey,
29 Nsid: tangled.PipelineNSID,
30 Created: time.Now().UnixNano(),
31 EventJson: eventJson,
32 }
33 return d.insertEvent(event, n)
34}
35
36func (d *DB) createStatusEvent(
37 workflowId models.WorkflowId,
38 statusKind models.StatusKind,
39 workflowError *string,
40 exitCode *int64,
41 n *notifier.Notifier,
42) error {
43 now := time.Now()
44 pipelineAtUri := workflowId.PipelineId.AtUri()
45 s := tangled.PipelineStatus{
46 CreatedAt: now.Format(time.RFC3339),
47 Error: workflowError,
48 ExitCode: exitCode,
49 Pipeline: string(pipelineAtUri),
50 Workflow: workflowId.Name,
51 Status: string(statusKind),
52 }
53
54 eventJson, err := json.Marshal(s)
55 if err != nil {
56 return err
57 }
58
59 event := eventstream.Event{
60 Rkey: tid.TID(),
61 Nsid: tangled.PipelineStatusNSID,
62 EventJson: eventJson,
63 }
64
65 return d.insertEvent(event, n)
66}
67
68func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) {
69 pipelineAtUri := workflowId.PipelineId.AtUri()
70
71 var eventJson string
72 err := d.QueryRow(
73 `
74 select
75 event from events
76 where
77 nsid = ?
78 and json_extract(event, '$.pipeline') = ?
79 and json_extract(event, '$.workflow') = ?
80 order by
81 created desc
82 limit
83 1
84 `,
85 tangled.PipelineStatusNSID,
86 string(pipelineAtUri),
87 workflowId.Name,
88 ).Scan(&eventJson)
89
90 if err != nil {
91 return nil, err
92 }
93
94 var status tangled.PipelineStatus
95 if err := json.Unmarshal([]byte(eventJson), &status); err != nil {
96 return nil, err
97 }
98
99 return &status, nil
100}
101
102func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error {
103 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n)
104}
105
106func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error {
107 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n)
108}
109
110func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
111 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n)
112}
113
114func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
115 return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n)
116}
117
118func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
119 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n)
120}
121
122func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error {
123 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n)
124}