Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "net/http"
10 "os"
11 "time"
12
13 "tangled.org/core/eventstream"
14 "tangled.org/core/log"
15 "tangled.org/core/spindle/models"
16
17 "github.com/go-chi/chi/v5"
18 "github.com/gorilla/websocket"
19 "github.com/hpcloud/tail"
20)
21
22var upgrader = websocket.Upgrader{
23 ReadBufferSize: 1024,
24 WriteBufferSize: 1024,
25}
26
27func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
28 l := log.SubLogger(s.l, "eventstream")
29 l.Debug("received new connection")
30
31 err := eventstream.Stream(w, r, eventstream.StreamConfig{
32 Backend: s.db,
33 Notifier: s.n,
34 Logger: l,
35 })
36 if err != nil && !errors.Is(err, eventstream.ErrDrainCap) {
37 l.Error("event stream ended with error", "err", err)
38 }
39}
40
41func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
42 wid, err := getWorkflowID(r)
43 if err != nil {
44 http.Error(w, err.Error(), http.StatusBadRequest)
45 return
46 }
47
48 l := s.l.With("handler", "Logs")
49 l = s.l.With("wid", wid)
50
51 conn, err := upgrader.Upgrade(w, r, nil)
52 if err != nil {
53 l.Error("websocket upgrade failed", "err", err)
54 http.Error(w, "failed to upgrade", http.StatusInternalServerError)
55 return
56 }
57 defer func() {
58 _ = conn.WriteControl(
59 websocket.CloseMessage,
60 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
61 time.Now().Add(time.Second),
62 )
63 conn.Close()
64 }()
65 l.Debug("upgraded http to wss")
66
67 ctx, cancel := context.WithCancel(r.Context())
68 defer cancel()
69
70 go func() {
71 for {
72 if _, _, err := conn.NextReader(); err != nil {
73 l.Debug("client disconnected", "err", err)
74 cancel()
75 return
76 }
77 }
78 }()
79
80 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil {
81 l.Info("log stream ended", "err", err)
82 }
83
84 l.Info("logs connection closed")
85}
86
87func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
88 status, err := s.db.GetStatus(wid)
89 if err != nil {
90 return err
91 }
92 isFinished := models.StatusKind(status.Status).IsFinish()
93
94 filePath := models.LogFilePath(s.cfg.Server.LogDir, wid)
95
96 if status.Status == models.StatusKindFailed.String() && status.Error != nil {
97 if _, err := os.Stat(filePath); os.IsNotExist(err) {
98 msgs := []models.LogLine{
99 {
100 Kind: models.LogKindControl,
101 Content: "",
102 StepId: 0,
103 StepKind: models.StepKindUser,
104 },
105 {
106 Kind: models.LogKindData,
107 Content: *status.Error,
108 },
109 }
110
111 for _, msg := range msgs {
112 b, err := json.Marshal(msg)
113 if err != nil {
114 return err
115 }
116
117 if err := conn.WriteMessage(websocket.TextMessage, b); err != nil {
118 return fmt.Errorf("failed to write to websocket: %w", err)
119 }
120 }
121
122 return nil
123 }
124 }
125
126 config := tail.Config{
127 Follow: !isFinished,
128 ReOpen: !isFinished,
129 MustExist: false,
130 Location: &tail.SeekInfo{
131 Offset: 0,
132 Whence: io.SeekStart,
133 },
134 // Logger: tail.DiscardingLogger,
135 }
136
137 t, err := tail.TailFile(filePath, config)
138 if err != nil {
139 return fmt.Errorf("failed to tail log file: %w", err)
140 }
141 defer t.Stop()
142
143 for {
144 select {
145 case <-ctx.Done():
146 return ctx.Err()
147 case line := <-t.Lines:
148 if line == nil && isFinished {
149 return fmt.Errorf("tail completed")
150 }
151
152 if line == nil {
153 return fmt.Errorf("tail channel closed unexpectedly")
154 }
155
156 if line.Err != nil {
157 return fmt.Errorf("error tailing log file: %w", line.Err)
158 }
159
160 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil {
161 return fmt.Errorf("failed to write to websocket: %w", err)
162 }
163 case <-time.After(30 * time.Second):
164 // send a keep-alive
165 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
166 return fmt.Errorf("failed to write control: %w", err)
167 }
168 }
169 }
170}
171
172func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
173 knot := chi.URLParam(r, "knot")
174 rkey := chi.URLParam(r, "rkey")
175 name := chi.URLParam(r, "name")
176
177 if knot == "" || rkey == "" || name == "" {
178 return models.WorkflowId{}, fmt.Errorf("missing required parameters")
179 }
180
181 return models.WorkflowId{
182 PipelineId: models.PipelineId{
183 Knot: knot,
184 Rkey: rkey,
185 },
186 Name: name,
187 }, nil
188}