Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "net/http" 10 "os" 11 "time" 12 13 "tangled.org/core/eventstream" 14 "tangled.org/core/log" 15 "tangled.org/core/spindle/models" 16 17 "github.com/go-chi/chi/v5" 18 "github.com/gorilla/websocket" 19 "github.com/hpcloud/tail" 20) 21 22var upgrader = websocket.Upgrader{ 23 ReadBufferSize: 1024, 24 WriteBufferSize: 1024, 25} 26 27func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 28 l := log.SubLogger(s.l, "eventstream") 29 l.Debug("received new connection") 30 31 err := eventstream.Stream(w, r, eventstream.StreamConfig{ 32 Backend: s.db, 33 Notifier: s.n, 34 Logger: l, 35 }) 36 if err != nil && !errors.Is(err, eventstream.ErrDrainCap) { 37 l.Error("event stream ended with error", "err", err) 38 } 39} 40 41func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 42 wid, err := getWorkflowID(r) 43 if err != nil { 44 http.Error(w, err.Error(), http.StatusBadRequest) 45 return 46 } 47 48 l := s.l.With("handler", "Logs") 49 l = s.l.With("wid", wid) 50 51 conn, err := upgrader.Upgrade(w, r, nil) 52 if err != nil { 53 l.Error("websocket upgrade failed", "err", err) 54 http.Error(w, "failed to upgrade", http.StatusInternalServerError) 55 return 56 } 57 defer func() { 58 _ = conn.WriteControl( 59 websocket.CloseMessage, 60 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 61 time.Now().Add(time.Second), 62 ) 63 conn.Close() 64 }() 65 l.Debug("upgraded http to wss") 66 67 ctx, cancel := context.WithCancel(r.Context()) 68 defer cancel() 69 70 go func() { 71 for { 72 if _, _, err := conn.NextReader(); err != nil { 73 l.Debug("client disconnected", "err", err) 74 cancel() 75 return 76 } 77 } 78 }() 79 80 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil { 81 l.Info("log stream ended", "err", err) 82 } 83 84 l.Info("logs connection closed") 85} 86 87func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error { 88 status, err := s.db.GetStatus(wid) 89 if err != nil { 90 return err 91 } 92 isFinished := models.StatusKind(status.Status).IsFinish() 93 94 filePath := models.LogFilePath(s.cfg.Server.LogDir, wid) 95 96 if status.Status == models.StatusKindFailed.String() && status.Error != nil { 97 if _, err := os.Stat(filePath); os.IsNotExist(err) { 98 msgs := []models.LogLine{ 99 { 100 Kind: models.LogKindControl, 101 Content: "", 102 StepId: 0, 103 StepKind: models.StepKindUser, 104 }, 105 { 106 Kind: models.LogKindData, 107 Content: *status.Error, 108 }, 109 } 110 111 for _, msg := range msgs { 112 b, err := json.Marshal(msg) 113 if err != nil { 114 return err 115 } 116 117 if err := conn.WriteMessage(websocket.TextMessage, b); err != nil { 118 return fmt.Errorf("failed to write to websocket: %w", err) 119 } 120 } 121 122 return nil 123 } 124 } 125 126 config := tail.Config{ 127 Follow: !isFinished, 128 ReOpen: !isFinished, 129 MustExist: false, 130 Location: &tail.SeekInfo{ 131 Offset: 0, 132 Whence: io.SeekStart, 133 }, 134 // Logger: tail.DiscardingLogger, 135 } 136 137 t, err := tail.TailFile(filePath, config) 138 if err != nil { 139 return fmt.Errorf("failed to tail log file: %w", err) 140 } 141 defer t.Stop() 142 143 for { 144 select { 145 case <-ctx.Done(): 146 return ctx.Err() 147 case line := <-t.Lines: 148 if line == nil && isFinished { 149 return fmt.Errorf("tail completed") 150 } 151 152 if line == nil { 153 return fmt.Errorf("tail channel closed unexpectedly") 154 } 155 156 if line.Err != nil { 157 return fmt.Errorf("error tailing log file: %w", line.Err) 158 } 159 160 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil { 161 return fmt.Errorf("failed to write to websocket: %w", err) 162 } 163 case <-time.After(30 * time.Second): 164 // send a keep-alive 165 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 166 return fmt.Errorf("failed to write control: %w", err) 167 } 168 } 169 } 170} 171 172func getWorkflowID(r *http.Request) (models.WorkflowId, error) { 173 knot := chi.URLParam(r, "knot") 174 rkey := chi.URLParam(r, "rkey") 175 name := chi.URLParam(r, "name") 176 177 if knot == "" || rkey == "" || name == "" { 178 return models.WorkflowId{}, fmt.Errorf("missing required parameters") 179 } 180 181 return models.WorkflowId{ 182 PipelineId: models.PipelineId{ 183 Knot: knot, 184 Rkey: rkey, 185 }, 186 Name: name, 187 }, nil 188}