Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

api/tangled,lexutil: add missing types for DRISL-CBOR event streaming

indigo's `lexutil.Client` doesn't support subscription xrpc methods. New
`extlexutil.Client` extends the `LexDo` method to support subscription.
It won't perform redialing since we don't know which param is cursor and
which property is event sequence.

since lexgen doesn't generate code subscription xrpc methods, we hand
write it. `api/tangled/*_ext.go` files will be treated as non-generated
code.

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 20, 2026, 7:50 PM +0900) commit 43b5510a parent 9772ea71 change-id nrzosuqq
+381 -2
+1
.gitattributes
··· 1 1 api/tangled/** linguist-generated -diff 2 + api/tangled/*_ext.go -linguist-generated diff 2 3 flake.lock -diff
+98
api/tangled/pipelinesubscribeLogs_ext.go
··· 1 + // extending code generated from sh.tangled.ci.pipeline.subscribeLogs 2 + 3 + package tangled 4 + 5 + import ( 6 + "context" 7 + "fmt" 8 + "io" 9 + 10 + "github.com/bluesky-social/indigo/events" 11 + lexutil "github.com/bluesky-social/indigo/lex/util" 12 + cbg "github.com/whyrusleeping/cbor-gen" 13 + extlexutil "tangled.org/core/lexutil" 14 + ) 15 + 16 + // TODO: generate codes below from lexicon 17 + type CiPipelineSubscribeLogs_Event struct { 18 + Error *events.ErrorFrame 19 + Control *CiPipelineSubscribeLogs_Control 20 + Data *CiPipelineSubscribeLogs_Data 21 + 22 + // some private fields for internal routing perf 23 + Preserialized []byte `json:"-" cborgen:"-"` 24 + } 25 + 26 + func (xevt *CiPipelineSubscribeLogs_Event) Serialize(wc io.Writer) error { 27 + header := events.EventHeader{Op: events.EvtKindMessage} 28 + var obj lexutil.CBOR 29 + 30 + switch { 31 + case xevt.Error != nil: 32 + header.Op = events.EvtKindErrorFrame 33 + obj = xevt.Error 34 + case xevt.Control != nil: 35 + header.MsgType = "#control" 36 + obj = xevt.Control 37 + case xevt.Data != nil: 38 + header.MsgType = "#data" 39 + obj = xevt.Data 40 + default: 41 + return fmt.Errorf("unrecognized event kind") 42 + } 43 + 44 + cborWriter := cbg.NewCborWriter(wc) 45 + if err := header.MarshalCBOR(cborWriter); err != nil { 46 + return fmt.Errorf("failed to write header: %w", err) 47 + } 48 + return obj.MarshalCBOR(cborWriter) 49 + } 50 + 51 + func (xevt *CiPipelineSubscribeLogs_Event) Deserialize(r io.Reader) error { 52 + var header events.EventHeader 53 + if err := header.UnmarshalCBOR(r); err != nil { 54 + return fmt.Errorf("reading header: %w", err) 55 + } 56 + switch header.Op { 57 + case events.EvtKindMessage: 58 + switch header.MsgType { 59 + case "#control": 60 + var evt CiPipelineSubscribeLogs_Control 61 + if err := evt.UnmarshalCBOR(r); err != nil { 62 + return fmt.Errorf("reading repoCommit event: %w", err) 63 + } 64 + xevt.Control = &evt 65 + case "#data": 66 + var evt CiPipelineSubscribeLogs_Data 67 + if err := evt.UnmarshalCBOR(r); err != nil { 68 + return fmt.Errorf("reading repoSync event: %w", err) 69 + } 70 + xevt.Data = &evt 71 + } 72 + case events.EvtKindErrorFrame: 73 + var errframe events.ErrorFrame 74 + if err := errframe.UnmarshalCBOR(r); err != nil { 75 + return err 76 + } 77 + xevt.Error = &errframe 78 + default: 79 + return fmt.Errorf("unrecognized event stream type: %d", header.Op) 80 + } 81 + return nil 82 + } 83 + 84 + func CiPipelineSubscribeLogs(ctx context.Context, c extlexutil.LexClient, pipeline string, workflows []string, sched extlexutil.Scheduler[CiPipelineSubscribeLogs_Event]) error { 85 + defer sched.Shutdown() 86 + 87 + params := map[string]any{} 88 + params["pipeline"] = pipeline 89 + params["workflows"] = workflows 90 + 91 + return c.LexDo(ctx, extlexutil.Subscription, "", CiPipelineSubscribeLogsNSID, params, nil, func(ctx context.Context, cr *cbg.CborReader) error { 92 + var evt CiPipelineSubscribeLogs_Event 93 + if err := evt.Deserialize(cr); err != nil { 94 + return err 95 + } 96 + return sched.AddWork(ctx, "", &evt) 97 + }) 98 + }
+9 -2
flake.nix
··· 472 472 rootDir=$(jj --ignore-working-copy root || git rev-parse --show-toplevel) || (echo "error: can't find repo root?"; exit 1) 473 473 cd "$rootDir" 474 474 475 - rm -f api/tangled/* 475 + # *_ext.go are hand-written extensions; never remove or mutate them 476 + find api/tangled -maxdepth 1 -type f -not -name '*_ext.go' -delete 476 477 lexgen --build-file lexicon-build-config.json lexicons 477 - sed -i.bak 's/\tutil/\/\/\tutil/' api/tangled/* 478 + 479 + # disable type registration temporarily while running cborgen 480 + find api/tangled -maxdepth 1 -name '*.go' -not -name '*_ext.go' -exec \ 481 + sed -i.bak 's/\tutil/\/\/\tutil/' {} + 478 482 # lexgen generates incomplete Marshaler/Unmarshaler for union types 479 483 find api/tangled/*.go -not -name "cbor_gen.go" -exec \ 480 484 sed -i '/^func.*\(MarshalCBOR\|UnmarshalCBOR\)/,/^}/ s/^/\/\/ /' {} + 485 + for f in api/tangled/*_ext.go; do [ -e "''$f" ] && mv "''$f" "''$f.bak"; done 481 486 ${pkgs.gotools}/bin/goimports -w api/tangled/* 482 487 go run ./cmd/cborgen/ 488 + for f in api/tangled/*_ext.go.bak; do [ -e "''$f" ] && mv "''$f" "''${f%.bak}"; done 489 + 483 490 lexgen --build-file lexicon-build-config.json lexicons 484 491 rm api/tangled/*.bak 485 492 '';
+224
lexutil/client.go
··· 1 + package lexutil 2 + 3 + import ( 4 + "cmp" 5 + "context" 6 + "fmt" 7 + "log/slog" 8 + "net/http" 9 + "net/url" 10 + "time" 11 + 12 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 13 + "github.com/carlmjohnson/versioninfo" 14 + "github.com/gorilla/websocket" 15 + cbg "github.com/whyrusleeping/cbor-gen" 16 + ) 17 + 18 + type Client struct { 19 + indigoxrpc.Client 20 + Dialer websocket.Dialer 21 + Logger *slog.Logger 22 + } 23 + 24 + var _ LexClient = (*Client)(nil) 25 + 26 + func makeParams(p map[string]any) url.Values { 27 + params := url.Values{} 28 + for k, v := range p { 29 + if s, ok := v.([]string); ok { 30 + for _, v := range s { 31 + params.Add(k, v) 32 + } 33 + } else { 34 + params.Add(k, fmt.Sprint(v)) 35 + } 36 + } 37 + return params 38 + } 39 + 40 + type processFn func(ctx context.Context, cr *cbg.CborReader) error 41 + 42 + func (c *Client) LexDo(ctx context.Context, method string, inputEncoding string, endpoint string, params map[string]any, bodyData any, out any) error { 43 + switch method { 44 + case Subscription: 45 + if process, ok := out.(processFn); ok { 46 + return c.LexSubscribe(ctx, endpoint, params, process) 47 + } else if redialer, ok := out.(Redialer); ok { 48 + return c.LexSubscribeWithRedialer(ctx, endpoint, params, redialer) 49 + } else { 50 + return fmt.Errorf("unknown output type: %T", out) 51 + } 52 + default: 53 + return c.Client.LexDo(ctx, method, inputEncoding, endpoint, params, bodyData, out) 54 + } 55 + } 56 + 57 + func (c *Client) getHeader() http.Header { 58 + header := http.Header{} 59 + if c.UserAgent != nil { 60 + header.Set("User-Agent", *c.UserAgent) 61 + } else { 62 + header.Set("User-Agent", "extlexutil/"+versioninfo.Short()) 63 + } 64 + if c.Headers != nil { 65 + for k, v := range c.Headers { 66 + header.Set(k, v) 67 + } 68 + } 69 + return header 70 + } 71 + 72 + func (c *Client) LexSubscribe(ctx context.Context, endpoint string, params map[string]any, process func(ctx context.Context, cr *cbg.CborReader) error) error { 73 + logger := cmp.Or(c.Logger, slog.Default().With("system", "events")) 74 + rurl, err := url.Parse(c.Host) 75 + if err != nil { 76 + return err 77 + } 78 + if rurl.Scheme == "http" { 79 + rurl.Scheme = "ws" 80 + } else { 81 + rurl.Scheme = "wss" 82 + } 83 + surl := rurl.JoinPath("/xrpc", endpoint) 84 + surl.RawQuery = makeParams(params).Encode() 85 + 86 + header := c.getHeader() 87 + 88 + u := surl.String() 89 + conn, resp, err := c.Dialer.DialContext(ctx, u, header) 90 + if err != nil { 91 + return fmt.Errorf("%w: %w", ErrDialFailure, err) 92 + } 93 + 94 + logger.Debug("event subscription response", "code", resp.StatusCode, "url", u) 95 + 96 + return c.handleConn(ctx, conn, process) 97 + } 98 + 99 + func (c *Client) LexSubscribeWithRedialer(ctx context.Context, endpoint string, params map[string]any, redialer Redialer) error { 100 + logger := cmp.Or(c.Logger, slog.Default().With("system", "events")) 101 + rurl, err := url.Parse(c.Host) 102 + if err != nil { 103 + return err 104 + } 105 + if rurl.Scheme == "http" { 106 + rurl.Scheme = "ws" 107 + } else { 108 + rurl.Scheme = "wss" 109 + } 110 + surl := rurl.JoinPath("/xrpc", endpoint) 111 + 112 + header := c.getHeader() 113 + 114 + var backoff int 115 + for { 116 + select { 117 + case <-ctx.Done(): 118 + return ctx.Err() 119 + default: 120 + } 121 + 122 + surl.RawQuery = makeParams(params).Encode() 123 + 124 + u := surl.String() 125 + conn, resp, err := c.Dialer.DialContext(ctx, u, header) 126 + if err != nil { 127 + logger.Warn("dialing failed", "err", err, "backoff", backoff) 128 + time.Sleep(time.Duration(5+backoff) * time.Second) 129 + backoff++ 130 + 131 + if backoff > 15 { 132 + return fmt.Errorf("%w: %w", ErrDialFailure, err) 133 + } 134 + 135 + continue 136 + } 137 + 138 + logger.Debug("event subscription response", "code", resp.StatusCode, "url", u) 139 + 140 + if err := c.handleConn(ctx, conn, redialer.Process); err != nil { 141 + logger.Warn("host connection failed", "err", err, "backoff", backoff) 142 + } 143 + 144 + // updates cursor & backoff 145 + updated := redialer.UpdateParams(ctx, params) 146 + if updated { 147 + backoff = 0 148 + } 149 + } 150 + } 151 + 152 + func (c *Client) handleConn(ctx context.Context, conn *websocket.Conn, process func(ctx context.Context, cr *cbg.CborReader) error) error { 153 + logger := cmp.Or(c.Logger, slog.Default().With("system", "events")) 154 + ctx, cancel := context.WithCancel(ctx) 155 + defer cancel() 156 + 157 + go func() { 158 + t := time.NewTicker(time.Second * 30) 159 + defer t.Stop() 160 + failcount := 0 161 + 162 + for { 163 + 164 + select { 165 + case <-t.C: 166 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil { 167 + logger.Warn("failed to ping", "err", err) 168 + failcount++ 169 + if failcount >= 4 { 170 + logger.Error("too many ping fails", "count", failcount) 171 + conn.Close() 172 + return 173 + } 174 + } else { 175 + failcount = 0 // ok ping 176 + } 177 + case <-ctx.Done(): 178 + conn.Close() 179 + return 180 + } 181 + } 182 + }() 183 + 184 + conn.SetPingHandler(func(message string) error { 185 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60)) 186 + if err == websocket.ErrCloseSent { 187 + return nil 188 + } 189 + return err 190 + }) 191 + 192 + conn.SetPongHandler(func(_ string) error { 193 + if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { 194 + logger.Error("failed to set read deadline", "err", err) 195 + } 196 + 197 + return nil 198 + }) 199 + 200 + cr := new(cbg.CborReader) 201 + 202 + for { 203 + select { 204 + case <-ctx.Done(): 205 + return ctx.Err() 206 + default: 207 + } 208 + 209 + mt, rawReader, err := conn.NextReader() 210 + if err != nil { 211 + return fmt.Errorf("conn err at read: %w", err) 212 + } 213 + 214 + if mt != websocket.BinaryMessage { 215 + return fmt.Errorf("expected binary message from subscription endpoint") 216 + } 217 + 218 + cr.SetReader(rawReader) 219 + 220 + if err := process(ctx, cr); err != nil { 221 + return err 222 + } 223 + } 224 + }
+49
lexutil/lexutil.go
··· 1 + // extended version of indigo/lex/util package before upstreaming it 2 + package lexutil 3 + 4 + import ( 5 + "context" 6 + "errors" 7 + "io" 8 + 9 + lexutil "github.com/bluesky-social/indigo/lex/util" 10 + cbg "github.com/whyrusleeping/cbor-gen" 11 + ) 12 + 13 + type LexClient interface { 14 + lexutil.LexClient 15 + // LexSubscribe is basic event subscriber without redialing logic 16 + // 17 + // golang doesnt allow generics in method so we have to pass raw processFn here instead of Scheduler[T] 18 + LexSubscribe(ctx context.Context, endpoint string, params map[string]any, process func(ctx context.Context, cr *cbg.CborReader) error) error 19 + } 20 + 21 + const Subscription = "subscription" 22 + 23 + var ( 24 + ErrDialFailure = errors.New("dialing failed") 25 + ErrConnFailure = errors.New("connection failed") 26 + ) 27 + 28 + type EventStreamMessage interface { 29 + Serialize(wc io.Writer) error 30 + Deserialize(r io.Reader) error 31 + } 32 + 33 + type Scheduler[T any] interface { 34 + AddWork(ctx context.Context, namespace string, val *T) error 35 + Shutdown() 36 + } 37 + 38 + type SeqScheduler[T any] interface { 39 + Scheduler[T] 40 + LastSeq() int64 41 + } 42 + 43 + type Redialer interface { 44 + // Process decodes the raw message and schedule it 45 + Process(ctx context.Context, cr *cbg.CborReader) error 46 + 47 + // UpdateParams increments the cursor parameter based on LastSeq stored in internal scheduler 48 + UpdateParams(ctx context.Context, params map[string]any) (updated bool) 49 + }