Monorepo for Tangled tangled.org
9

Configure Feed

Select the types of activity you want to include in your feed.

eventconsumer: share legacy cursor migration

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 12, 2026, 11:32 AM +0300) commit 49e34bda parent bdebc315 change-id nyswuwqn
+191 -67
+1 -10
appview/state/streams.go
··· 28 28 srcs := make(map[ec.Source]struct{}, len(hosts)) 29 29 for _, h := range hosts { 30 30 src := ec.Source{Kind: kind, Host: h} 31 - migrateLegacyCursor(&cursorStore, src) 31 + ec.MigrateLegacyCursor(&cursorStore, src) 32 32 srcs[src] = struct{}{} 33 33 } 34 34 ··· 45 45 CursorStore: &cursorStore, 46 46 }) 47 47 } 48 - 49 - func migrateLegacyCursor(store cursor.Store, src ec.Source) { 50 - if store.Get(src.Key()) != 0 { 51 - return 52 - } 53 - if legacy := store.Get(src.Host); legacy != 0 { 54 - store.Set(src.Key(), legacy) 55 - } 56 - }
-56
appview/state/streams_test.go
··· 1 - package state 2 - 3 - import ( 4 - "testing" 5 - 6 - ec "tangled.org/core/eventconsumer" 7 - "tangled.org/core/eventconsumer/cursor" 8 - ) 9 - 10 - func TestMigrateLegacyCursor_CopiesBareHostToKindKey(t *testing.T) { 11 - store := &cursor.MemoryStore{} 12 - store.Set("clam.oyster.cafe", 1700000000123456789) 13 - 14 - migrateLegacyCursor(store, ec.NewKnotSource("clam.oyster.cafe")) 15 - 16 - if got := store.Get("knot:clam.oyster.cafe"); got != 1700000000123456789 { 17 - t.Fatalf("new key cursor = %d, want legacy value", got) 18 - } 19 - } 20 - 21 - func TestMigrateLegacyCursor_DoesNotClobberAdvancedCursor(t *testing.T) { 22 - store := &cursor.MemoryStore{} 23 - store.Set("knot:whelk.oyster.cafe", 999) 24 - store.Set("whelk.oyster.cafe", 100) 25 - 26 - migrateLegacyCursor(store, ec.NewKnotSource("whelk.oyster.cafe")) 27 - 28 - if got := store.Get("knot:whelk.oyster.cafe"); got != 999 { 29 - t.Fatalf("new key cursor = %d, want it left untouched at 999", got) 30 - } 31 - } 32 - 33 - func TestMigrateLegacyCursor_NoLegacyIsNoOp(t *testing.T) { 34 - store := &cursor.MemoryStore{} 35 - 36 - migrateLegacyCursor(store, ec.NewKnotSource("limpet.nel.pet")) 37 - 38 - if got := store.Get("knot:limpet.nel.pet"); got != 0 { 39 - t.Fatalf("new key cursor = %d, want 0", got) 40 - } 41 - } 42 - 43 - func TestMigrateLegacyCursor_KindsStayNamespaced(t *testing.T) { 44 - store := &cursor.MemoryStore{} 45 - store.Set("mussel.oyster.cafe", 500) 46 - 47 - migrateLegacyCursor(store, ec.NewKnotSource("mussel.oyster.cafe")) 48 - migrateLegacyCursor(store, ec.NewSpindleSource("mussel.oyster.cafe")) 49 - 50 - if got := store.Get("knot:mussel.oyster.cafe"); got != 500 { 51 - t.Fatalf("knot cursor = %d, want 500", got) 52 - } 53 - if got := store.Get("spindle:mussel.oyster.cafe"); got != 500 { 54 - t.Fatalf("spindle cursor = %d, want 500", got) 55 - } 56 - }
+67
eventconsumer/migrate_test.go
··· 1 + package eventconsumer 2 + 3 + import ( 4 + "testing" 5 + 6 + "tangled.org/core/eventconsumer/cursor" 7 + ) 8 + 9 + func TestMigrateLegacyCursor(t *testing.T) { 10 + const host = "whelk.knot.tld" 11 + src := NewKnotSource(host) 12 + 13 + t.Run("copies legacy bare-host cursor to namespaced key", func(t *testing.T) { 14 + store := &cursor.MemoryStore{} 15 + store.Set(host, 42) 16 + 17 + MigrateLegacyCursor(store, src) 18 + 19 + if got := store.Get(src.Key()); got != 42 { 20 + t.Fatalf("namespaced key = %d, want 42", got) 21 + } 22 + if got := store.Get(host); got != 42 { 23 + t.Fatalf("legacy key = %d, want it left at 42", got) 24 + } 25 + }) 26 + 27 + t.Run("does not pave over an already-migrated cursor", func(t *testing.T) { 28 + store := &cursor.MemoryStore{} 29 + store.Set(src.Key(), 100) 30 + store.Set(host, 42) 31 + 32 + MigrateLegacyCursor(store, src) 33 + 34 + if got := store.Get(src.Key()); got != 100 { 35 + t.Fatalf("namespaced key = %d, want 100", got) 36 + } 37 + }) 38 + 39 + t.Run("noop when neither key is set", func(t *testing.T) { 40 + store := &cursor.MemoryStore{} 41 + 42 + MigrateLegacyCursor(store, src) 43 + 44 + if got := store.Get(src.Key()); got != 0 { 45 + t.Fatalf("namespaced key = %d, want 0", got) 46 + } 47 + }) 48 + 49 + t.Run("namespaces the same host by kind", func(t *testing.T) { 50 + const shared = "mussel.knot.tld" 51 + knot := NewKnotSource(shared) 52 + spindle := NewSpindleSource(shared) 53 + 54 + store := &cursor.MemoryStore{} 55 + store.Set(shared, 500) 56 + 57 + MigrateLegacyCursor(store, knot) 58 + MigrateLegacyCursor(store, spindle) 59 + 60 + if got := store.Get(knot.Key()); got != 500 { 61 + t.Fatalf("knot key = %d, want 500", got) 62 + } 63 + if got := store.Get(spindle.Key()); got != 500 { 64 + t.Fatalf("spindle key = %d, want 500", got) 65 + } 66 + }) 67 + }
+11
eventconsumer/source.go
··· 3 3 import ( 4 4 "net/url" 5 5 "strconv" 6 + 7 + "tangled.org/core/eventconsumer/cursor" 6 8 ) 7 9 8 10 type Kind string ··· 21 23 func NewSpindleSource(host string) Source { return Source{Kind: KindSpindle, Host: host} } 22 24 23 25 func (s Source) Key() string { return string(s.Kind) + ":" + s.Host } 26 + 27 + func MigrateLegacyCursor(store cursor.Store, s Source) { 28 + if store.Get(s.Key()) != 0 { 29 + return 30 + } 31 + if legacy := store.Get(s.Host); legacy != 0 { 32 + store.Set(s.Key(), legacy) 33 + } 34 + } 24 35 25 36 func DefaultURL(dev bool) func(Source, int64) (*url.URL, error) { 26 37 scheme := "wss"
+109
eventconsumer/upgrade_test.go
··· 1 + package eventconsumer 2 + 3 + import ( 4 + "context" 5 + "io" 6 + "log/slog" 7 + "path/filepath" 8 + "sync" 9 + "testing" 10 + "time" 11 + 12 + "tangled.org/core/eventconsumer/cursor" 13 + "tangled.org/core/eventstream" 14 + ) 15 + 16 + func sqliteCursorStore(t *testing.T) cursor.Store { 17 + t.Helper() 18 + store, err := cursor.NewSQLiteStore(filepath.Join(t.TempDir(), "spindle.db")) 19 + if err != nil { 20 + t.Fatalf("new sqlite cursor store: %v", err) 21 + } 22 + return store 23 + } 24 + 25 + func drainProcessed(t *testing.T, store cursor.Store, source Source) []int64 { 26 + t.Helper() 27 + 28 + var mu sync.Mutex 29 + var seen []int64 30 + 31 + c := NewConsumer(ConsumerConfig{ 32 + ProcessFunc: func(_ context.Context, _ Source, msg eventstream.Event) error { 33 + mu.Lock() 34 + seen = append(seen, msg.Created) 35 + mu.Unlock() 36 + return nil 37 + }, 38 + WorkerCount: 1, 39 + QueueSize: 16, 40 + ConnectionTimeout: 2 * time.Second, 41 + CursorStore: store, 42 + URLFunc: DefaultURL(true), 43 + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 44 + }) 45 + 46 + ctx, cancel := context.WithCancel(context.Background()) 47 + t.Cleanup(cancel) 48 + c.Start(ctx) 49 + c.AddSource(ctx, source) 50 + 51 + deadline := time.Now().Add(3 * time.Second) 52 + last, stable := -1, 0 53 + for time.Now().Before(deadline) { 54 + time.Sleep(100 * time.Millisecond) 55 + mu.Lock() 56 + n := len(seen) 57 + mu.Unlock() 58 + if n == last { 59 + if stable++; stable >= 3 && n > 0 { 60 + break 61 + } 62 + } else { 63 + last, stable = n, 0 64 + } 65 + } 66 + 67 + mu.Lock() 68 + defer mu.Unlock() 69 + return append([]int64(nil), seen...) 70 + } 71 + 72 + func TestSpindleUpgrade_OrphanedCursorReplaysFromZero(t *testing.T) { 73 + src := &memSrc{} 74 + for i := range 8 { 75 + src.add(mkEv(i)) 76 + } 77 + source, _ := startEventServer(t, src) 78 + 79 + store := sqliteCursorStore(t) 80 + store.Set(source.Host, 5) 81 + 82 + seen := drainProcessed(t, store, source) 83 + 84 + if len(seen) != 8 { 85 + t.Fatalf("orphaned bare-host cursor processed %d events, want a full replay of 8: %v", len(seen), seen) 86 + } 87 + } 88 + 89 + func TestSpindleUpgrade_MigratedCursorResumesNoReplay(t *testing.T) { 90 + src := &memSrc{} 91 + for i := range 8 { 92 + src.add(mkEv(i)) 93 + } 94 + source, _ := startEventServer(t, src) 95 + 96 + store := sqliteCursorStore(t) 97 + store.Set(source.Host, 5) 98 + 99 + MigrateLegacyCursor(store, source) 100 + 101 + seen := drainProcessed(t, store, source) 102 + 103 + if len(seen) != 3 { 104 + t.Fatalf("migrated cursor processed %d events, want a resume of 3: %v", len(seen), seen) 105 + } 106 + if seen[0] != 6 || seen[2] != 8 { 107 + t.Fatalf("resumed events = %v, want [6 7 8]", seen) 108 + } 109 + }
+3 -1
spindle/server.go
··· 194 194 } 195 195 for _, knot := range knownKnots { 196 196 logger.Info("adding source start", "knot", knot) 197 - ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 197 + src := eventconsumer.NewKnotSource(knot) 198 + eventconsumer.MigrateLegacyCursor(cursorStore, src) 199 + ccfg.Sources[src] = struct{}{} 198 200 } 199 201 spindle.ks = eventconsumer.NewConsumer(*ccfg) 200 202