Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "encoding/json" 5 "time" 6 7 "tangled.org/core/api/tangled" 8 "tangled.org/core/eventstream" 9 "tangled.org/core/notifier" 10 "tangled.org/core/spindle/models" 11 "tangled.org/core/tid" 12) 13 14func (d *DB) insertEvent(event eventstream.Event, n *notifier.Notifier) error { 15 return eventstream.Insert(d, event, n) 16} 17 18func (d *DB) GetEvents(cursor int64, limit int) ([]eventstream.Event, error) { 19 return eventstream.List(d, cursor, limit) 20} 21 22func (d *DB) createStatusEvent( 23 workflowId models.WorkflowId, 24 statusKind models.StatusKind, 25 workflowError *string, 26 exitCode *int64, 27 n *notifier.Notifier, 28) error { 29 now := time.Now() 30 pipelineAtUri := workflowId.PipelineId.AtUri() 31 s := tangled.PipelineStatus{ 32 CreatedAt: now.Format(time.RFC3339), 33 Error: workflowError, 34 ExitCode: exitCode, 35 Pipeline: string(pipelineAtUri), 36 Workflow: workflowId.Name, 37 Status: string(statusKind), 38 } 39 40 eventJson, err := json.Marshal(s) 41 if err != nil { 42 return err 43 } 44 45 event := eventstream.Event{ 46 Rkey: tid.TID(), 47 Nsid: tangled.PipelineStatusNSID, 48 EventJson: eventJson, 49 } 50 51 return d.insertEvent(event, n) 52} 53 54func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) { 55 pipelineAtUri := workflowId.PipelineId.AtUri() 56 57 var eventJson string 58 err := d.QueryRow( 59 ` 60 select 61 event from events 62 where 63 nsid = ? 64 and json_extract(event, '$.pipeline') = ? 65 and json_extract(event, '$.workflow') = ? 66 order by 67 created desc 68 limit 69 1 70 `, 71 tangled.PipelineStatusNSID, 72 string(pipelineAtUri), 73 workflowId.Name, 74 ).Scan(&eventJson) 75 76 if err != nil { 77 return nil, err 78 } 79 80 var status tangled.PipelineStatus 81 if err := json.Unmarshal([]byte(eventJson), &status); err != nil { 82 return nil, err 83 } 84 85 return &status, nil 86} 87 88func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error { 89 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n) 90} 91 92func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error { 93 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n) 94} 95 96func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 97 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 98} 99 100func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 101 return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 102} 103 104func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 105 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 106} 107 108func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error { 109 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n) 110}