Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

lexutil: add tests

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 20, 2026, 7:50 PM +0900) commit b6110cea parent 43b5510a change-id rrnkzmsk
+700
+700
lexutil/client_test.go
··· 1 + package lexutil 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "io" 8 + "net/http" 9 + "net/http/httptest" 10 + "strconv" 11 + "sync" 12 + "sync/atomic" 13 + "testing" 14 + "time" 15 + 16 + "github.com/bluesky-social/indigo/events" 17 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 18 + "github.com/gorilla/websocket" 19 + cid "github.com/ipfs/go-cid" 20 + "github.com/stretchr/testify/assert" 21 + cbg "github.com/whyrusleeping/cbor-gen" 22 + xerrors "golang.org/x/xerrors" 23 + ) 24 + 25 + type SubscribeExample_Foo struct { 26 + Seq int64 27 + Foo string 28 + } 29 + 30 + type SubscribeExample_Bar struct { 31 + Seq int64 32 + Bar string 33 + } 34 + 35 + type SubscribeExample_Event struct { 36 + Error *events.ErrorFrame 37 + Foo *SubscribeExample_Foo 38 + Bar *SubscribeExample_Bar 39 + } 40 + 41 + func (e *SubscribeExample_Event) seq() int64 { 42 + switch { 43 + case e.Foo != nil: 44 + return e.Foo.Seq 45 + case e.Bar != nil: 46 + return e.Bar.Seq 47 + default: 48 + return 0 49 + } 50 + } 51 + 52 + func (e *SubscribeExample_Event) Serialize(w io.Writer) error { 53 + cw := cbg.NewCborWriter(w) 54 + header := events.EventHeader{Op: events.EvtKindMessage} 55 + 56 + switch { 57 + case e.Error != nil: 58 + header.Op = events.EvtKindErrorFrame 59 + if err := header.MarshalCBOR(cw); err != nil { 60 + return err 61 + } 62 + return e.Error.MarshalCBOR(cw) 63 + case e.Foo != nil: 64 + header.MsgType = "#foo" 65 + if err := header.MarshalCBOR(cw); err != nil { 66 + return err 67 + } 68 + return e.Foo.MarshalCBOR(cw) 69 + case e.Bar != nil: 70 + header.MsgType = "#bar" 71 + if err := header.MarshalCBOR(cw); err != nil { 72 + return err 73 + } 74 + return e.Bar.MarshalCBOR(cw) 75 + default: 76 + return fmt.Errorf("unrecognized event kind") 77 + } 78 + } 79 + 80 + func (e *SubscribeExample_Event) Deserialize(r io.Reader) error { 81 + var header events.EventHeader 82 + if err := header.UnmarshalCBOR(r); err != nil { 83 + return fmt.Errorf("reading header: %w", err) 84 + } 85 + switch header.Op { 86 + case events.EvtKindMessage: 87 + switch header.MsgType { 88 + case "#foo": 89 + var evt SubscribeExample_Foo 90 + if err := evt.UnmarshalCBOR(r); err != nil { 91 + return err 92 + } 93 + e.Foo = &evt 94 + case "#bar": 95 + var evt SubscribeExample_Bar 96 + if err := evt.UnmarshalCBOR(r); err != nil { 97 + return err 98 + } 99 + e.Bar = &evt 100 + default: 101 + return fmt.Errorf("unknown message type: %s", header.MsgType) 102 + } 103 + case events.EvtKindErrorFrame: 104 + var errframe events.ErrorFrame 105 + if err := errframe.UnmarshalCBOR(r); err != nil { 106 + return err 107 + } 108 + e.Error = &errframe 109 + default: 110 + return fmt.Errorf("unrecognized event stream type: %d", header.Op) 111 + } 112 + return nil 113 + } 114 + 115 + func (t *SubscribeExample_Foo) MarshalCBOR(w io.Writer) error { 116 + if t == nil { 117 + _, err := w.Write(cbg.CborNull) 118 + return err 119 + } 120 + 121 + cw := cbg.NewCborWriter(w) 122 + 123 + if _, err := cw.Write([]byte{162}); err != nil { 124 + return err 125 + } 126 + 127 + // t.Foo (string) (string) 128 + if len("foo") > 1000000 { 129 + return xerrors.Errorf("Value in field \"foo\" was too long") 130 + } 131 + 132 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("foo"))); err != nil { 133 + return err 134 + } 135 + if _, err := cw.WriteString(string("foo")); err != nil { 136 + return err 137 + } 138 + 139 + if len(t.Foo) > 1000000 { 140 + return xerrors.Errorf("Value in field t.Foo was too long") 141 + } 142 + 143 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Foo))); err != nil { 144 + return err 145 + } 146 + if _, err := cw.WriteString(string(t.Foo)); err != nil { 147 + return err 148 + } 149 + 150 + // t.Seq (int64) (int64) 151 + if len("seq") > 1000000 { 152 + return xerrors.Errorf("Value in field \"seq\" was too long") 153 + } 154 + 155 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 156 + return err 157 + } 158 + if _, err := cw.WriteString(string("seq")); err != nil { 159 + return err 160 + } 161 + 162 + if t.Seq >= 0 { 163 + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 164 + return err 165 + } 166 + } else { 167 + if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 168 + return err 169 + } 170 + } 171 + return nil 172 + } 173 + 174 + func (t *SubscribeExample_Foo) UnmarshalCBOR(r io.Reader) (err error) { 175 + *t = SubscribeExample_Foo{} 176 + 177 + cr := cbg.NewCborReader(r) 178 + 179 + maj, extra, err := cr.ReadHeader() 180 + if err != nil { 181 + return err 182 + } 183 + defer func() { 184 + if err == io.EOF { 185 + err = io.ErrUnexpectedEOF 186 + } 187 + }() 188 + 189 + if maj != cbg.MajMap { 190 + return fmt.Errorf("cbor input should be of type map") 191 + } 192 + 193 + if extra > cbg.MaxLength { 194 + return fmt.Errorf("SubscribeExample_Foo: map struct too large (%d)", extra) 195 + } 196 + 197 + n := extra 198 + 199 + nameBuf := make([]byte, 8) 200 + for range n { 201 + nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 202 + if err != nil { 203 + return err 204 + } 205 + 206 + if !ok { 207 + // Field doesn't exist on this type, so ignore it 208 + if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 209 + return err 210 + } 211 + continue 212 + } 213 + 214 + switch string(nameBuf[:nameLen]) { 215 + // t.Foo (string) (string) 216 + case "foo": 217 + 218 + { 219 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 220 + if err != nil { 221 + return err 222 + } 223 + 224 + t.Foo = string(sval) 225 + } 226 + // t.Seq (int64) (int64) 227 + case "seq": 228 + { 229 + maj, extra, err := cr.ReadHeader() 230 + if err != nil { 231 + return err 232 + } 233 + var extraI int64 234 + switch maj { 235 + case cbg.MajUnsignedInt: 236 + extraI = int64(extra) 237 + if extraI < 0 { 238 + return fmt.Errorf("int64 positive overflow") 239 + } 240 + case cbg.MajNegativeInt: 241 + extraI = int64(extra) 242 + if extraI < 0 { 243 + return fmt.Errorf("int64 negative overflow") 244 + } 245 + extraI = -1 - extraI 246 + default: 247 + return fmt.Errorf("wrong type for int64 field: %d", maj) 248 + } 249 + 250 + t.Seq = int64(extraI) 251 + } 252 + 253 + default: 254 + // Field doesn't exist on this type, so ignore it 255 + if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 256 + return err 257 + } 258 + } 259 + } 260 + 261 + return nil 262 + } 263 + 264 + func (t *SubscribeExample_Bar) MarshalCBOR(w io.Writer) error { 265 + if t == nil { 266 + _, err := w.Write(cbg.CborNull) 267 + return err 268 + } 269 + 270 + cw := cbg.NewCborWriter(w) 271 + 272 + if _, err := cw.Write([]byte{162}); err != nil { 273 + return err 274 + } 275 + 276 + // t.Bar (string) (string) 277 + if len("bar") > 1000000 { 278 + return xerrors.Errorf("Value in field \"bar\" was too long") 279 + } 280 + 281 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("bar"))); err != nil { 282 + return err 283 + } 284 + if _, err := cw.WriteString(string("bar")); err != nil { 285 + return err 286 + } 287 + 288 + if len(t.Bar) > 1000000 { 289 + return xerrors.Errorf("Value in field t.Bar was too long") 290 + } 291 + 292 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Bar))); err != nil { 293 + return err 294 + } 295 + if _, err := cw.WriteString(string(t.Bar)); err != nil { 296 + return err 297 + } 298 + 299 + // t.Seq (int64) (int64) 300 + if len("seq") > 1000000 { 301 + return xerrors.Errorf("Value in field \"seq\" was too long") 302 + } 303 + 304 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 305 + return err 306 + } 307 + if _, err := cw.WriteString(string("seq")); err != nil { 308 + return err 309 + } 310 + 311 + if t.Seq >= 0 { 312 + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 313 + return err 314 + } 315 + } else { 316 + if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 317 + return err 318 + } 319 + } 320 + return nil 321 + } 322 + 323 + func (t *SubscribeExample_Bar) UnmarshalCBOR(r io.Reader) (err error) { 324 + *t = SubscribeExample_Bar{} 325 + 326 + cr := cbg.NewCborReader(r) 327 + 328 + maj, extra, err := cr.ReadHeader() 329 + if err != nil { 330 + return err 331 + } 332 + defer func() { 333 + if err == io.EOF { 334 + err = io.ErrUnexpectedEOF 335 + } 336 + }() 337 + 338 + if maj != cbg.MajMap { 339 + return fmt.Errorf("cbor input should be of type map") 340 + } 341 + 342 + if extra > cbg.MaxLength { 343 + return fmt.Errorf("SubscribeExample_Bar: map struct too large (%d)", extra) 344 + } 345 + 346 + n := extra 347 + 348 + nameBuf := make([]byte, 8) 349 + for range n { 350 + nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 351 + if err != nil { 352 + return err 353 + } 354 + 355 + if !ok { 356 + // Field doesn't exist on this type, so ignore it 357 + if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 358 + return err 359 + } 360 + continue 361 + } 362 + 363 + switch string(nameBuf[:nameLen]) { 364 + // t.Bar (string) (string) 365 + case "bar": 366 + 367 + { 368 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 369 + if err != nil { 370 + return err 371 + } 372 + 373 + t.Bar = string(sval) 374 + } 375 + // t.Seq (int64) (int64) 376 + case "seq": 377 + { 378 + maj, extra, err := cr.ReadHeader() 379 + if err != nil { 380 + return err 381 + } 382 + var extraI int64 383 + switch maj { 384 + case cbg.MajUnsignedInt: 385 + extraI = int64(extra) 386 + if extraI < 0 { 387 + return fmt.Errorf("int64 positive overflow") 388 + } 389 + case cbg.MajNegativeInt: 390 + extraI = int64(extra) 391 + if extraI < 0 { 392 + return fmt.Errorf("int64 negative overflow") 393 + } 394 + extraI = -1 - extraI 395 + default: 396 + return fmt.Errorf("wrong type for int64 field: %d", maj) 397 + } 398 + 399 + t.Seq = int64(extraI) 400 + } 401 + 402 + default: 403 + // Field doesn't exist on this type, so ignore it 404 + if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 405 + return err 406 + } 407 + } 408 + } 409 + 410 + return nil 411 + } 412 + 413 + type testScheduler struct { 414 + ch chan *SubscribeExample_Event 415 + lastSeq atomic.Int64 416 + } 417 + 418 + var _ SeqScheduler[SubscribeExample_Event] = (*testScheduler)(nil) 419 + 420 + func newTestScheduler() *testScheduler { 421 + return &testScheduler{ch: make(chan *SubscribeExample_Event, 8)} 422 + } 423 + 424 + func (s *testScheduler) AddWork(ctx context.Context, namespace string, val *SubscribeExample_Event) error { 425 + s.lastSeq.Store(val.seq()) 426 + select { 427 + case s.ch <- val: 428 + case <-ctx.Done(): 429 + } 430 + return nil 431 + } 432 + 433 + func (s *testScheduler) Shutdown() {} 434 + 435 + func (s *testScheduler) LastSeq() int64 { return s.lastSeq.Load() } 436 + 437 + // recv returns the next scheduled event. The timeout is generous because a 438 + // redial during the offline window backs off >=5s. 439 + func (s *testScheduler) recv(t *testing.T) *SubscribeExample_Event { 440 + t.Helper() 441 + select { 442 + case e := <-s.ch: 443 + return e 444 + case <-time.After(12 * time.Second): 445 + t.Fatal("timed out waiting for an event") 446 + return nil 447 + } 448 + } 449 + 450 + type testRedialer struct { 451 + sched *testScheduler 452 + } 453 + 454 + var _ Redialer = (*testRedialer)(nil) 455 + 456 + func (r *testRedialer) Process(ctx context.Context, cr *cbg.CborReader) error { 457 + var evt SubscribeExample_Event 458 + if err := evt.Deserialize(cr); err != nil { 459 + return err 460 + } 461 + return r.sched.AddWork(ctx, "", &evt) 462 + } 463 + 464 + func (r *testRedialer) UpdateParams(ctx context.Context, params map[string]any) bool { 465 + last := r.sched.LastSeq() 466 + if last == 0 { 467 + return false 468 + } 469 + params["cursor"] = last 470 + return true 471 + } 472 + 473 + const testEndpoint = "com.example.subscribeExample" 474 + 475 + // testServer live-tails an append-only event log over a websocket: each 476 + // connection streams events whose seq exceeds the requested cursor, including 477 + // ones added after it opened. The httptest listener stays bound the whole time 478 + // (so the port can't be taken over); Close/Start toggle offline by having the 479 + // handler answer 404, which fails the client's websocket handshake and drives 480 + // it into its redial/backoff loop. 481 + type testServer struct { 482 + URL string 483 + 484 + mu sync.Mutex 485 + events []SubscribeExample_Event 486 + conns map[*websocket.Conn]struct{} 487 + serving bool 488 + } 489 + 490 + func newTestServer(t *testing.T) *testServer { 491 + t.Helper() 492 + ts := &testServer{ 493 + conns: make(map[*websocket.Conn]struct{}), 494 + serving: true, 495 + } 496 + srv := httptest.NewServer(ts.handler()) 497 + ts.URL = srv.URL 498 + t.Cleanup(func() { 499 + ts.Close() // drop hijacked conns before srv.Close so it won't block 500 + srv.Close() 501 + }) 502 + return ts 503 + } 504 + 505 + func (ts *testServer) handler() http.Handler { 506 + up := websocket.Upgrader{} 507 + mux := http.NewServeMux() 508 + mux.HandleFunc("/xrpc/"+testEndpoint, func(w http.ResponseWriter, r *http.Request) { 509 + if !ts.isServing() { 510 + http.Error(w, "offline", http.StatusNotFound) 511 + return 512 + } 513 + 514 + var cursor int64 515 + if c := r.URL.Query().Get("cursor"); c != "" { 516 + cursor, _ = strconv.ParseInt(c, 10, 64) 517 + } 518 + 519 + conn, err := up.Upgrade(w, r, nil) 520 + if err != nil { 521 + return 522 + } 523 + 524 + ts.mu.Lock() 525 + ts.conns[conn] = struct{}{} 526 + ts.mu.Unlock() 527 + defer func() { 528 + ts.mu.Lock() 529 + delete(ts.conns, conn) 530 + ts.mu.Unlock() 531 + conn.Close() 532 + }() 533 + 534 + sent := cursor 535 + for { 536 + ts.mu.Lock() 537 + var batch []SubscribeExample_Event 538 + for _, e := range ts.events { 539 + if e.seq() > sent { 540 + batch = append(batch, e) 541 + } 542 + } 543 + ts.mu.Unlock() 544 + 545 + for i := range batch { 546 + if err := func(conn *websocket.Conn, e *SubscribeExample_Event) error { 547 + var buf bytes.Buffer 548 + if err := e.Serialize(&buf); err != nil { 549 + return err 550 + } 551 + return conn.WriteMessage(websocket.BinaryMessage, buf.Bytes()) 552 + }(conn, &batch[i]); err != nil { 553 + return 554 + } 555 + sent = batch[i].seq() 556 + } 557 + time.Sleep(5 * time.Millisecond) 558 + } 559 + }) 560 + return mux 561 + } 562 + 563 + func (ts *testServer) isServing() bool { 564 + ts.mu.Lock() 565 + defer ts.mu.Unlock() 566 + return ts.serving 567 + } 568 + 569 + // Start brings the server back online. 570 + func (ts *testServer) Start() { 571 + ts.mu.Lock() 572 + ts.serving = true 573 + ts.mu.Unlock() 574 + } 575 + 576 + // Close takes the server offline: new handshakes get 404 and active websocket 577 + // connections are dropped, forcing the subscription to redial. 578 + func (ts *testServer) Close() { 579 + ts.mu.Lock() 580 + ts.serving = false 581 + conns := make([]*websocket.Conn, 0, len(ts.conns)) 582 + for c := range ts.conns { 583 + conns = append(conns, c) 584 + } 585 + ts.mu.Unlock() 586 + 587 + for _, c := range conns { 588 + c.Close() 589 + } 590 + } 591 + 592 + func (ts *testServer) AddEvent(e SubscribeExample_Event) { 593 + ts.mu.Lock() 594 + ts.events = append(ts.events, e) 595 + ts.mu.Unlock() 596 + } 597 + 598 + func waitForReturn(t *testing.T, done <-chan error) { 599 + t.Helper() 600 + select { 601 + case <-done: 602 + case <-time.After(5 * time.Second): 603 + t.Fatal("subscription did not return after context cancel") 604 + } 605 + } 606 + 607 + func TestLexSubscribe_ConsumesAndSchedules(t *testing.T) { 608 + srv := newTestServer(t) 609 + defer srv.Close() 610 + 611 + c := &Client{Client: indigoxrpc.Client{Host: srv.URL}} 612 + sched := newTestScheduler() 613 + 614 + process := func(ctx context.Context, cr *cbg.CborReader) error { 615 + var evt SubscribeExample_Event 616 + if err := evt.Deserialize(cr); err != nil { 617 + return err 618 + } 619 + return sched.AddWork(ctx, "", &evt) 620 + } 621 + 622 + ctx, cancel := context.WithCancel(context.Background()) 623 + done := make(chan error, 1) 624 + go func() { 625 + done <- c.LexSubscribe(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, process) 626 + }() 627 + 628 + srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}) 629 + srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}) 630 + assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t)) 631 + assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t)) 632 + 633 + srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}) 634 + assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t)) 635 + 636 + cancel() 637 + waitForReturn(t, done) 638 + } 639 + 640 + func TestLexSubscribeWithRedialer_ConsumesAndSchedules(t *testing.T) { 641 + srv := newTestServer(t) 642 + defer srv.Close() 643 + 644 + c := &Client{Client: indigoxrpc.Client{Host: srv.URL}} 645 + sched := newTestScheduler() 646 + redialer := &testRedialer{sched: sched} 647 + 648 + ctx, cancel := context.WithCancel(context.Background()) 649 + done := make(chan error, 1) 650 + go func() { 651 + done <- c.LexSubscribeWithRedialer(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, redialer) 652 + }() 653 + 654 + srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}) 655 + srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}) 656 + assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t)) 657 + assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t)) 658 + 659 + srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}) 660 + srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}) 661 + assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t)) 662 + assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}, sched.recv(t)) 663 + 664 + cancel() 665 + waitForReturn(t, done) 666 + } 667 + 668 + func TestLexSubscribeWithRedialer_HandlesDowntime(t *testing.T) { 669 + srv := newTestServer(t) 670 + defer srv.Close() 671 + 672 + c := &Client{Client: indigoxrpc.Client{Host: srv.URL}} 673 + sched := newTestScheduler() 674 + redialer := &testRedialer{sched: sched} 675 + 676 + ctx, cancel := context.WithCancel(context.Background()) 677 + done := make(chan error, 1) 678 + go func() { 679 + done <- c.LexSubscribeWithRedialer(ctx, testEndpoint, map[string]any{"cursor": int64(0)}, redialer) 680 + }() 681 + 682 + srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}) 683 + srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}) 684 + assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 1, Foo: "foo-1"}}, sched.recv(t)) 685 + assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 2, Bar: "bar-2"}}, sched.recv(t)) 686 + 687 + // offline, add events, back online: the subscription redials and resumes 688 + srv.Close() 689 + 690 + srv.AddEvent(SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}) 691 + srv.AddEvent(SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}) 692 + 693 + srv.Start() 694 + 695 + assert.Equal(t, &SubscribeExample_Event{Foo: &SubscribeExample_Foo{Seq: 3, Foo: "foo-3"}}, sched.recv(t)) 696 + assert.Equal(t, &SubscribeExample_Event{Bar: &SubscribeExample_Bar{Seq: 4, Bar: "bar-4"}}, sched.recv(t)) 697 + 698 + cancel() 699 + waitForReturn(t, done) 700 + }