Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/lqyotq 14 kB View raw
1package knotacl 2 3import ( 4 "context" 5 "errors" 6 "path/filepath" 7 "slices" 8 "sync" 9 "testing" 10 "time" 11 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/models" 16 "tangled.org/core/orm" 17) 18 19var rosterTestBase = time.Unix(1700000000, 0) 20 21const rosterTTL = 5 * time.Minute 22 23type fakeLister struct { 24 mu sync.Mutex 25 memberCalls int 26 members []string 27 err error 28 started chan struct{} 29 block chan struct{} 30} 31 32func (f *fakeLister) GetKnotMembers(ctx context.Context, host string) ([]string, error) { 33 f.mu.Lock() 34 f.memberCalls++ 35 members, err, started, block := f.members, f.err, f.started, f.block 36 f.mu.Unlock() 37 if started != nil { 38 close(started) 39 } 40 if block != nil { 41 <-block 42 } 43 if err != nil { 44 return nil, err 45 } 46 return members, nil 47} 48 49func (f *fakeLister) GetRepoCollaborators(ctx context.Context, host, repoDid string) ([]string, error) { 50 f.mu.Lock() 51 defer f.mu.Unlock() 52 return f.members, f.err 53} 54 55func (f *fakeLister) calls() int { 56 f.mu.Lock() 57 defer f.mu.Unlock() 58 return f.memberCalls 59} 60 61func (f *fakeLister) set(members []string, err error) { 62 f.mu.Lock() 63 defer f.mu.Unlock() 64 f.members, f.err = members, err 65} 66 67func (f *fakeLister) arm(started, block chan struct{}) { 68 f.mu.Lock() 69 defer f.mu.Unlock() 70 f.started, f.block = started, block 71} 72 73type fakeClock struct { 74 mu sync.Mutex 75 t time.Time 76} 77 78func (c *fakeClock) now() time.Time { 79 c.mu.Lock() 80 defer c.mu.Unlock() 81 return c.t 82} 83 84func (c *fakeClock) advance(d time.Duration) { 85 c.mu.Lock() 86 defer c.mu.Unlock() 87 c.t = c.t.Add(d) 88} 89 90func rosterTestDB(t *testing.T) *db.DB { 91 t.Helper() 92 d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "appview.db")) 93 if err != nil { 94 t.Fatalf("db.Make: %v", err) 95 } 96 t.Cleanup(func() { d.Close() }) 97 return d 98} 99 100const rosterHost = "knot.nel.pet" 101 102func membersForHost(t *testing.T, d *db.DB, host string) []models.KnotMember { 103 t.Helper() 104 rows, err := db.GetKnotMembers(d, orm.FilterEq("domain", host)) 105 if err != nil { 106 t.Fatalf("GetKnotMembers: %v", err) 107 } 108 return rows 109} 110 111func TestRoster_BootstrapThenServesFromSqlite(t *testing.T) { 112 clk := &fakeClock{t: rosterTestBase} 113 f := &fakeLister{members: []string{"did:plc:boltless"}} 114 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 115 ctx := context.Background() 116 117 got, err := r.GetKnotMembers(ctx, rosterHost) 118 if err != nil { 119 t.Fatal(err) 120 } 121 if !slices.Equal(got, []string{"did:plc:boltless"}) { 122 t.Errorf("bootstrap read = %v, want the drained roster", got) 123 } 124 125 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 126 t.Fatal(err) 127 } 128 if f.calls() != 1 { 129 t.Errorf("memberCalls=%d, want 1; a read within the reconcile TTL must be served from sqlite", f.calls()) 130 } 131 132 clk.advance(rosterTTL) 133 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 134 t.Fatal(err) 135 } 136 if f.calls() != 2 { 137 t.Errorf("memberCalls=%d, want 2; a stale scope must reconcile from XRPC", f.calls()) 138 } 139} 140 141func TestRoster_EventDeltaVisibleWithoutXRPC(t *testing.T) { 142 clk := &fakeClock{t: rosterTestBase} 143 f := &fakeLister{members: []string{"did:plc:boltless"}} 144 d := rosterTestDB(t) 145 r := newRoster(d, f, rosterTTL, clk.now, nil) 146 ctx := context.Background() 147 148 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 149 t.Fatal(err) 150 } 151 152 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:akshay"), 1); err != nil { 153 t.Fatalf("AddKnotMember: %v", err) 154 } 155 156 got, err := r.GetKnotMembers(ctx, rosterHost) 157 if err != nil { 158 t.Fatal(err) 159 } 160 if !slices.Equal(got, []string{"did:plc:akshay", "did:plc:boltless"}) { 161 t.Errorf("post-delta read = %v, want the pushed member reflected", got) 162 } 163 if f.calls() != 1 { 164 t.Errorf("memberCalls=%d, want 1; a pushed delta must not trigger an XRPC reconcile", f.calls()) 165 } 166} 167 168func TestRoster_ColdUnreachableErrors(t *testing.T) { 169 clk := &fakeClock{t: rosterTestBase} 170 f := &fakeLister{err: errors.New("knot unreachable")} 171 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 172 173 if _, err := r.GetKnotMembers(context.Background(), rosterHost); !errors.Is(err, ErrKnotUnreachable) { 174 t.Fatalf("err=%v, want ErrKnotUnreachable when cold and the bootstrap drain fails", err) 175 } 176} 177 178func TestRoster_StaleServedWhenUnreachable(t *testing.T) { 179 clk := &fakeClock{t: rosterTestBase} 180 f := &fakeLister{members: []string{"did:plc:boltless"}} 181 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 182 ctx := context.Background() 183 184 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 185 t.Fatal(err) 186 } 187 188 clk.advance(2 * rosterTTL) 189 f.set(nil, errors.New("knot unreachable")) 190 191 got, err := r.GetKnotMembers(ctx, rosterHost) 192 if err != nil { 193 t.Fatalf("err=%v, want stale rows served when a once-synced scope goes unreachable", err) 194 } 195 if !slices.Equal(got, []string{"did:plc:boltless"}) { 196 t.Errorf("stale read = %v, want the last good roster", got) 197 } 198} 199 200func TestRoster_InvalidateForcesReconcile(t *testing.T) { 201 clk := &fakeClock{t: rosterTestBase} 202 f := &fakeLister{members: []string{"did:plc:boltless"}} 203 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 204 ctx := context.Background() 205 206 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 207 t.Fatal(err) 208 } 209 r.InvalidateMembers(rosterHost) 210 211 f.set([]string{"did:plc:akshay"}, nil) 212 got, err := r.GetKnotMembers(ctx, rosterHost) 213 if err != nil { 214 t.Fatal(err) 215 } 216 if !slices.Equal(got, []string{"did:plc:akshay"}) { 217 t.Errorf("post-invalidate read = %v, want a fresh reconcile within the TTL", got) 218 } 219 if f.calls() != 2 { 220 t.Errorf("memberCalls=%d, want 2; invalidation must force the next read to reconcile", f.calls()) 221 } 222} 223 224func TestRoster_SingleflightCollapsesConcurrentColdReads(t *testing.T) { 225 clk := &fakeClock{t: rosterTestBase} 226 started := make(chan struct{}) 227 release := make(chan struct{}) 228 f := &fakeLister{members: []string{"did:plc:boltless"}, started: started, block: release} 229 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 230 ctx := context.Background() 231 232 var wg sync.WaitGroup 233 call := func() { 234 wg.Add(1) 235 go func() { 236 defer wg.Done() 237 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 238 t.Errorf("GetKnotMembers: %v", err) 239 } 240 }() 241 } 242 243 call() 244 <-started 245 for range make([]struct{}, 8) { 246 call() 247 } 248 time.Sleep(20 * time.Millisecond) 249 close(release) 250 wg.Wait() 251 252 if f.calls() != 1 { 253 t.Errorf("memberCalls=%d, want 1; concurrent cold reads must collapse into a single reconcile", f.calls()) 254 } 255} 256 257func TestRoster_MemberDeltaIdempotentAndScoped(t *testing.T) { 258 clk := &fakeClock{t: rosterTestBase} 259 d := rosterTestDB(t) 260 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 261 262 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 1); err != nil { 263 t.Fatalf("AddKnotMember: %v", err) 264 } 265 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 2); err != nil { 266 t.Fatalf("AddKnotMember: %v", err) 267 } 268 if err := r.RemoveKnotMember("other.nel.pet", syntax.DID("did:plc:boltless"), 1); err != nil { 269 t.Fatalf("RemoveKnotMember other host: %v", err) 270 } 271 272 if rows := membersForHost(t, d, rosterHost); len(rows) != 1 { 273 t.Fatalf("members = %v, want a single row after a duplicate add and an unrelated-host remove", rows) 274 } 275 276 if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 3); err != nil { 277 t.Fatalf("RemoveKnotMember: %v", err) 278 } 279 if rows := membersForHost(t, d, rosterHost); len(rows) != 0 { 280 t.Fatalf("members = %v, want empty after remove", rows) 281 } 282} 283 284func TestRoster_NativeAddReplacesLegacyRow(t *testing.T) { 285 clk := &fakeClock{t: rosterTestBase} 286 d := rosterTestDB(t) 287 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 288 289 if err := db.AddKnotMember(d, models.KnotMember{ 290 Did: syntax.DID("did:plc:akshay"), 291 Rkey: "legacy-rkey", 292 Domain: rosterHost, 293 Subject: syntax.DID("did:plc:boltless"), 294 }); err != nil { 295 t.Fatalf("seed legacy row: %v", err) 296 } 297 298 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 1); err != nil { 299 t.Fatalf("AddKnotMember: %v", err) 300 } 301 302 rows := membersForHost(t, d, rosterHost) 303 if len(rows) != 1 { 304 t.Fatalf("members = %v, want a single canonical row after a native add over a legacy row", rows) 305 } 306 if rows[0].Did != "" { 307 t.Errorf("row did = %q, want empty; the native write must own the (domain, subject) identity", rows[0].Did) 308 } 309} 310 311func TestRoster_StaleDeltaIgnored(t *testing.T) { 312 clk := &fakeClock{t: rosterTestBase} 313 d := rosterTestDB(t) 314 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 315 316 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil { 317 t.Fatalf("AddKnotMember: %v", err) 318 } 319 if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 50); err != nil { 320 t.Fatalf("RemoveKnotMember: %v", err) 321 } 322 323 if rows := membersForHost(t, d, rosterHost); len(rows) != 1 { 324 t.Fatalf("members = %v, want the add preserved; a lower-cursor remove arriving late must be ignored", rows) 325 } 326} 327 328func TestRoster_NewerRemoveWins(t *testing.T) { 329 clk := &fakeClock{t: rosterTestBase} 330 d := rosterTestDB(t) 331 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 332 333 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil { 334 t.Fatalf("AddKnotMember: %v", err) 335 } 336 if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 101); err != nil { 337 t.Fatalf("RemoveKnotMember: %v", err) 338 } 339 340 if rows := membersForHost(t, d, rosterHost); len(rows) != 0 { 341 t.Fatalf("members = %v, want empty; a higher-cursor remove must win", rows) 342 } 343} 344 345func TestRoster_LateAddAfterRemoveIgnored(t *testing.T) { 346 clk := &fakeClock{t: rosterTestBase} 347 d := rosterTestDB(t) 348 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 349 350 if err := r.RemoveKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 101); err != nil { 351 t.Fatalf("RemoveKnotMember: %v", err) 352 } 353 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:boltless"), 100); err != nil { 354 t.Fatalf("AddKnotMember: %v", err) 355 } 356 357 if rows := membersForHost(t, d, rosterHost); len(rows) != 0 { 358 t.Fatalf("members = %v, want empty; an add older than the applied remove must be ignored", rows) 359 } 360} 361 362func TestRoster_ReconcilePrunesStaleCursors(t *testing.T) { 363 clk := &fakeClock{t: rosterTestBase} 364 d := rosterTestDB(t) 365 r := newRoster(d, &fakeLister{}, rosterTTL, clk.now, nil) 366 ctx := context.Background() 367 368 stale := Cursor(rosterTestBase.Add(-2 * cursorRetention).UnixNano()) 369 fresh := Cursor(rosterTestBase.UnixNano()) 370 371 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:clam"), stale); err != nil { 372 t.Fatalf("AddKnotMember stale: %v", err) 373 } 374 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:whelk"), fresh); err != nil { 375 t.Fatalf("AddKnotMember fresh: %v", err) 376 } 377 378 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 379 t.Fatalf("reconcile: %v", err) 380 } 381 382 scope := memberScope(rosterHost) 383 if _, ok, err := seenCursor(d, scope, syntax.DID("did:plc:clam")); err != nil || ok { 384 t.Errorf("stale cursor present (ok=%v err=%v); reconcile must prune cursors older than the retention window", ok, err) 385 } 386 if _, ok, err := seenCursor(d, scope, syntax.DID("did:plc:whelk")); err != nil || !ok { 387 t.Errorf("fresh cursor missing (ok=%v err=%v); reconcile must keep cursors within the retention window", ok, err) 388 } 389} 390 391func TestRoster_ReconcileDoesNotClobberConcurrentDelta(t *testing.T) { 392 clk := &fakeClock{t: rosterTestBase} 393 f := &fakeLister{members: []string{"did:plc:boltless"}} 394 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 395 ctx := context.Background() 396 397 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 398 t.Fatal(err) 399 } 400 401 clk.advance(rosterTTL) 402 403 started := make(chan struct{}) 404 release := make(chan struct{}) 405 f.arm(started, release) 406 407 done := make(chan []string, 1) 408 go func() { 409 got, err := r.GetKnotMembers(ctx, rosterHost) 410 if err != nil { 411 t.Errorf("reconcile read: %v", err) 412 } 413 done <- got 414 }() 415 416 <-started 417 if err := r.AddKnotMember(rosterHost, syntax.DID("did:plc:akshay"), 1); err != nil { 418 t.Fatalf("AddKnotMember: %v", err) 419 } 420 close(release) 421 422 got := <-done 423 if !slices.Contains(got, "did:plc:akshay") { 424 t.Errorf("post-reconcile read = %v, want the delta that landed during the drain preserved", got) 425 } 426} 427 428func TestRoster_BackoffSuppressesRepeatedDrains(t *testing.T) { 429 clk := &fakeClock{t: rosterTestBase} 430 f := &fakeLister{members: []string{"did:plc:boltless"}} 431 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 432 ctx := context.Background() 433 434 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 435 t.Fatal(err) 436 } 437 if f.calls() != 1 { 438 t.Fatalf("memberCalls=%d, want 1 after bootstrap", f.calls()) 439 } 440 441 clk.advance(rosterTTL) 442 f.set(nil, errors.New("knot unreachable")) 443 444 if got, err := r.GetKnotMembers(ctx, rosterHost); err != nil || !slices.Equal(got, []string{"did:plc:boltless"}) { 445 t.Fatalf("got %v err %v, want the stale roster served", got, err) 446 } 447 if f.calls() != 2 { 448 t.Fatalf("memberCalls=%d, want 2 after the first stale drain", f.calls()) 449 } 450 451 if got, err := r.GetKnotMembers(ctx, rosterHost); err != nil || !slices.Equal(got, []string{"did:plc:boltless"}) { 452 t.Fatalf("got %v err %v, want the stale roster served", got, err) 453 } 454 if f.calls() != 2 { 455 t.Errorf("memberCalls=%d, want 2; a down knot must not be re-probed within the backoff window", f.calls()) 456 } 457 458 clk.advance(reconcileBackoff) 459 if _, err := r.GetKnotMembers(ctx, rosterHost); err != nil { 460 t.Fatal(err) 461 } 462 if f.calls() != 3 { 463 t.Errorf("memberCalls=%d, want 3; the probe must resume once the backoff window elapses", f.calls()) 464 } 465} 466 467func TestRoster_BackoffStillErrorsWhenCold(t *testing.T) { 468 clk := &fakeClock{t: rosterTestBase} 469 f := &fakeLister{err: errors.New("knot unreachable")} 470 r := newRoster(rosterTestDB(t), f, rosterTTL, clk.now, nil) 471 ctx := context.Background() 472 473 if _, err := r.GetKnotMembers(ctx, rosterHost); !errors.Is(err, ErrKnotUnreachable) { 474 t.Fatalf("err=%v, want ErrKnotUnreachable on the cold drain failure", err) 475 } 476 if f.calls() != 1 { 477 t.Fatalf("memberCalls=%d, want 1", f.calls()) 478 } 479 480 if _, err := r.GetKnotMembers(ctx, rosterHost); !errors.Is(err, ErrKnotUnreachable) { 481 t.Fatalf("err=%v, want ErrKnotUnreachable during backoff for a cold scope", err) 482 } 483 if f.calls() != 1 { 484 t.Errorf("memberCalls=%d, want 1; a cold scope must not be re-probed within the backoff window", f.calls()) 485 } 486}