Monorepo for Tangled tangled.org
12

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "encoding/json" 5 "time" 6 7 "tangled.org/core/api/tangled" 8 "tangled.org/core/eventstream" 9 "tangled.org/core/notifier" 10 "tangled.org/core/spindle/models" 11 "tangled.org/core/tid" 12) 13 14func (d *DB) insertEvent(event eventstream.Event, n *notifier.Notifier) error { 15 return eventstream.Insert(d, event, n) 16} 17 18func (d *DB) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) { 19 return eventstream.List(d, cursor, limit) 20} 21 22func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error { 23 eventJson, err := json.Marshal(pipeline) 24 if err != nil { 25 return err 26 } 27 event := eventstream.Event{ 28 Rkey: rkey, 29 Nsid: tangled.PipelineNSID, 30 Created: time.Now().UnixNano(), 31 EventJson: eventJson, 32 } 33 return d.insertEvent(event, n) 34} 35 36func (d *DB) createStatusEvent( 37 workflowId models.WorkflowId, 38 statusKind models.StatusKind, 39 workflowError *string, 40 exitCode *int64, 41 n *notifier.Notifier, 42) error { 43 now := time.Now() 44 pipelineAtUri := workflowId.PipelineId.AtUri() 45 s := tangled.PipelineStatus{ 46 CreatedAt: now.Format(time.RFC3339), 47 Error: workflowError, 48 ExitCode: exitCode, 49 Pipeline: string(pipelineAtUri), 50 Workflow: workflowId.Name, 51 Status: string(statusKind), 52 } 53 54 eventJson, err := json.Marshal(s) 55 if err != nil { 56 return err 57 } 58 59 event := eventstream.Event{ 60 Rkey: tid.TID(), 61 Nsid: tangled.PipelineStatusNSID, 62 EventJson: eventJson, 63 } 64 65 return d.insertEvent(event, n) 66} 67 68func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) { 69 pipelineAtUri := workflowId.PipelineId.AtUri() 70 71 var eventJson string 72 err := d.QueryRow( 73 ` 74 select 75 event from events 76 where 77 nsid = ? 78 and json_extract(event, '$.pipeline') = ? 79 and json_extract(event, '$.workflow') = ? 80 order by 81 created desc 82 limit 83 1 84 `, 85 tangled.PipelineStatusNSID, 86 string(pipelineAtUri), 87 workflowId.Name, 88 ).Scan(&eventJson) 89 90 if err != nil { 91 return nil, err 92 } 93 94 var status tangled.PipelineStatus 95 if err := json.Unmarshal([]byte(eventJson), &status); err != nil { 96 return nil, err 97 } 98 99 return &status, nil 100} 101 102func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error { 103 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n) 104} 105 106func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error { 107 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n) 108} 109 110func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 111 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 112} 113 114func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 115 return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 116} 117 118func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 119 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 120} 121 122func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error { 123 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n) 124}