Stitch any CI into Tangled
2

Configure Feed

Select the types of activity you want to include in your feed.

at main 1.6 kB View raw
1package jetstream 2 3import ( 4 "context" 5 "sync" 6) 7 8// CursorStore persists the jetstream cursor. Implementations commonly back this 9// with a database row so process restarts can resume from the last applied 10// event. 11type CursorStore interface { 12 LoadCursor(context.Context) (*int64, error) 13 SaveCursor(context.Context, int64) error 14} 15 16// MemoryCursorStore is an in-memory CursorStore implementation. It is useful 17// for tests and for consumers that intentionally do not need cursor persistence 18// across process restarts. 19// 20// The zero value is ready to use and starts with no cursor, which makes the 21// consumer start from "now" until SaveCursor is called. 22type MemoryCursorStore struct { 23 mu sync.Mutex 24 cursor *int64 25} 26 27var _ CursorStore = (*MemoryCursorStore)(nil) 28 29// NewMemoryCursorStore returns a MemoryCursorStore initialized with cursor. A 30// nil cursor means no cursor has been saved yet. 31func NewMemoryCursorStore(cursor *int64) *MemoryCursorStore { 32 s := &MemoryCursorStore{} 33 if cursor != nil { 34 v := *cursor 35 s.cursor = &v 36 } 37 return s 38} 39 40// LoadCursor returns the last cursor saved in memory, or nil if none has been 41// saved. The returned pointer is a copy so callers cannot mutate store state 42// without going through SaveCursor. 43func (s *MemoryCursorStore) LoadCursor(context.Context) (*int64, error) { 44 s.mu.Lock() 45 defer s.mu.Unlock() 46 47 if s.cursor == nil { 48 return nil, nil 49 } 50 cursor := *s.cursor 51 return &cursor, nil 52} 53 54// SaveCursor stores cursor in memory. 55func (s *MemoryCursorStore) SaveCursor(_ context.Context, cursor int64) error { 56 s.mu.Lock() 57 defer s.mu.Unlock() 58 59 s.cursor = &cursor 60 return nil 61}