Monorepo for Tangled
tangled.org
1package lexutil
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "net/http/httptest"
10 "strconv"
11 "sync"
12 "sync/atomic"
13 "testing"
14 "time"
15
16 "github.com/bluesky-social/indigo/events"
17 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
18 "github.com/gorilla/websocket"
19 cid "github.com/ipfs/go-cid"
20 "github.com/stretchr/testify/assert"
21 cbg "github.com/whyrusleeping/cbor-gen"
22 xerrors "golang.org/x/xerrors"
23)
24
25type SubscribeExample_Foo struct {
26 Seq int64
27 Foo string
28}
29
30type SubscribeExample_Bar struct {
31 Seq int64
32 Bar string
33}
34
35type SubscribeExample_Event struct {
36 Error *events.ErrorFrame
37 Foo *SubscribeExample_Foo
38 Bar *SubscribeExample_Bar
39}
40
41func (e *SubscribeExample_Event) seq() int64 {
42 switch {
43 case e.Foo != nil:
44 return e.Foo.Seq
45 case e.Bar != nil:
46 return e.Bar.Seq
47 default:
48 return 0
49 }
50}
51
52func (e *SubscribeExample_Event) Serialize(w io.Writer) error {
53 cw := cbg.NewCborWriter(w)
54 header := events.EventHeader{Op: events.EvtKindMessage}
55
56 switch {
57 case e.Error != nil:
58 header.Op = events.EvtKindErrorFrame
59 if err := header.MarshalCBOR(cw); err != nil {
60 return err
61 }
62 return e.Error.MarshalCBOR(cw)
63 case e.Foo != nil:
64 header.MsgType = "#foo"
65 if err := header.MarshalCBOR(cw); err != nil {
66 return err
67 }
68 return e.Foo.MarshalCBOR(cw)
69 case e.Bar != nil:
70 header.MsgType = "#bar"
71 if err := header.MarshalCBOR(cw); err != nil {
72 return err
73 }
74 return e.Bar.MarshalCBOR(cw)
75 default:
76 return fmt.Errorf("unrecognized event kind")
77 }
78}
79
80func (e *SubscribeExample_Event) Deserialize(r io.Reader) error {
81 var header events.EventHeader
82 if err := header.UnmarshalCBOR(r); err != nil {
83 return fmt.Errorf("reading header: %w", err)
84 }
85 switch header.Op {
86 case events.EvtKindMessage:
87 switch header.MsgType {
88 case "#foo":
89 var evt SubscribeExample_Foo
90 if err := evt.UnmarshalCBOR(r); err != nil {
91 return err
92 }
93 e.Foo = &evt
94 case "#bar":
95 var evt SubscribeExample_Bar
96 if err := evt.UnmarshalCBOR(r); err != nil {
97 return err
98 }
99 e.Bar = &evt
100 default:
101 return fmt.Errorf("unknown message type: %s", header.MsgType)
102 }
103 case events.EvtKindErrorFrame:
104 var errframe events.ErrorFrame
105 if err := errframe.UnmarshalCBOR(r); err != nil {
106 return err
107 }
108 e.Error = &errframe
109 default:
110 return fmt.Errorf("unrecognized event stream type: %d", header.Op)
111 }
112 return nil
113}
114
115func (t *SubscribeExample_Foo) MarshalCBOR(w io.Writer) error {
116 if t == nil {
117 _, err := w.Write(cbg.CborNull)
118 return err
119 }
120
121 cw := cbg.NewCborWriter(w)
122
123 if _, err := cw.Write([]byte{162}); err != nil {
124 return err
125 }
126
127 // t.Foo (string) (string)
128 if len("foo") > 1000000 {
129 return xerrors.Errorf("Value in field \"foo\" was too long")
130 }
131
132 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("foo"))); err != nil {
133 return err
134 }
135 if _, err := cw.WriteString(string("foo")); err != nil {
136 return err
137 }
138
139 if len(t.Foo) > 1000000 {
140 return xerrors.Errorf("Value in field t.Foo was too long")
141 }
142
143 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Foo))); err != nil {
144 return err
145 }
146 if _, err := cw.WriteString(string(t.Foo)); err != nil {
147 return err
148 }
149
150 // t.Seq (int64) (int64)
151 if len("seq") > 1000000 {
152 return xerrors.Errorf("Value in field \"seq\" was too long")
153 }
154
155 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil {
156 return err
157 }
158 if _, err := cw.WriteString(string("seq")); err != nil {
159 return err
160 }
161
162 if t.Seq >= 0 {
163 if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil {
164 return err
165 }
166 } else {
167 if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil {
168 return err
169 }
170 }
171 return nil
172}
173
174func (t *SubscribeExample_Foo) UnmarshalCBOR(r io.Reader) (err error) {
175 *t = SubscribeExample_Foo{}
176
177 cr := cbg.NewCborReader(r)
178
179 maj, extra, err := cr.ReadHeader()
180 if err != nil {
181 return err
182 }
183 defer func() {
184 if err == io.EOF {
185 err = io.ErrUnexpectedEOF
186 }
187 }()
188
189 if maj != cbg.MajMap {
190 return fmt.Errorf("cbor input should be of type map")
191 }
192
193 if extra > cbg.MaxLength {
194 return fmt.Errorf("SubscribeExample_Foo: map struct too large (%d)", extra)
195 }
196
197 n := extra
198
199 nameBuf := make([]byte, 8)
200 for range n {
201 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000)
202 if err != nil {
203 return err
204 }
205
206 if !ok {
207 // Field doesn't exist on this type, so ignore it
208 if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
209 return err
210 }
211 continue
212 }
213
214 switch string(nameBuf[:nameLen]) {
215 // t.Foo (string) (string)
216 case "foo":
217
218 {
219 sval, err := cbg.ReadStringWithMax(cr, 1000000)
220 if err != nil {
221 return err
222 }
223
224 t.Foo = string(sval)
225 }
226 // t.Seq (int64) (int64)
227 case "seq":
228 {
229 maj, extra, err := cr.ReadHeader()
230 if err != nil {
231 return err
232 }
233 var extraI int64
234 switch maj {
235 case cbg.MajUnsignedInt:
236 extraI = int64(extra)
237 if extraI < 0 {
238 return fmt.Errorf("int64 positive overflow")
239 }
240 case cbg.MajNegativeInt:
241 extraI = int64(extra)
242 if extraI < 0 {
243 return fmt.Errorf("int64 negative overflow")
244 }
245 extraI = -1 - extraI
246 default:
247 return fmt.Errorf("wrong type for int64 field: %d", maj)
248 }
249
250 t.Seq = int64(extraI)
251 }
252
253 default:
254 // Field doesn't exist on this type, so ignore it
255 if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
256 return err
257 }
258 }
259 }
260
261 return nil
262}
263
264func (t *SubscribeExample_Bar) MarshalCBOR(w io.Writer) error {
265 if t == nil {
266 _, err := w.Write(cbg.CborNull)
267 return err
268 }
269
270 cw := cbg.NewCborWriter(w)
271
272 if _, err := cw.Write([]byte{162}); err != nil {
273 return err
274 }
275
276 // t.Bar (string) (string)
277 if len("bar") > 1000000 {
278 return xerrors.Errorf("Value in field \"bar\" was too long")
279 }
280
281 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("bar"))); err != nil {
282 return err
283 }
284 if _, err := cw.WriteString(string("bar")); err != nil {
285 return err
286 }
287
288 if len(t.Bar) > 1000000 {
289 return xerrors.Errorf("Value in field t.Bar was too long")
290 }
291
292 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Bar))); err != nil {
293 return err
294 }
295 if _, err := cw.WriteString(string(t.Bar)); err != nil {
296 return err
297 }
298
299 // t.Seq (int64) (int64)
300 if len("seq") > 1000000 {
301 return xerrors.Errorf("Value in field \"seq\" was too long")
302 }
303
304 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil {
305 return err
306 }
307 if _, err := cw.WriteString(string("seq")); err != nil {
308 return err
309 }
310
311 if t.Seq >= 0 {
312 if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil {
313 return err
314 }
315 } else {
316 if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil {
317 return err
318 }
319 }
320 return nil
321}
322
323func (t *SubscribeExample_Bar) UnmarshalCBOR(r io.Reader) (err error) {
324 *t = SubscribeExample_Bar{}
325
326 cr := cbg.NewCborReader(r)
327
328 maj, extra, err := cr.ReadHeader()
329 if err != nil {
330 return err
331 }
332 defer func() {
333 if err == io.EOF {
334 err = io.ErrUnexpectedEOF
335 }
336 }()
337
338 if maj != cbg.MajMap {
339 return fmt.Errorf("cbor input should be of type map")
340 }
341
342 if extra > cbg.MaxLength {
343 return fmt.Errorf("SubscribeExample_Bar: map struct too large (%d)", extra)
344 }
345
346 n := extra
347
348 nameBuf := make([]byte, 8)
349 for range n {
350 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000)
351 if err != nil {
352 return err
353 }
354
355 if !ok {
356 // Field doesn't exist on this type, so ignore it
357 if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
358 return err
359 }
360 continue
361 }
362
363 switch string(nameBuf[:nameLen]) {
364 // t.Bar (string) (string)
365 case "bar":
366
367 {
368 sval, err := cbg.ReadStringWithMax(cr, 1000000)
369 if err != nil {
370 return err
371 }
372
373 t.Bar = string(sval)
374 }
375 // t.Seq (int64) (int64)
376 case "seq":
377 {
378 maj, extra, err := cr.ReadHeader()
379 if err != nil {
380 return err
381 }
382 var extraI int64
383 switch maj {
384 case cbg.MajUnsignedInt:
385 extraI = int64(extra)
386 if extraI < 0 {
387 return fmt.Errorf("int64 positive overflow")
388 }
389 case cbg.MajNegativeInt:
390 extraI = int64(extra)
391 if extraI < 0 {
392 return fmt.Errorf("int64 negative overflow")
393 }
394 extraI = -1 - extraI
395 default:
396 return fmt.Errorf("wrong type for int64 field: %d", maj)
397 }
398
399 t.Seq = int64(extraI)
400 }
401
402 default:
403 // Field doesn't exist on this type, so ignore it
404 if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
405 return err
406 }
407 }
408 }
409
410 return nil
411}
412
413type testScheduler struct {
414 ch chan *SubscribeExample_Event
415 lastSeq atomic.Int64
416}
417
418var _ SeqScheduler[SubscribeExample_Event] = (*testScheduler)(nil)
419
420func newTestScheduler() *testScheduler {
421 return &testScheduler{ch: make(chan *SubscribeExample_Event, 8)}
422}
423
424func (s *testScheduler) AddWork(ctx context.Context, namespace string, val *SubscribeExample_Event) error {
425 s.lastSeq.Store(val.seq())
426 select {
427 case s.ch <- val:
428 case <-ctx.Done():
429 }
430 return nil
431}
432
433func (s *testScheduler) Shutdown() {}
434
435func (s *testScheduler) LastSeq() int64 { return s.lastSeq.Load() }
436
437// recv returns the next scheduled event. The timeout is generous because a
438// redial during the offline window backs off >=5s.
439func (s *testScheduler) recv(t *testing.T) *SubscribeExample_Event {
440 t.Helper()
441 select {
442 case e := <-s.ch:
443 return e
444 case <-time.After(12 * time.Second):
445 t.Fatal("timed out waiting for an event")
446 return nil
447 }
448}
449
450type testRedialer struct {
451 sched *testScheduler
452}
453
454var _ Redialer = (*testRedialer)(nil)
455
456func (r *testRedialer) Process(ctx context.Context, cr *cbg.CborReader) error {
457 var evt SubscribeExample_Event
458 if err := evt.Deserialize(cr); err != nil {
459 return err
460 }
461 return r.sched.AddWork(ctx, "", &evt)
462}
463
464func (r *testRedialer) UpdateParams(ctx context.Context, params map[string]any) bool {
465 last := r.sched.LastSeq()
466 if last == 0 {
467 return false
468 }
469 params["cursor"] = last
470 return true
471}
472
473const testEndpoint = "com.example.subscribeExample"
474
475// testServer live-tails an append-only event log over a websocket: each
476// connection streams events whose seq exceeds the requested cursor, including
477// ones added after it opened. The httptest listener stays bound the whole time
478// (so the port can't be taken over); Close/Start toggle offline by having the
479// handler answer 404, which fails the client's websocket handshake and drives
480// it into its redial/backoff loop.
481type testServer struct {
482 URL string
483
484 mu sync.Mutex
485 events []SubscribeExample_Event
486 conns map[*websocket.Conn]struct{}
487 serving bool
488}
489
490func newTestServer(t *testing.T) *testServer {
491 t.Helper()
492 ts := &testServer{
493 conns: make(map[*websocket.Conn]struct{}),
494 serving: true,
495 }
496 srv := httptest.NewServer(ts.handler())
497 ts.URL = srv.URL
498 t.Cleanup(func() {
499 ts.Close() // drop hijacked conns before srv.Close so it won't block
500 srv.Close()
501 })
502 return ts
503}
504
505func (ts *testServer) handler() http.Handler {
506 up := websocket.Upgrader{}
507 mux := http.NewServeMux()
508 mux.HandleFunc("/xrpc/"+testEndpoint, func(w http.ResponseWriter, r *http.Request) {
509 if !ts.isServing() {
510 http.Error(w, "offline", http.StatusNotFound)
511 return
512 }
513
514 var cursor int64
515 if c := r.URL.Query().Get("cursor"); c != "" {
516 cursor, _ = strconv.ParseInt(c, 10, 64)
517 }
518
519 conn, err := up.Upgrade(w, r, nil)
520 if err != nil {
521 return
522 }
523
524 ts.mu.Lock()
525 ts.conns[conn] = struct{}{}
526 ts.mu.Unlock()
527 defer func() {
528 ts.mu.Lock()
529 delete(ts.conns, conn)
530 ts.mu.Unlock()
531 conn.Close()
532 }()
533
534 sent := cursor
535 for {
536 ts.mu.Lock()
537 var batch []SubscribeExample_Event
538 for _, e := range ts.events {
539 if e.seq() > sent {
540 batch = append(batch, e)
541 }
542 }
543 ts.mu.Unlock()
544
545 for i := range batch {
546 if err := func(conn *websocket.Conn, e *SubscribeExample_Event) error {
547 var buf bytes.Buffer
548 if err := e.Serialize(&buf); err != nil {
549 return err
550 }
551 return conn.WriteMessage(websocket.BinaryMessage, buf.Bytes())
552 }(conn, &batch[i]); err != nil {
553 return
554 }
555 sent = batch[i].seq()
556 }
557 time.Sleep(5 * time.Millisecond)
558 }
559 })
560 return mux
561}
562
563func (ts *testServer) isServing() bool {
564 ts.mu.Lock()
565 defer ts.mu.Unlock()
566 return ts.serving
567}
568
569// Start brings the server back online.
570func (ts *testServer) Start() {
571 ts.mu.Lock()
572 ts.serving = true
573 ts.mu.Unlock()
574}
575
576// Close takes the server offline: new handshakes get 404 and active websocket
577// connections are dropped, forcing the subscription to redial.
578func (ts *testServer) Close() {
579 ts.mu.Lock()
580 ts.serving = false
581 conns := make([]*websocket.Conn, 0, len(ts.conns))
582 for c := range ts.conns {
583 conns = append(conns, c)
584 }
585 ts.mu.Unlock()
586
587 for _, c := range conns {
588 c.Close()
589 }
590}
591
592func (ts *testServer) AddEvent(e SubscribeExample_Event) {
593 ts.mu.Lock()
594 ts.events = append(ts.events, e)
595 ts.mu.Unlock()
596}
597
598func waitForReturn(t *testing.T, done <-chan error) {
599 t.Helper()
600 select {
601 case <-done:
602 case <-time.After(5 * time.Second):
603 t.Fatal("subscription did not return after context cancel")
604 }
605}
606
607func TestLexSubscribe_ConsumesAndSchedules(t *testing.T) {
608 srv := newTestServer(t)
609 defer srv.Close()
610
611 c := &Client{Client: indigoxrpc.Client{Host: srv.URL}}
612 sched := newTestScheduler()
613
614 process := func(ctx context.Context, cr *cbg.CborReader) error {
615 var evt SubscribeExample_Event
616 if err := evt.Deserialize(cr); err != nil {
617 return err
618 }
619 return sched.AddWork(ctx, "", &evt)
620 }
621
622 ctx, cancel := context.WithCancel(context.Background())
623 done := make(chan error, 1)
624 go func() {
625 done <- c.LexSubscribe(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, process)
626 }()
627
628 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}})
629 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}})
630 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t))
631 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t))
632
633 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}})
634 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t))
635
636 cancel()
637 waitForReturn(t, done)
638}
639
640func TestLexSubscribeWithRedialer_ConsumesAndSchedules(t *testing.T) {
641 srv := newTestServer(t)
642 defer srv.Close()
643
644 c := &Client{Client: indigoxrpc.Client{Host: srv.URL}}
645 sched := newTestScheduler()
646 redialer := &testRedialer{sched: sched}
647
648 ctx, cancel := context.WithCancel(context.Background())
649 done := make(chan error, 1)
650 go func() {
651 done <- c.LexSubscribeWithRedialer(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, redialer)
652 }()
653
654 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}})
655 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}})
656 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t))
657 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t))
658
659 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}})
660 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}})
661 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t))
662 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}, sched.recv(t))
663
664 cancel()
665 waitForReturn(t, done)
666}
667
668func TestLexSubscribeWithRedialer_HandlesDowntime(t *testing.T) {
669 srv := newTestServer(t)
670 defer srv.Close()
671
672 c := &Client{Client: indigoxrpc.Client{Host: srv.URL}}
673 sched := newTestScheduler()
674 redialer := &testRedialer{sched: sched}
675
676 ctx, cancel := context.WithCancel(context.Background())
677 done := make(chan error, 1)
678 go func() {
679 done <- c.LexSubscribeWithRedialer(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, redialer)
680 }()
681
682 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}})
683 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}})
684 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t))
685 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t))
686
687 // offline, add events, back online: the subscription redials and resumes
688 srv.Close()
689
690 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}})
691 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}})
692
693 srv.Start()
694
695 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t))
696 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}, sched.recv(t))
697
698 cancel()
699 waitForReturn(t, done)
700}