Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

wip: spindle/xrpc: implement `ci.pipeline.subscribeLogs`

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 23, 2026, 5:18 AM +0900) commit 9f30fefe parent fb886a6d change-id tunmorwp
+209
+199
spindle/xrpc/ci_pipeline_subscribe_logs.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "sync" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/atproto/atclient" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "github.com/gorilla/websocket" 13 + "tangled.org/core/api/tangled" 14 + ) 15 + 16 + func (x *Xrpc) HandleCiPipelineSubscribeLogs(w http.ResponseWriter, r *http.Request) { 17 + var ( 18 + pipelineQuery = r.URL.Query().Get("pipeline") 19 + workflows = r.URL.Query()["workflows"] 20 + ) 21 + 22 + pipeline, err := syntax.ParseTID(pipelineQuery) 23 + if err != nil { 24 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("pipeline parameter invalid: %s", pipelineQuery)}) 25 + } 26 + 27 + x.handleSubscribeLogs(w, r, pipeline, workflows) 28 + } 29 + 30 + var wsUpgrader = websocket.Upgrader{ 31 + ReadBufferSize: 10_000, 32 + WriteBufferSize: 10_000, 33 + } 34 + 35 + func (x *Xrpc) handleSubscribeLogs(w http.ResponseWriter, r *http.Request, pipeline syntax.TID, workflows []string) { 36 + l := x.Logger.With("pipeline", pipeline, "workflows", workflows) 37 + 38 + ctx, cancel := context.WithCancel(r.Context()) 39 + defer cancel() 40 + 41 + conn, err := wsUpgrader.Upgrade(w, r, w.Header()) 42 + if err != nil { 43 + // TODO: writeJson(w, ) 44 + return 45 + } 46 + defer conn.Close() 47 + 48 + lastWriteLk := sync.Mutex{} 49 + lastWrite := time.Now() 50 + 51 + // Start a goroutine to ping the client every 30 seconds to check if it's 52 + // still alive. If the client doesn't respond to a ping within 5 seconds, 53 + // we'll close the connection and teardown the consumer. 54 + go func() { 55 + ticker := time.NewTicker(30 * time.Second) 56 + defer ticker.Stop() 57 + 58 + for { 59 + select { 60 + case <-ticker.C: 61 + lastWriteLk.Lock() 62 + lw := lastWrite 63 + lastWriteLk.Unlock() 64 + 65 + if time.Since(lw) < 30*time.Second { 66 + continue 67 + } 68 + 69 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil { 70 + l.Warn("failed to ping client", "err", err) 71 + cancel() 72 + return 73 + } 74 + case <-ctx.Done(): 75 + return 76 + } 77 + } 78 + }() 79 + 80 + conn.SetPingHandler(func(message string) error { 81 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) 82 + if err == websocket.ErrCloseSent { 83 + return nil 84 + } 85 + return err 86 + }) 87 + 88 + // Start a goroutine to read messages from the client and discard them. 89 + go func() { 90 + for { 91 + _, _, err := conn.ReadMessage() 92 + if err != nil { 93 + l.Warn("failed to read message from client", "err", err) 94 + cancel() 95 + return 96 + } 97 + } 98 + }() 99 + 100 + evts := make(chan tangled.CiPipelineSubscribeLogs_Event, 16) 101 + go mockEvents(ctx, evts) 102 + 103 + for { 104 + select { 105 + case <-ctx.Done(): 106 + return 107 + case evt, ok := <-evts: 108 + if !ok { 109 + l.Error("event stream closed unexpectedly") 110 + return 111 + } 112 + 113 + wc, err := conn.NextWriter(websocket.BinaryMessage) 114 + if err != nil { 115 + l.Error("failed to get next writer", "err", err) 116 + return // err 117 + } 118 + 119 + err = evt.Serialize(wc) 120 + if err != nil { 121 + return // err 122 + } 123 + 124 + if err := wc.Close(); err != nil { 125 + l.Warn("failed to flush-close our event write", "err", err) 126 + return 127 + } 128 + 129 + lastWriteLk.Lock() 130 + lastWrite = time.Now() 131 + lastWriteLk.Unlock() 132 + } 133 + } 134 + } 135 + 136 + func strptr(s string) *string { return &s } 137 + 138 + // mockEvents streams a fixed, fabricated log sequence onto out, ignoring the 139 + // real pipeline/workflow inputs. Used to exercise the appview log viewer until 140 + // the real event source is wired up. 141 + func mockEvents(ctx context.Context, out chan<- tangled.CiPipelineSubscribeLogs_Event) { 142 + const wf = "build" 143 + now := func() string { return time.Now().UTC().Format(time.RFC3339) } 144 + 145 + send := func(ev tangled.CiPipelineSubscribeLogs_Event) bool { 146 + select { 147 + case out <- ev: 148 + return true 149 + case <-ctx.Done(): 150 + return false 151 + } 152 + } 153 + dataEv := func(step int64, content string) tangled.CiPipelineSubscribeLogs_Event { 154 + return tangled.CiPipelineSubscribeLogs_Event{Data: &tangled.CiPipelineSubscribeLogs_Data{ 155 + Step: step, Content: content, Stream: "stdout", Time: now(), Workflow: wf, 156 + }} 157 + } 158 + endEv := func(step int64) tangled.CiPipelineSubscribeLogs_Event { 159 + return tangled.CiPipelineSubscribeLogs_Event{Control: &tangled.CiPipelineSubscribeLogs_Control{ 160 + Status: strptr("end"), Step: step, Time: now(), Workflow: wf, 161 + }} 162 + } 163 + 164 + // system step: clone (collapsed in the viewer) 165 + if !send(tangled.CiPipelineSubscribeLogs_Event{Control: &tangled.CiPipelineSubscribeLogs_Control{ 166 + Status: strptr("start"), Kind: strptr("system"), Step: 1, 167 + Content: "clone repository", Command: strptr("git clone https://example.com/repo"), Time: now(), Workflow: wf, 168 + }}) { 169 + return 170 + } 171 + for _, line := range []string{"Cloning into 'repo'...\n", "remote: Enumerating objects\n", "\x1b[32mClone complete\x1b[0m\n"} { 172 + if !send(dataEv(1, line)) { 173 + return 174 + } 175 + } 176 + if !send(endEv(1)) { 177 + return 178 + } 179 + 180 + // user step: test — stream a counter line every second until the client leaves 181 + if !send(tangled.CiPipelineSubscribeLogs_Event{Control: &tangled.CiPipelineSubscribeLogs_Control{ 182 + Status: strptr("start"), Kind: strptr("user"), Step: 2, 183 + Content: "run tests", Command: strptr("go test ./..."), Time: now(), Workflow: wf, 184 + }}) { 185 + return 186 + } 187 + ticker := time.NewTicker(time.Second) 188 + defer ticker.Stop() 189 + for i := 1; ; i++ { 190 + select { 191 + case <-ctx.Done(): 192 + return 193 + case <-ticker.C: 194 + if !send(dataEv(2, fmt.Sprintf("test log line %d\n", i))) { 195 + return 196 + } 197 + } 198 + } 199 + }
+10
spindle/xrpc/xrpc.go
··· 48 48 49 49 // service query endpoints (no auth required) 50 50 r.Get("/"+tangled.OwnerNSID, x.Owner) 51 + r.Get("/"+tangled.CiPipelineSubscribeLogsNSID, x.HandleCiPipelineSubscribeLogs) 51 52 52 53 return r 53 54 } ··· 60 61 w.WriteHeader(status) 61 62 json.NewEncoder(w).Encode(e) 62 63 } 64 + 65 + func writeJson(w http.ResponseWriter, status int, response any) error { 66 + w.Header().Set("Content-Type", "application/json") 67 + w.WriteHeader(status) 68 + if err := json.NewEncoder(w).Encode(response); err != nil { 69 + return err 70 + } 71 + return nil 72 + }