Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

1package lexutil 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "net/http/httptest" 10 "strconv" 11 "sync" 12 "sync/atomic" 13 "testing" 14 "time" 15 16 "github.com/bluesky-social/indigo/events" 17 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 18 "github.com/gorilla/websocket" 19 cid "github.com/ipfs/go-cid" 20 "github.com/stretchr/testify/assert" 21 cbg "github.com/whyrusleeping/cbor-gen" 22 xerrors "golang.org/x/xerrors" 23) 24 25type SubscribeExample_Foo struct { 26 Seq int64 27 Foo string 28} 29 30type SubscribeExample_Bar struct { 31 Seq int64 32 Bar string 33} 34 35type SubscribeExample_Event struct { 36 Error *events.ErrorFrame 37 Foo *SubscribeExample_Foo 38 Bar *SubscribeExample_Bar 39} 40 41func (e *SubscribeExample_Event) seq() int64 { 42 switch { 43 case e.Foo != nil: 44 return e.Foo.Seq 45 case e.Bar != nil: 46 return e.Bar.Seq 47 default: 48 return 0 49 } 50} 51 52func (e *SubscribeExample_Event) Serialize(w io.Writer) error { 53 cw := cbg.NewCborWriter(w) 54 header := events.EventHeader{Op: events.EvtKindMessage} 55 56 switch { 57 case e.Error != nil: 58 header.Op = events.EvtKindErrorFrame 59 if err := header.MarshalCBOR(cw); err != nil { 60 return err 61 } 62 return e.Error.MarshalCBOR(cw) 63 case e.Foo != nil: 64 header.MsgType = "#foo" 65 if err := header.MarshalCBOR(cw); err != nil { 66 return err 67 } 68 return e.Foo.MarshalCBOR(cw) 69 case e.Bar != nil: 70 header.MsgType = "#bar" 71 if err := header.MarshalCBOR(cw); err != nil { 72 return err 73 } 74 return e.Bar.MarshalCBOR(cw) 75 default: 76 return fmt.Errorf("unrecognized event kind") 77 } 78} 79 80func (e *SubscribeExample_Event) Deserialize(r io.Reader) error { 81 var header events.EventHeader 82 if err := header.UnmarshalCBOR(r); err != nil { 83 return fmt.Errorf("reading header: %w", err) 84 } 85 switch header.Op { 86 case events.EvtKindMessage: 87 switch header.MsgType { 88 case "#foo": 89 var evt SubscribeExample_Foo 90 if err := evt.UnmarshalCBOR(r); err != nil { 91 return err 92 } 93 e.Foo = &evt 94 case "#bar": 95 var evt SubscribeExample_Bar 96 if err := evt.UnmarshalCBOR(r); err != nil { 97 return err 98 } 99 e.Bar = &evt 100 default: 101 return fmt.Errorf("unknown message type: %s", header.MsgType) 102 } 103 case events.EvtKindErrorFrame: 104 var errframe events.ErrorFrame 105 if err := errframe.UnmarshalCBOR(r); err != nil { 106 return err 107 } 108 e.Error = &errframe 109 default: 110 return fmt.Errorf("unrecognized event stream type: %d", header.Op) 111 } 112 return nil 113} 114 115func (t *SubscribeExample_Foo) MarshalCBOR(w io.Writer) error { 116 if t == nil { 117 _, err := w.Write(cbg.CborNull) 118 return err 119 } 120 121 cw := cbg.NewCborWriter(w) 122 123 if _, err := cw.Write([]byte{162}); err != nil { 124 return err 125 } 126 127 // t.Foo (string) (string) 128 if len("foo") > 1000000 { 129 return xerrors.Errorf("Value in field \"foo\" was too long") 130 } 131 132 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("foo"))); err != nil { 133 return err 134 } 135 if _, err := cw.WriteString(string("foo")); err != nil { 136 return err 137 } 138 139 if len(t.Foo) > 1000000 { 140 return xerrors.Errorf("Value in field t.Foo was too long") 141 } 142 143 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Foo))); err != nil { 144 return err 145 } 146 if _, err := cw.WriteString(string(t.Foo)); err != nil { 147 return err 148 } 149 150 // t.Seq (int64) (int64) 151 if len("seq") > 1000000 { 152 return xerrors.Errorf("Value in field \"seq\" was too long") 153 } 154 155 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 156 return err 157 } 158 if _, err := cw.WriteString(string("seq")); err != nil { 159 return err 160 } 161 162 if t.Seq >= 0 { 163 if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 164 return err 165 } 166 } else { 167 if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 168 return err 169 } 170 } 171 return nil 172} 173 174func (t *SubscribeExample_Foo) UnmarshalCBOR(r io.Reader) (err error) { 175 *t = SubscribeExample_Foo{} 176 177 cr := cbg.NewCborReader(r) 178 179 maj, extra, err := cr.ReadHeader() 180 if err != nil { 181 return err 182 } 183 defer func() { 184 if err == io.EOF { 185 err = io.ErrUnexpectedEOF 186 } 187 }() 188 189 if maj != cbg.MajMap { 190 return fmt.Errorf("cbor input should be of type map") 191 } 192 193 if extra > cbg.MaxLength { 194 return fmt.Errorf("SubscribeExample_Foo: map struct too large (%d)", extra) 195 } 196 197 n := extra 198 199 nameBuf := make([]byte, 8) 200 for range n { 201 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 202 if err != nil { 203 return err 204 } 205 206 if !ok { 207 // Field doesn't exist on this type, so ignore it 208 if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 209 return err 210 } 211 continue 212 } 213 214 switch string(nameBuf[:nameLen]) { 215 // t.Foo (string) (string) 216 case "foo": 217 218 { 219 sval, err := cbg.ReadStringWithMax(cr, 1000000) 220 if err != nil { 221 return err 222 } 223 224 t.Foo = string(sval) 225 } 226 // t.Seq (int64) (int64) 227 case "seq": 228 { 229 maj, extra, err := cr.ReadHeader() 230 if err != nil { 231 return err 232 } 233 var extraI int64 234 switch maj { 235 case cbg.MajUnsignedInt: 236 extraI = int64(extra) 237 if extraI < 0 { 238 return fmt.Errorf("int64 positive overflow") 239 } 240 case cbg.MajNegativeInt: 241 extraI = int64(extra) 242 if extraI < 0 { 243 return fmt.Errorf("int64 negative overflow") 244 } 245 extraI = -1 - extraI 246 default: 247 return fmt.Errorf("wrong type for int64 field: %d", maj) 248 } 249 250 t.Seq = int64(extraI) 251 } 252 253 default: 254 // Field doesn't exist on this type, so ignore it 255 if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 256 return err 257 } 258 } 259 } 260 261 return nil 262} 263 264func (t *SubscribeExample_Bar) MarshalCBOR(w io.Writer) error { 265 if t == nil { 266 _, err := w.Write(cbg.CborNull) 267 return err 268 } 269 270 cw := cbg.NewCborWriter(w) 271 272 if _, err := cw.Write([]byte{162}); err != nil { 273 return err 274 } 275 276 // t.Bar (string) (string) 277 if len("bar") > 1000000 { 278 return xerrors.Errorf("Value in field \"bar\" was too long") 279 } 280 281 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("bar"))); err != nil { 282 return err 283 } 284 if _, err := cw.WriteString(string("bar")); err != nil { 285 return err 286 } 287 288 if len(t.Bar) > 1000000 { 289 return xerrors.Errorf("Value in field t.Bar was too long") 290 } 291 292 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Bar))); err != nil { 293 return err 294 } 295 if _, err := cw.WriteString(string(t.Bar)); err != nil { 296 return err 297 } 298 299 // t.Seq (int64) (int64) 300 if len("seq") > 1000000 { 301 return xerrors.Errorf("Value in field \"seq\" was too long") 302 } 303 304 if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 305 return err 306 } 307 if _, err := cw.WriteString(string("seq")); err != nil { 308 return err 309 } 310 311 if t.Seq >= 0 { 312 if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 313 return err 314 } 315 } else { 316 if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 317 return err 318 } 319 } 320 return nil 321} 322 323func (t *SubscribeExample_Bar) UnmarshalCBOR(r io.Reader) (err error) { 324 *t = SubscribeExample_Bar{} 325 326 cr := cbg.NewCborReader(r) 327 328 maj, extra, err := cr.ReadHeader() 329 if err != nil { 330 return err 331 } 332 defer func() { 333 if err == io.EOF { 334 err = io.ErrUnexpectedEOF 335 } 336 }() 337 338 if maj != cbg.MajMap { 339 return fmt.Errorf("cbor input should be of type map") 340 } 341 342 if extra > cbg.MaxLength { 343 return fmt.Errorf("SubscribeExample_Bar: map struct too large (%d)", extra) 344 } 345 346 n := extra 347 348 nameBuf := make([]byte, 8) 349 for range n { 350 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 351 if err != nil { 352 return err 353 } 354 355 if !ok { 356 // Field doesn't exist on this type, so ignore it 357 if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 358 return err 359 } 360 continue 361 } 362 363 switch string(nameBuf[:nameLen]) { 364 // t.Bar (string) (string) 365 case "bar": 366 367 { 368 sval, err := cbg.ReadStringWithMax(cr, 1000000) 369 if err != nil { 370 return err 371 } 372 373 t.Bar = string(sval) 374 } 375 // t.Seq (int64) (int64) 376 case "seq": 377 { 378 maj, extra, err := cr.ReadHeader() 379 if err != nil { 380 return err 381 } 382 var extraI int64 383 switch maj { 384 case cbg.MajUnsignedInt: 385 extraI = int64(extra) 386 if extraI < 0 { 387 return fmt.Errorf("int64 positive overflow") 388 } 389 case cbg.MajNegativeInt: 390 extraI = int64(extra) 391 if extraI < 0 { 392 return fmt.Errorf("int64 negative overflow") 393 } 394 extraI = -1 - extraI 395 default: 396 return fmt.Errorf("wrong type for int64 field: %d", maj) 397 } 398 399 t.Seq = int64(extraI) 400 } 401 402 default: 403 // Field doesn't exist on this type, so ignore it 404 if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 405 return err 406 } 407 } 408 } 409 410 return nil 411} 412 413type testScheduler struct { 414 ch chan *SubscribeExample_Event 415 lastSeq atomic.Int64 416} 417 418var _ SeqScheduler[SubscribeExample_Event] = (*testScheduler)(nil) 419 420func newTestScheduler() *testScheduler { 421 return &testScheduler{ch: make(chan *SubscribeExample_Event, 8)} 422} 423 424func (s *testScheduler) AddWork(ctx context.Context, namespace string, val *SubscribeExample_Event) error { 425 s.lastSeq.Store(val.seq()) 426 select { 427 case s.ch <- val: 428 case <-ctx.Done(): 429 } 430 return nil 431} 432 433func (s *testScheduler) Shutdown() {} 434 435func (s *testScheduler) LastSeq() int64 { return s.lastSeq.Load() } 436 437// recv returns the next scheduled event. The timeout is generous because a 438// redial during the offline window backs off >=5s. 439func (s *testScheduler) recv(t *testing.T) *SubscribeExample_Event { 440 t.Helper() 441 select { 442 case e := <-s.ch: 443 return e 444 case <-time.After(12 * time.Second): 445 t.Fatal("timed out waiting for an event") 446 return nil 447 } 448} 449 450type testRedialer struct { 451 sched *testScheduler 452} 453 454var _ Redialer = (*testRedialer)(nil) 455 456func (r *testRedialer) Process(ctx context.Context, cr *cbg.CborReader) error { 457 var evt SubscribeExample_Event 458 if err := evt.Deserialize(cr); err != nil { 459 return err 460 } 461 return r.sched.AddWork(ctx, "", &evt) 462} 463 464func (r *testRedialer) UpdateParams(ctx context.Context, params map[string]any) bool { 465 last := r.sched.LastSeq() 466 if last == 0 { 467 return false 468 } 469 params["cursor"] = last 470 return true 471} 472 473const testEndpoint = "com.example.subscribeExample" 474 475// testServer live-tails an append-only event log over a websocket: each 476// connection streams events whose seq exceeds the requested cursor, including 477// ones added after it opened. The httptest listener stays bound the whole time 478// (so the port can't be taken over); Close/Start toggle offline by having the 479// handler answer 404, which fails the client's websocket handshake and drives 480// it into its redial/backoff loop. 481type testServer struct { 482 URL string 483 484 mu sync.Mutex 485 events []SubscribeExample_Event 486 conns map[*websocket.Conn]struct{} 487 serving bool 488} 489 490func newTestServer(t *testing.T) *testServer { 491 t.Helper() 492 ts := &testServer{ 493 conns: make(map[*websocket.Conn]struct{}), 494 serving: true, 495 } 496 srv := httptest.NewServer(ts.handler()) 497 ts.URL = srv.URL 498 t.Cleanup(func() { 499 ts.Close() // drop hijacked conns before srv.Close so it won't block 500 srv.Close() 501 }) 502 return ts 503} 504 505func (ts *testServer) handler() http.Handler { 506 up := websocket.Upgrader{} 507 mux := http.NewServeMux() 508 mux.HandleFunc("/xrpc/"+testEndpoint, func(w http.ResponseWriter, r *http.Request) { 509 if !ts.isServing() { 510 http.Error(w, "offline", http.StatusNotFound) 511 return 512 } 513 514 var cursor int64 515 if c := r.URL.Query().Get("cursor"); c != "" { 516 cursor, _ = strconv.ParseInt(c, 10, 64) 517 } 518 519 conn, err := up.Upgrade(w, r, nil) 520 if err != nil { 521 return 522 } 523 524 ts.mu.Lock() 525 ts.conns[conn] = struct{}{} 526 ts.mu.Unlock() 527 defer func() { 528 ts.mu.Lock() 529 delete(ts.conns, conn) 530 ts.mu.Unlock() 531 conn.Close() 532 }() 533 534 sent := cursor 535 for { 536 ts.mu.Lock() 537 var batch []SubscribeExample_Event 538 for _, e := range ts.events { 539 if e.seq() > sent { 540 batch = append(batch, e) 541 } 542 } 543 ts.mu.Unlock() 544 545 for i := range batch { 546 if err := func(conn *websocket.Conn, e *SubscribeExample_Event) error { 547 var buf bytes.Buffer 548 if err := e.Serialize(&buf); err != nil { 549 return err 550 } 551 return conn.WriteMessage(websocket.BinaryMessage, buf.Bytes()) 552 }(conn, &batch[i]); err != nil { 553 return 554 } 555 sent = batch[i].seq() 556 } 557 time.Sleep(5 * time.Millisecond) 558 } 559 }) 560 return mux 561} 562 563func (ts *testServer) isServing() bool { 564 ts.mu.Lock() 565 defer ts.mu.Unlock() 566 return ts.serving 567} 568 569// Start brings the server back online. 570func (ts *testServer) Start() { 571 ts.mu.Lock() 572 ts.serving = true 573 ts.mu.Unlock() 574} 575 576// Close takes the server offline: new handshakes get 404 and active websocket 577// connections are dropped, forcing the subscription to redial. 578func (ts *testServer) Close() { 579 ts.mu.Lock() 580 ts.serving = false 581 conns := make([]*websocket.Conn, 0, len(ts.conns)) 582 for c := range ts.conns { 583 conns = append(conns, c) 584 } 585 ts.mu.Unlock() 586 587 for _, c := range conns { 588 c.Close() 589 } 590} 591 592func (ts *testServer) AddEvent(e SubscribeExample_Event) { 593 ts.mu.Lock() 594 ts.events = append(ts.events, e) 595 ts.mu.Unlock() 596} 597 598func waitForReturn(t *testing.T, done <-chan error) { 599 t.Helper() 600 select { 601 case <-done: 602 case <-time.After(5 * time.Second): 603 t.Fatal("subscription did not return after context cancel") 604 } 605} 606 607func TestLexSubscribe_ConsumesAndSchedules(t *testing.T) { 608 srv := newTestServer(t) 609 defer srv.Close() 610 611 c := &Client{Client: indigoxrpc.Client{Host: srv.URL}} 612 sched := newTestScheduler() 613 614 process := func(ctx context.Context, cr *cbg.CborReader) error { 615 var evt SubscribeExample_Event 616 if err := evt.Deserialize(cr); err != nil { 617 return err 618 } 619 return sched.AddWork(ctx, "", &evt) 620 } 621 622 ctx, cancel := context.WithCancel(context.Background()) 623 done := make(chan error, 1) 624 go func() { 625 done <- c.LexSubscribe(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, process) 626 }() 627 628 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}) 629 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}) 630 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t)) 631 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t)) 632 633 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}) 634 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t)) 635 636 cancel() 637 waitForReturn(t, done) 638} 639 640func TestLexSubscribeWithRedialer_ConsumesAndSchedules(t *testing.T) { 641 srv := newTestServer(t) 642 defer srv.Close() 643 644 c := &Client{Client: indigoxrpc.Client{Host: srv.URL}} 645 sched := newTestScheduler() 646 redialer := &testRedialer{sched: sched} 647 648 ctx, cancel := context.WithCancel(context.Background()) 649 done := make(chan error, 1) 650 go func() { 651 done <- c.LexSubscribeWithRedialer(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, redialer) 652 }() 653 654 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}) 655 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}) 656 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t)) 657 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t)) 658 659 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}) 660 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}) 661 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t)) 662 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}, sched.recv(t)) 663 664 cancel() 665 waitForReturn(t, done) 666} 667 668func TestLexSubscribeWithRedialer_HandlesDowntime(t *testing.T) { 669 srv := newTestServer(t) 670 defer srv.Close() 671 672 c := &Client{Client: indigoxrpc.Client{Host: srv.URL}} 673 sched := newTestScheduler() 674 redialer := &testRedialer{sched: sched} 675 676 ctx, cancel := context.WithCancel(context.Background()) 677 done := make(chan error, 1) 678 go func() { 679 done <- c.LexSubscribeWithRedialer(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, redialer) 680 }() 681 682 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}) 683 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}) 684 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t)) 685 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t)) 686 687 // offline, add events, back online: the subscription redials and resumes 688 srv.Close() 689 690 srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}) 691 srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}) 692 693 srv.Start() 694 695 assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t)) 696 assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}, sched.recv(t)) 697 698 cancel() 699 waitForReturn(t, done) 700}