Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "encoding/json"
5 "time"
6
7 "tangled.org/core/api/tangled"
8 "tangled.org/core/eventstream"
9 "tangled.org/core/notifier"
10 "tangled.org/core/spindle/models"
11 "tangled.org/core/tid"
12)
13
14func (d *DB) insertEvent(event eventstream.Event, n *notifier.Notifier) error {
15 return eventstream.Insert(d, event, n)
16}
17
18func (d *DB) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) {
19 return eventstream.List(d, cursor, limit)
20}
21
22func (d *DB) createStatusEvent(
23 workflowId models.WorkflowId,
24 statusKind models.StatusKind,
25 workflowError *string,
26 exitCode *int64,
27 n *notifier.Notifier,
28) error {
29 now := time.Now()
30 pipelineAtUri := workflowId.PipelineId.AtUri()
31 s := tangled.PipelineStatus{
32 CreatedAt: now.Format(time.RFC3339),
33 Error: workflowError,
34 ExitCode: exitCode,
35 Pipeline: string(pipelineAtUri),
36 Workflow: workflowId.Name,
37 Status: string(statusKind),
38 }
39
40 eventJson, err := json.Marshal(s)
41 if err != nil {
42 return err
43 }
44
45 event := eventstream.Event{
46 Rkey: tid.TID(),
47 Nsid: tangled.PipelineStatusNSID,
48 EventJson: eventJson,
49 }
50
51 return d.insertEvent(event, n)
52}
53
54func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) {
55 pipelineAtUri := workflowId.PipelineId.AtUri()
56
57 var eventJson string
58 err := d.QueryRow(
59 `
60 select
61 event from events
62 where
63 nsid = ?
64 and json_extract(event, '$.pipeline') = ?
65 and json_extract(event, '$.workflow') = ?
66 order by
67 created desc
68 limit
69 1
70 `,
71 tangled.PipelineStatusNSID,
72 string(pipelineAtUri),
73 workflowId.Name,
74 ).Scan(&eventJson)
75
76 if err != nil {
77 return nil, err
78 }
79
80 var status tangled.PipelineStatus
81 if err := json.Unmarshal([]byte(eventJson), &status); err != nil {
82 return nil, err
83 }
84
85 return &status, nil
86}
87
88func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error {
89 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n)
90}
91
92func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error {
93 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n)
94}
95
96func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
97 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n)
98}
99
100func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
101 return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n)
102}
103
104func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
105 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n)
106}
107
108func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error {
109 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n)
110}