Monorepo for Tangled
tangled.org
1package knotacl
2
3import (
4 "context"
5 "errors"
6 "path/filepath"
7 "slices"
8 "sync"
9 "testing"
10 "time"
11
12 "github.com/bluesky-social/indigo/atproto/syntax"
13
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/models"
16 "tangled.org/core/orm"
17)
18
// rosterTestBase is the fixed instant every fakeClock in these tests starts
// from, so TTL and backoff arithmetic is reproducible.
var rosterTestBase = time.Unix(1_700_000_000, 0)

// rosterTTL is the TTL these tests hand to newRoster.
const rosterTTL time.Duration = 5 * time.Minute
22
// fakeLister is a scriptable stand-in for the lister dependency passed to
// newRoster. It returns canned results, counts GetKnotMembers calls, and can
// signal and stall a call so a test can interleave work with an in-flight
// drain. All fields are guarded by mu.
type fakeLister struct {
	mu          sync.Mutex
	memberCalls int           // number of GetKnotMembers invocations so far
	members     []string      // canned result
	err         error         // canned failure
	started     chan struct{} // if non-nil, closed by the next GetKnotMembers call
	block       chan struct{} // if non-nil, GetKnotMembers waits on it before returning
}

// GetKnotMembers records the call, closes the armed started channel (once),
// waits on block if one is set, and then returns the canned error or, when
// there is none, the canned members. ctx and host are ignored.
func (f *fakeLister) GetKnotMembers(ctx context.Context, host string) ([]string, error) {
	f.mu.Lock()
	f.memberCalls++
	members, err, started, block := f.members, f.err, f.started, f.block
	// started is a one-shot signal: clear it while still holding the lock so
	// that a second call (for example when call collapsing regresses) does not
	// close an already-closed channel and panic. The extra call is still
	// counted, so the test fails on its own assertion instead.
	f.started = nil
	f.mu.Unlock()

	if started != nil {
		close(started)
	}
	if block != nil {
		<-block
	}
	if err != nil {
		return nil, err
	}
	return members, nil
}

// GetRepoCollaborators returns the canned members and error as-is. It does
// not count the call, signal started, or wait on block.
func (f *fakeLister) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.members, f.err
}

// calls reports how many times GetKnotMembers has been invoked.
func (f *fakeLister) calls() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.memberCalls
}

// set replaces the canned result and error returned by subsequent calls.
func (f *fakeLister) set(members []string, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.members, f.err = members, err
}

// arm installs the started and block channels used by GetKnotMembers.
// started is consumed by the next call; block stays in effect until replaced.
func (f *fakeLister) arm(started, block chan struct{}) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.started, f.block = started, block
}
72
// fakeClock is a manually driven time source; its now method is what these
// tests pass to newRoster in place of a real clock.
type fakeClock struct {
	mu sync.Mutex
	t  time.Time // current fake time, guarded by mu
}

// now returns the clock's current fake time.
func (fc *fakeClock) now() time.Time {
	fc.mu.Lock()
	current := fc.t
	fc.mu.Unlock()
	return current
}

// advance moves the fake time forward by d (backward if d is negative).
func (fc *fakeClock) advance(d time.Duration) {
	fc.mu.Lock()
	fc.t = fc.t.Add(d)
	fc.mu.Unlock()
}
89
90func rosterTestDB(t *testing.T) *db.DB {
91 t.Helper()
92 d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "appview.db"))
93 if err != nil {
94 t.Fatalf("db.Make: %v", err)
95 }
96 t.Cleanup(func() { d.Close() })
97 return d
98}
99
// rosterHost is the knot host these tests use for roster reads and member
// deltas.
const rosterHost = "knot.nel.pet"
101
102func membersForHost(t *testing.T, d *db.DB, host string) []models.KnotMember {
103 t.Helper()
104 rows, err := db.GetKnotMembers(d, orm.FilterEq("domain", host))
105 if err != nil {
106 t.Fatalf("GetKnotMembers: %v", err)
107 }
108 return rows
109}
110
111func TestRoster_BootstrapThenServesFromSqlite(t *testing.T) {
112 clk := &fakeClock{t: rosterTestBase}
113 f := &fakeLister{members: []string{"did:plc:boltless"}}
114 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil)
115 ctx := context.Background()
116
117 got, err := r.GetKnotMembers(ctx, rosterHost)
118 if err != nil {
119 t.Fatal(err)
120 }
121 if !slices.Equal(got, []string{"did:plc:boltless"}) {
122 t.Errorf("bootstrap read = %v, want the drained roster", got)
123 }
124
125 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
126 t.Fatal(err)
127 }
128 if f.calls() != 1 {
129 t.Errorf("memberCalls=%d, want 1; a read within the reconcile TTL must be served from sqlite", f.calls())
130 }
131
132 clk.advance(rosterTTL)
133 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
134 t.Fatal(err)
135 }
136 if f.calls() != 2 {
137 t.Errorf("memberCalls=%d, want 2; a stale scope must reconcile from XRPC", f.calls())
138 }
139}
140
141func TestRoster_EventDeltaVisibleWithoutXRPC(t *testing.T) {
142 clk := &fakeClock{t: rosterTestBase}
143 f := &fakeLister{members: []string{"did:plc:boltless"}}
144 d := rosterTestDB(t)
145 r := newRoster(d, f, rosterTTL, clk.now, nil)
146 ctx := context.Background()
147
148 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
149 t.Fatal(err)
150 }
151
152 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:akshay"), 1); err != nil {
153 t.Fatalf("AddKnotMember: %v", err)
154 }
155
156 got, err := r.GetKnotMembers(ctx, rosterHost)
157 if err != nil {
158 t.Fatal(err)
159 }
160 if !slices.Equal(got, []string{"did:plc:akshay", "did:plc:boltless"}) {
161 t.Errorf("post-delta read = %v, want the pushed member reflected", got)
162 }
163 if f.calls() != 1 {
164 t.Errorf("memberCalls=%d, want 1; a pushed delta must not trigger an XRPC reconcile", f.calls())
165 }
166}
167
168func TestRoster_ColdUnreachableErrors(t *testing.T) {
169 clk := &fakeClock{t: rosterTestBase}
170 f := &fakeLister{err: errors.New("knot unreachable")}
171 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil)
172
173 if _, err := r.GetKnotMembers(context.Background(), rosterHost); !errors.Is(err, ErrKnotUnreachable) {
174 t.Fatalf("err=%v, want ErrKnotUnreachable when cold and the bootstrap drain fails", err)
175 }
176}
177
178func TestRoster_StaleServedWhenUnreachable(t *testing.T) {
179 clk := &fakeClock{t: rosterTestBase}
180 f := &fakeLister{members: []string{"did:plc:boltless"}}
181 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil)
182 ctx := context.Background()
183
184 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
185 t.Fatal(err)
186 }
187
188 clk.advance(2 * rosterTTL)
189 f.set(nil, errors.New("knot unreachable"))
190
191 got, err := r.GetKnotMembers(ctx, rosterHost)
192 if err != nil {
193 t.Fatalf("err=%v, want stale rows served when a once-synced scope goes unreachable", err)
194 }
195 if !slices.Equal(got, []string{"did:plc:boltless"}) {
196 t.Errorf("stale read = %v, want the last good roster", got)
197 }
198}
199
200func TestRoster_InvalidateForcesReconcile(t *testing.T) {
201 clk := &fakeClock{t: rosterTestBase}
202 f := &fakeLister{members: []string{"did:plc:boltless"}}
203 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil)
204 ctx := context.Background()
205
206 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
207 t.Fatal(err)
208 }
209 r.InvalidateMembers(rosterHost)
210
211 f.set([]string{"did:plc:akshay"}, nil)
212 got, err := r.GetKnotMembers(ctx, rosterHost)
213 if err != nil {
214 t.Fatal(err)
215 }
216 if !slices.Equal(got, []string{"did:plc:akshay"}) {
217 t.Errorf("post-invalidate read = %v, want a fresh reconcile within the TTL", got)
218 }
219 if f.calls() != 2 {
220 t.Errorf("memberCalls=%d, want 2; invalidation must force the next read to reconcile", f.calls())
221 }
222}
223
224func TestRoster_SingleflightCollapsesConcurrentColdReads(t *testing.T) {
225 clk := &fakeClock{t: rosterTestBase}
226 started := make(chan struct{})
227 release := make(chan struct{})
228 f := &fakeLister{members: []string{"did:plc:boltless"}, started: started, block: release}
229 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil)
230 ctx := context.Background()
231
232 var wg sync.WaitGroup
233 call := func() {
234 wg.Add(1)
235 go func() {
236 defer wg.Done()
237 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
238 t.Errorf("GetKnotMembers: %v", err)
239 }
240 }()
241 }
242
243 call()
244 <-started
245 for range make([]struct{}, 8) {
246 call()
247 }
248 time.Sleep(20 * time.Millisecond)
249 close(release)
250 wg.Wait()
251
252 if f.calls() != 1 {
253 t.Errorf("memberCalls=%d, want 1; concurrent cold reads must collapse into a single reconcile", f.calls())
254 }
255}
256
257func TestRoster_MemberDeltaIdempotentAndScoped(t *testing.T) {
258 clk := &fakeClock{t: rosterTestBase}
259 d := rosterTestDB(t)
260 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil)
261
262 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 1); err != nil {
263 t.Fatalf("AddKnotMember: %v", err)
264 }
265 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 2); err != nil {
266 t.Fatalf("AddKnotMember: %v", err)
267 }
268 if err := r.RemoveKnotMember("other.nel.pet", syntax.DID("did:plc:boltless"), 1); err != nil {
269 t.Fatalf("RemoveKnotMember other host: %v", err)
270 }
271
272 if rows := membersForHost(t, d, rosterHost); len(rows) != 1 {
273 t.Fatalf("members = %v, want a single row after a duplicate add and an unrelated-host remove", rows)
274 }
275
276 if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 3); err != nil {
277 t.Fatalf("RemoveKnotMember: %v", err)
278 }
279 if rows := membersForHost(t, d, rosterHost); len(rows) != 0 {
280 t.Fatalf("members = %v, want empty after remove", rows)
281 }
282}
283
284func TestRoster_NativeAddReplacesLegacyRow(t *testing.T) {
285 clk := &fakeClock{t: rosterTestBase}
286 d := rosterTestDB(t)
287 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil)
288
289 if err := db.AddKnotMember(d, models.KnotMember{
290 Did: syntax.DID("did:plc:akshay"),
291 Rkey: "legacy-rkey",
292 Domain: rosterHost,
293 Subject: syntax.DID("did:plc:boltless"),
294 }); err != nil {
295 t.Fatalf("seed legacy row: %v", err)
296 }
297
298 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 1); err != nil {
299 t.Fatalf("AddKnotMember: %v", err)
300 }
301
302 rows := membersForHost(t, d, rosterHost)
303 if len(rows) != 1 {
304 t.Fatalf("members = %v, want a single canonical row after a native add over a legacy row", rows)
305 }
306 if rows[0].Did != "" {
307 t.Errorf("row did = %q, want empty; the native write must own the (domain, subject) identity", rows[0].Did)
308 }
309}
310
311func TestRoster_StaleDeltaIgnored(t *testing.T) {
312 clk := &fakeClock{t: rosterTestBase}
313 d := rosterTestDB(t)
314 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil)
315
316 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil {
317 t.Fatalf("AddKnotMember: %v", err)
318 }
319 if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 50); err != nil {
320 t.Fatalf("RemoveKnotMember: %v", err)
321 }
322
323 if rows := membersForHost(t, d, rosterHost); len(rows) != 1 {
324 t.Fatalf("members = %v, want the add preserved; a lower-cursor remove arriving late must be ignored", rows)
325 }
326}
327
328func TestRoster_NewerRemoveWins(t *testing.T) {
329 clk := &fakeClock{t: rosterTestBase}
330 d := rosterTestDB(t)
331 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil)
332
333 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil {
334 t.Fatalf("AddKnotMember: %v", err)
335 }
336 if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 101); err != nil {
337 t.Fatalf("RemoveKnotMember: %v", err)
338 }
339
340 if rows := membersForHost(t, d, rosterHost); len(rows) != 0 {
341 t.Fatalf("members = %v, want empty; a higher-cursor remove must win", rows)
342 }
343}
344
345func TestRoster_LateAddAfterRemoveIgnored(t *testing.T) {
346 clk := &fakeClock{t: rosterTestBase}
347 d := rosterTestDB(t)
348 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil)
349
350 if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 101); err != nil {
351 t.Fatalf("RemoveKnotMember: %v", err)
352 }
353 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil {
354 t.Fatalf("AddKnotMember: %v", err)
355 }
356
357 if rows := membersForHost(t, d, rosterHost); len(rows) != 0 {
358 t.Fatalf("members = %v, want empty; an add older than the applied remove must be ignored", rows)
359 }
360}
361
362func TestRoster_ReconcilePrunesStaleCursors(t *testing.T) {
363 clk := &fakeClock{t: rosterTestBase}
364 d := rosterTestDB(t)
365 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil)
366 ctx := context.Background()
367
368 stale := Cursor(rosterTestBase.Add(-2 * cursorRetention).UnixNano())
369 fresh := Cursor(rosterTestBase.UnixNano())
370
371 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:clam"), stale); err != nil {
372 t.Fatalf("AddKnotMember stale: %v", err)
373 }
374 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:whelk"), fresh); err != nil {
375 t.Fatalf("AddKnotMember fresh: %v", err)
376 }
377
378 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
379 t.Fatalf("reconcile: %v", err)
380 }
381
382 scope := memberScope(rosterHost)
383 if _, ok, err := seenCursor(d, scope, syntax.DID("did:plc:clam")); err != nil || ok {
384 t.Errorf("stale cursor present (ok=%v err=%v); reconcile must prune cursors older than the retention window", ok, err)
385 }
386 if _, ok, err := seenCursor(d, scope, syntax.DID("did:plc:whelk")); err != nil || !ok {
387 t.Errorf("fresh cursor missing (ok=%v err=%v); reconcile must keep cursors within the retention window", ok, err)
388 }
389}
390
391func TestRoster_ReconcileDoesNotClobberConcurrentDelta(t *testing.T) {
392 clk := &fakeClock{t: rosterTestBase}
393 f := &fakeLister{members: []string{"did:plc:boltless"}}
394 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil)
395 ctx := context.Background()
396
397 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
398 t.Fatal(err)
399 }
400
401 clk.advance(rosterTTL)
402
403 started := make(chan struct{})
404 release := make(chan struct{})
405 f.arm(started, release)
406
407 done := make(chan []string, 1)
408 go func() {
409 got, err := r.GetKnotMembers(ctx, rosterHost)
410 if err != nil {
411 t.Errorf("reconcile read: %v", err)
412 }
413 done <- got
414 }()
415
416 <-started
417 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:akshay"), 1); err != nil {
418 t.Fatalf("AddKnotMember: %v", err)
419 }
420 close(release)
421
422 got := <-done
423 if !slices.Contains(got, "did:plc:akshay") {
424 t.Errorf("post-reconcile read = %v, want the delta that landed during the drain preserved", got)
425 }
426}
427
428func TestRoster_BackoffSuppressesRepeatedDrains(t *testing.T) {
429 clk := &fakeClock{t: rosterTestBase}
430 f := &fakeLister{members: []string{"did:plc:boltless"}}
431 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil)
432 ctx := context.Background()
433
434 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
435 t.Fatal(err)
436 }
437 if f.calls() != 1 {
438 t.Fatalf("memberCalls=%d, want 1 after bootstrap", f.calls())
439 }
440
441 clk.advance(rosterTTL)
442 f.set(nil, errors.New("knot unreachable"))
443
444 if got, err := r.GetKnotMembers(ctx, rosterHost); err != nil || !slices.Equal(got, []string{"did:plc:boltless"}) {
445 t.Fatalf("got %v err %v, want the stale roster served", got, err)
446 }
447 if f.calls() != 2 {
448 t.Fatalf("memberCalls=%d, want 2 after the first stale drain", f.calls())
449 }
450
451 if got, err := r.GetKnotMembers(ctx, rosterHost); err != nil || !slices.Equal(got, []string{"did:plc:boltless"}) {
452 t.Fatalf("got %v err %v, want the stale roster served", got, err)
453 }
454 if f.calls() != 2 {
455 t.Errorf("memberCalls=%d, want 2; a down knot must not be re-probed within the backoff window", f.calls())
456 }
457
458 clk.advance(reconcileBackoff)
459 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil {
460 t.Fatal(err)
461 }
462 if f.calls() != 3 {
463 t.Errorf("memberCalls=%d, want 3; the probe must resume once the backoff window elapses", f.calls())
464 }
465}
466
467func TestRoster_BackoffStillErrorsWhenCold(t *testing.T) {
468 clk := &fakeClock{t: rosterTestBase}
469 f := &fakeLister{err: errors.New("knot unreachable")}
470 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil)
471 ctx := context.Background()
472
473 if _, err := r.GetKnotMembers(ctx, rosterHost); !errors.Is(err, ErrKnotUnreachable) {
474 t.Fatalf("err=%v, want ErrKnotUnreachable on the cold drain failure", err)
475 }
476 if f.calls() != 1 {
477 t.Fatalf("memberCalls=%d, want 1", f.calls())
478 }
479
480 if _, err := r.GetKnotMembers(ctx, rosterHost); !errors.Is(err, ErrKnotUnreachable) {
481 t.Fatalf("err=%v, want ErrKnotUnreachable during backoff for a cold scope", err)
482 }
483 if f.calls() != 1 {
484 t.Errorf("memberCalls=%d, want 1; a cold scope must not be re-probed within the backoff window", f.calls())
485 }
486}