Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

appview/state: switch streams to eventstream, backfill legacy cursors

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
date (May 29, 2026, 2:50 PM +0300) commit 4053d0e3 parent 3bee528b change-id xoomtzxr
+313 -79
+12
appview/knots/knots.go
··· 337 337 return 338 338 } 339 339 340 + if registration.Registered != nil { 341 + remaining, rErr := db.GetRegistrations(k.Db, 342 + orm.FilterEq("domain", domain), 343 + orm.FilterIsNot("registered", "null"), 344 + ) 345 + if rErr != nil { 346 + l.Warn("failed to check remaining registrations after delete", "err", rErr) 347 + } else if len(remaining) == 0 { 348 + go k.Knotstream.RemoveSource(eventconsumer.NewKnotSource(domain)) 349 + } 350 + } 351 + 340 352 shouldRedirect := r.Header.Get("shouldRedirect") 341 353 if shouldRedirect == "true" { 342 354 k.Pages.HxRedirect(w, "/knots")
+10 -1
appview/repo/repo.go
··· 168 168 return 169 169 } 170 170 171 + oldSpindle := f.Spindle 172 + if oldSpindle != "" && oldSpindle != newSpindle { 173 + remaining, qErr := db.GetRepos(rp.db, orm.FilterEq("spindle", oldSpindle)) 174 + if qErr != nil { 175 + l.Warn("failed to count repos using old spindle", "err", qErr) 176 + } else if len(remaining) == 0 { 177 + rp.spindlestream.RemoveSource(eventconsumer.NewSpindleSource(oldSpindle)) 178 + } 179 + } 180 + 171 181 if !removingSpindle { 172 - // add this spindle to spindle stream 173 182 rp.spindlestream.AddSource( 174 183 context.Background(), 175 184 eventconsumer.NewSpindleSource(newSpindle),
+19 -39
appview/state/knotstream.go
··· 14 14 "tangled.org/core/appview/notify" 15 15 16 16 "tangled.org/core/api/tangled" 17 - "tangled.org/core/appview/cache" 18 17 "tangled.org/core/appview/config" 19 18 "tangled.org/core/appview/db" 20 19 "tangled.org/core/appview/models" 21 20 "tangled.org/core/appview/sites" 22 21 ec "tangled.org/core/eventconsumer" 23 - "tangled.org/core/eventconsumer/cursor" 22 + "tangled.org/core/eventstream" 24 23 knotdb "tangled.org/core/knotserver/db" 25 24 "tangled.org/core/log" 26 25 "tangled.org/core/orm" ··· 33 32 ) 34 33 35 34 func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 36 - logger := log.FromContext(ctx) 37 - logger = log.SubLogger(logger, "knotstream") 38 - 39 - knots, err := db.GetRegistrations( 40 - d, 41 - orm.FilterIsNot("registered", "null"), 42 - ) 35 + knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null")) 43 36 if err != nil { 44 37 return nil, err 45 38 } 46 39 47 - srcs := make(map[ec.Source]struct{}) 48 - for _, k := range knots { 49 - s := ec.NewKnotSource(k.Domain) 50 - srcs[s] = struct{}{} 51 - } 52 - 53 - cache := cache.New(c.Redis.Addr) 54 - cursorStore := cursor.NewRedisCursorStore(cache) 55 - 56 - cfg := ec.ConsumerConfig{ 57 - Sources: srcs, 58 - ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 59 - RetryInterval: c.Knotstream.RetryInterval, 60 - MaxRetryInterval: c.Knotstream.MaxRetryInterval, 61 - ConnectionTimeout: c.Knotstream.ConnectionTimeout, 62 - WorkerCount: c.Knotstream.WorkerCount, 63 - QueueSize: c.Knotstream.QueueSize, 64 - Logger: logger, 65 - Dev: c.Core.Dev, 66 - CursorStore: &cursorStore, 40 + hosts := make([]string, len(knots)) 41 + for i, k := range knots { 42 + hosts[i] = k.Domain 67 43 } 68 44 69 - return ec.NewConsumer(cfg), nil 45 + return bootstrapStream( 46 + ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr, 47 + c.Knotstream, c.Core.Dev, 48 + knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 49 + ), nil 70 50 } 71 51 72 52 func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) { ··· 84 64 } 85 65 86 66 func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 87 - return func(ctx context.Context, source ec.Source, msg ec.Message) error { 67 + return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 88 68 switch msg.Nsid { 89 69 case tangled.GitRefUpdateNSID: 90 70 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) ··· 99 79 } 100 80 101 81 // TODO(boltless): remove this. knotmirror should do all sort of indexing 102 - func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error { 82 + func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error { 103 83 logger := log.FromContext(ctx) 104 84 105 85 var record tangled.GitRefUpdate ··· 112 92 if err != nil { 113 93 return err 114 94 } 115 - if !slices.Contains(knownKnots, source.Key()) { 116 - return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 95 + if !slices.Contains(knownKnots, source.Host) { 96 + return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Host) 117 97 } 118 98 119 99 if record.Repo == "" { 120 - return fmt.Errorf("gitRefUpdate from %s missing repo", source.Key()) 100 + return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host) 121 101 } 122 102 123 103 repo, lookupErr := db.GetRepoByDid(d, record.Repo) ··· 285 265 return tx.Commit() 286 266 } 287 267 288 - func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 268 + func ingestPipeline(d *db.DB, source ec.Source, msg eventstream.Event) error { 289 269 var record tangled.Pipeline 290 270 err := json.Unmarshal(msg.EventJson, &record) 291 271 if err != nil { ··· 343 323 344 324 pipeline := models.Pipeline{ 345 325 Rkey: msg.Rkey, 346 - Knot: source.Key(), 326 + Knot: source.Host, 347 327 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 348 328 RepoName: repoName, 349 329 RepoDid: repo.RepoDid, ··· 364 344 return nil 365 345 } 366 346 367 - func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error { 347 + func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error { 368 348 logger := log.FromContext(ctx) 369 349 370 350 var record knotdb.RepoDIDAssign ··· 393 373 return nil 394 374 } 395 375 repo := repos[0] 396 - knot := source.Key() 376 + knot := source.Host 397 377 398 378 if repo.Knot != knot { 399 379 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot)
+15 -39
appview/state/spindlestream.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "fmt" 7 - "log/slog" 8 7 "strings" 9 8 "time" 10 9 11 10 "github.com/bluesky-social/indigo/atproto/syntax" 12 11 "tangled.org/core/api/tangled" 13 - "tangled.org/core/appview/cache" 14 12 "tangled.org/core/appview/config" 15 13 "tangled.org/core/appview/db" 16 14 "tangled.org/core/appview/models" 17 15 "tangled.org/core/appview/pipelines" 18 16 ec "tangled.org/core/eventconsumer" 19 - "tangled.org/core/eventconsumer/cursor" 20 - "tangled.org/core/log" 17 + "tangled.org/core/eventstream" 21 18 "tangled.org/core/orm" 22 19 "tangled.org/core/rbac" 23 20 spindle "tangled.org/core/spindle/models" 24 21 ) 25 22 26 23 func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, pn *pipelines.StatusNotifier) (*ec.Consumer, error) { 27 - logger := log.FromContext(ctx) 28 - logger = log.SubLogger(logger, "spindlestream") 29 - 30 - spindles, err := db.GetSpindles( 31 - ctx, 32 - d, 33 - orm.FilterIsNot("verified", "null"), 34 - ) 24 + spindles, err := db.GetSpindles(ctx, d, orm.FilterIsNot("verified", "null")) 35 25 if err != nil { 36 26 return nil, err 37 27 } 38 28 39 - srcs := make(map[ec.Source]struct{}) 40 - for _, s := range spindles { 41 - src := ec.NewSpindleSource(s.Instance) 42 - srcs[src] = struct{}{} 43 - } 44 - 45 - cache := cache.New(c.Redis.Addr) 46 - cursorStore := cursor.NewRedisCursorStore(cache) 47 - 48 - cfg := ec.ConsumerConfig{ 49 - Sources: srcs, 50 - ProcessFunc: spindleIngester(ctx, logger, d, pn), 51 - RetryInterval: c.Spindlestream.RetryInterval, 52 - MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 53 - ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 54 - WorkerCount: c.Spindlestream.WorkerCount, 55 - QueueSize: c.Spindlestream.QueueSize, 56 - Logger: logger, 57 - Dev: c.Core.Dev, 58 - CursorStore: &cursorStore, 29 + hosts := make([]string, len(spindles)) 30 + for i, s := range spindles { 31 + hosts[i] = s.Instance 59 32 } 60 33 61 - return ec.NewConsumer(cfg), nil 34 + return bootstrapStream( 35 + ctx, "spindlestream", ec.KindSpindle, hosts, c.Redis.Addr, 36 + c.Spindlestream, c.Core.Dev, 37 + spindleIngester(d, pn), 38 + ), nil 62 39 } 63 40 64 - func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc { 65 - return func(ctx context.Context, source ec.Source, msg ec.Message) error { 41 + func spindleIngester(d *db.DB, pn *pipelines.StatusNotifier) ec.ProcessFunc { 42 + return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 66 43 switch msg.Nsid { 67 44 case tangled.PipelineStatusNSID: 68 - return ingestPipelineStatus(ctx, logger, d, pn, source, msg) 45 + return ingestPipelineStatus(ctx, d, pn, source, msg) 69 46 } 70 - 71 47 return nil 72 48 } 73 49 } 74 50 75 - func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg ec.Message) error { 51 + func ingestPipelineStatus(ctx context.Context, d *db.DB, pn *pipelines.StatusNotifier, source ec.Source, msg eventstream.Event) error { 76 52 var record tangled.PipelineStatus 77 53 err := json.Unmarshal(msg.EventJson, &record) 78 54 if err != nil { ··· 96 72 } 97 73 98 74 status := models.PipelineStatus{ 99 - Spindle: source.Key(), 75 + Spindle: source.Host, 100 76 Rkey: msg.Rkey, 101 77 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 102 78 PipelineRkey: pipelineUri.RecordKey().String(),
+145
appview/state/spindlestream_test.go
··· 1 + package state 2 + 3 + import ( 4 + "context" 5 + "io" 6 + "log/slog" 7 + "net/http" 8 + "net/http/httptest" 9 + "path/filepath" 10 + "strings" 11 + "testing" 12 + "time" 13 + 14 + "tangled.org/core/appview/db" 15 + "tangled.org/core/appview/pipelines" 16 + ec "tangled.org/core/eventconsumer" 17 + "tangled.org/core/eventconsumer/cursor" 18 + "tangled.org/core/eventstream" 19 + "tangled.org/core/notifier" 20 + spindledb "tangled.org/core/spindle/db" 21 + spindlemodels "tangled.org/core/spindle/models" 22 + ) 23 + 24 + func TestColdStart_SpindleEventsRebuildPipelineStatuses(t *testing.T) { 25 + ctx := t.Context() 26 + 27 + spindleDB, err := spindledb.Make(ctx, filepath.Join(t.TempDir(), "spindle.db")) 28 + if err != nil { 29 + t.Fatalf("spindle Make: %v", err) 30 + } 31 + t.Cleanup(func() { spindleDB.Close() }) 32 + 33 + n := notifier.New() 34 + workflowId := spindlemodels.WorkflowId{ 35 + PipelineId: spindlemodels.PipelineId{Knot: "knot.boltless.example", Rkey: "pipeline-rk1"}, 36 + Name: "build", 37 + } 38 + for _, step := range []func() error{ 39 + func() error { return spindleDB.StatusPending(workflowId, &n) }, 40 + func() error { return spindleDB.StatusRunning(workflowId, &n) }, 41 + func() error { return spindleDB.StatusSuccess(workflowId, &n) }, 42 + } { 43 + if err := step(); err != nil { 44 + t.Fatalf("seed spindle event: %v", err) 45 + } 46 + } 47 + 48 + mux := http.NewServeMux() 49 + mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { 50 + _ = eventstream.Stream(w, r, eventstream.StreamConfig{ 51 + Backend: spindleDB, 52 + Notifier: &n, 53 + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 54 + }) 55 + }) 56 + srv := httptest.NewServer(mux) 57 + t.Cleanup(srv.Close) 58 + source := ec.Source{Kind: "test", Host: strings.TrimPrefix(srv.URL, "http://")} 59 + 60 + appviewDB, err := db.Make(ctx, filepath.Join(t.TempDir(), "appview.db")) 61 + if err != nil { 62 + t.Fatalf("appview Make: %v", err) 63 + } 64 + t.Cleanup(func() { appviewDB.Close() }) 65 + 66 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 67 + processFunc := spindleIngester(appviewDB, pipelines.NewStatusNotifier()) 68 + 69 + cfg := ec.ConsumerConfig{ 70 + ProcessFunc: processFunc, 71 + WorkerCount: 1, 72 + QueueSize: 16, 73 + ConnectionTimeout: 2 * time.Second, 74 + CursorStore: &cursor.MemoryStore{}, 75 + URLFunc: ec.DefaultURL(true), 76 + Logger: logger, 77 + } 78 + c := ec.NewConsumer(cfg) 79 + 80 + consumerCtx, cancel := context.WithCancel(ctx) 81 + defer cancel() 82 + c.Start(consumerCtx) 83 + c.AddSource(consumerCtx, source) 84 + 85 + deadline := time.Now().Add(3 * time.Second) 86 + for time.Now().Before(deadline) { 87 + var n int 88 + if err := appviewDB.QueryRow(`select count(*) from pipeline_statuses`).Scan(&n); err != nil { 89 + t.Fatalf("count: %v", err) 90 + } 91 + if n >= 3 { 92 + break 93 + } 94 + time.Sleep(20 * time.Millisecond) 95 + } 96 + 97 + rows, err := appviewDB.Query(` 98 + select spindle, pipeline_knot, pipeline_rkey, workflow, status 99 + from pipeline_statuses 100 + order by created asc 101 + `) 102 + if err != nil { 103 + t.Fatalf("query: %v", err) 104 + } 105 + defer rows.Close() 106 + 107 + type rec struct { 108 + spindle, knot, rkey, workflow, status string 109 + } 110 + var got []rec 111 + for rows.Next() { 112 + var r rec 113 + if err := rows.Scan(&r.spindle, &r.knot, &r.rkey, &r.workflow, &r.status); err != nil { 114 + t.Fatalf("scan: %v", err) 115 + } 116 + got = append(got, r) 117 + } 118 + 119 + if len(got) != 3 { 120 + t.Fatalf("pipeline_statuses rows = %d, want 3: %+v", len(got), got) 121 + } 122 + 123 + wantStatuses := []string{"pending", "running", "success"} 124 + gotStatuses := map[string]bool{} 125 + for _, r := range got { 126 + gotStatuses[r.status] = true 127 + if r.spindle != source.Host { 128 + t.Errorf("spindle = %q, want %q", r.spindle, source.Host) 129 + } 130 + if r.knot != workflowId.Knot { 131 + t.Errorf("pipeline_knot = %q, want %q", r.knot, workflowId.Knot) 132 + } 133 + if r.rkey != workflowId.Rkey { 134 + t.Errorf("pipeline_rkey = %q, want %q", r.rkey, workflowId.Rkey) 135 + } 136 + if r.workflow != workflowId.Name { 137 + t.Errorf("workflow = %q, want %q", r.workflow, workflowId.Name) 138 + } 139 + } 140 + for _, want := range wantStatuses { 141 + if !gotStatuses[want] { 142 + t.Errorf("missing status %q in projection", want) 143 + } 144 + } 145 + }
+56
appview/state/streams.go
··· 1 + package state 2 + 3 + import ( 4 + "context" 5 + 6 + "tangled.org/core/appview/cache" 7 + "tangled.org/core/appview/config" 8 + ec "tangled.org/core/eventconsumer" 9 + "tangled.org/core/eventconsumer/cursor" 10 + "tangled.org/core/log" 11 + ) 12 + 13 + func bootstrapStream( 14 + ctx context.Context, 15 + name string, 16 + kind ec.Kind, 17 + hosts []string, 18 + redisAddr string, 19 + streamCfg config.ConsumerConfig, 20 + dev bool, 21 + processFn ec.ProcessFunc, 22 + ) *ec.Consumer { 23 + logger := log.SubLogger(log.FromContext(ctx), name) 24 + 25 + redisCache := cache.New(redisAddr) 26 + cursorStore := cursor.NewRedisCursorStore(redisCache) 27 + 28 + srcs := make(map[ec.Source]struct{}, len(hosts)) 29 + for _, h := range hosts { 30 + src := ec.Source{Kind: kind, Host: h} 31 + migrateLegacyCursor(&cursorStore, src) 32 + srcs[src] = struct{}{} 33 + } 34 + 35 + return ec.NewConsumer(ec.ConsumerConfig{ 36 + Sources: srcs, 37 + ProcessFunc: processFn, 38 + RetryInterval: streamCfg.RetryInterval, 39 + MaxRetryInterval: streamCfg.MaxRetryInterval, 40 + ConnectionTimeout: streamCfg.ConnectionTimeout, 41 + WorkerCount: streamCfg.WorkerCount, 42 + QueueSize: streamCfg.QueueSize, 43 + Logger: logger, 44 + URLFunc: ec.DefaultURL(dev), 45 + CursorStore: &cursorStore, 46 + }) 47 + } 48 + 49 + func migrateLegacyCursor(store cursor.Store, src ec.Source) { 50 + if store.Get(src.Key()) != 0 { 51 + return 52 + } 53 + if legacy := store.Get(src.Host); legacy != 0 { 54 + store.Set(src.Key(), legacy) 55 + } 56 + }
+56
appview/state/streams_test.go
··· 1 + package state 2 + 3 + import ( 4 + "testing" 5 + 6 + ec "tangled.org/core/eventconsumer" 7 + "tangled.org/core/eventconsumer/cursor" 8 + ) 9 + 10 + func TestMigrateLegacyCursor_CopiesBareHostToKindKey(t *testing.T) { 11 + store := &cursor.MemoryStore{} 12 + store.Set("clam.oyster.cafe", 1700000000123456789) 13 + 14 + migrateLegacyCursor(store, ec.NewKnotSource("clam.oyster.cafe")) 15 + 16 + if got := store.Get("knot:clam.oyster.cafe"); got != 1700000000123456789 { 17 + t.Fatalf("new key cursor = %d, want legacy value", got) 18 + } 19 + } 20 + 21 + func TestMigrateLegacyCursor_DoesNotClobberAdvancedCursor(t *testing.T) { 22 + store := &cursor.MemoryStore{} 23 + store.Set("knot:whelk.oyster.cafe", 999) 24 + store.Set("whelk.oyster.cafe", 100) 25 + 26 + migrateLegacyCursor(store, ec.NewKnotSource("whelk.oyster.cafe")) 27 + 28 + if got := store.Get("knot:whelk.oyster.cafe"); got != 999 { 29 + t.Fatalf("new key cursor = %d, want it left untouched at 999", got) 30 + } 31 + } 32 + 33 + func TestMigrateLegacyCursor_NoLegacyIsNoOp(t *testing.T) { 34 + store := &cursor.MemoryStore{} 35 + 36 + migrateLegacyCursor(store, ec.NewKnotSource("limpet.nel.pet")) 37 + 38 + if got := store.Get("knot:limpet.nel.pet"); got != 0 { 39 + t.Fatalf("new key cursor = %d, want 0", got) 40 + } 41 + } 42 + 43 + func TestMigrateLegacyCursor_KindsStayNamespaced(t *testing.T) { 44 + store := &cursor.MemoryStore{} 45 + store.Set("mussel.oyster.cafe", 500) 46 + 47 + migrateLegacyCursor(store, ec.NewKnotSource("mussel.oyster.cafe")) 48 + migrateLegacyCursor(store, ec.NewSpindleSource("mussel.oyster.cafe")) 49 + 50 + if got := store.Get("knot:mussel.oyster.cafe"); got != 500 { 51 + t.Fatalf("knot cursor = %d, want 500", got) 52 + } 53 + if got := store.Get("spindle:mussel.oyster.cafe"); got != 500 { 54 + t.Fatalf("spindle cursor = %d, want 500", got) 55 + } 56 + }