Stitch any CI into Tangled
1package jetstream
2
3import (
4 "context"
5 "sync"
6)
7
// CursorStore is the persistence hook for the jetstream cursor. A typical
// implementation keeps the value in a database row, which lets a restarted
// process resume from the last event it applied.
type CursorStore interface {
	// LoadCursor fetches the stored cursor. The pointer result lets an
	// implementation report that nothing has been stored yet;
	// MemoryCursorStore returns nil in that case.
	LoadCursor(ctx context.Context) (*int64, error)

	// SaveCursor records cursor as the value to hand back from later
	// LoadCursor calls.
	SaveCursor(ctx context.Context, cursor int64) error
}
15
// MemoryCursorStore keeps the jetstream cursor in process memory only. It
// suits tests, and consumers that deliberately do not want the cursor to
// survive a process restart.
//
// A zero MemoryCursorStore is usable as-is: it holds no cursor, so the
// consumer starts from "now" until the first SaveCursor call.
type MemoryCursorStore struct {
	// mu is held by LoadCursor and SaveCursor while they touch cursor.
	mu sync.Mutex
	// cursor is the most recently stored value; nil means none stored yet.
	cursor *int64
}
26
27var _ CursorStore = (*MemoryCursorStore)(nil)
28
29// NewMemoryCursorStore returns a MemoryCursorStore initialized with cursor. A
30// nil cursor means no cursor has been saved yet.
31func NewMemoryCursorStore(cursor *int64) *MemoryCursorStore {
32 s := &MemoryCursorStore{}
33 if cursor != nil {
34 v := *cursor
35 s.cursor = &v
36 }
37 return s
38}
39
40// LoadCursor returns the last cursor saved in memory, or nil if none has been
41// saved. The returned pointer is a copy so callers cannot mutate store state
42// without going through SaveCursor.
43func (s *MemoryCursorStore) LoadCursor(context.Context) (*int64, error) {
44 s.mu.Lock()
45 defer s.mu.Unlock()
46
47 if s.cursor == nil {
48 return nil, nil
49 }
50 cursor := *s.cursor
51 return &cursor, nil
52}
53
54// SaveCursor stores cursor in memory.
55func (s *MemoryCursorStore) SaveCursor(_ context.Context, cursor int64) error {
56 s.mu.Lock()
57 defer s.mu.Unlock()
58
59 s.cursor = &cursor
60 return nil
61}