Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "path/filepath"
8 "testing"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11
12 "tangled.org/core/appview/db"
13 "tangled.org/core/appview/knotacl"
14 "tangled.org/core/appview/models"
15 ec "tangled.org/core/eventconsumer"
16 "tangled.org/core/eventstream"
17 knotdb "tangled.org/core/knotserver/db"
18)
19
20const (
21 aclTestHost = "knot.nel.pet"
22 aclTestRepoDid = "did:plc:limpet"
23 aclTestOwner = "did:plc:akshay"
24 aclTestSubject = "did:plc:boltless"
25)
26
27type memberCall struct {
28 host string
29 subject string
30}
31
32type collabCall struct {
33 repoDid string
34 subject string
35}
36
37type recordingAcl struct {
38 memberAdd []memberCall
39 memberRemove []memberCall
40 collabAdd []collabCall
41 collabRemove []collabCall
42 membersInvalid []string
43 collabsInvalid []collabCall
44}
45
46func (r *recordingAcl) AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error {
47 r.memberAdd = append(r.memberAdd, memberCall{host, subject.String()})
48 return nil
49}
50
51func (r *recordingAcl) RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error {
52 r.memberRemove = append(r.memberRemove, memberCall{host, subject.String()})
53 return nil
54}
55
56func (r *recordingAcl) AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error {
57 r.collabAdd = append(r.collabAdd, collabCall{repoDid.String(), subject.String()})
58 return nil
59}
60
61func (r *recordingAcl) RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error {
62 r.collabRemove = append(r.collabRemove, collabCall{repoDid.String(), subject.String()})
63 return nil
64}
65
66func (r *recordingAcl) InvalidateMembers(host string) {
67 r.membersInvalid = append(r.membersInvalid, host)
68}
69
70func (r *recordingAcl) InvalidateCollaborators(host, repoDid string) {
71 r.collabsInvalid = append(r.collabsInvalid, collabCall{repoDid, host})
72}
73
74type flakyAcl struct {
75 failsLeft int
76 calls int
77 membersInvalid int
78 collabsInvalid int
79}
80
81func (a *flakyAcl) try() error {
82 a.calls++
83 if a.failsLeft > 0 {
84 a.failsLeft--
85 return errors.New("transient store error")
86 }
87 return nil
88}
89
90func (a *flakyAcl) AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error {
91 return a.try()
92}
93func (a *flakyAcl) RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error {
94 return a.try()
95}
96func (a *flakyAcl) AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error {
97 return a.try()
98}
99func (a *flakyAcl) RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error {
100 return a.try()
101}
102func (a *flakyAcl) InvalidateMembers(host string) { a.membersInvalid++ }
103func (a *flakyAcl) InvalidateCollaborators(host, repo string) { a.collabsInvalid++ }
104
105func aclTestDB(t *testing.T) *db.DB {
106 t.Helper()
107 d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "appview.db"))
108 if err != nil {
109 t.Fatalf("db.Make: %v", err)
110 }
111 t.Cleanup(func() { d.Close() })
112 return d
113}
114
115func seedAclRepo(t *testing.T, d *db.DB) {
116 t.Helper()
117 tx, err := d.Begin()
118 if err != nil {
119 t.Fatalf("begin: %v", err)
120 }
121 if err := db.AddRepo(tx, &models.Repo{
122 Did: aclTestOwner,
123 Knot: aclTestHost,
124 RepoDid: aclTestRepoDid,
125 Name: "anemone",
126 }); err != nil {
127 t.Fatalf("AddRepo: %v", err)
128 }
129 if err := tx.Commit(); err != nil {
130 t.Fatalf("commit: %v", err)
131 }
132}
133
134func memberEvent(t *testing.T, op knotdb.AclOp, subject string) eventstream.Event {
135 t.Helper()
136 payload, err := json.Marshal(knotdb.KnotMemberUpdate{Op: op, Subject: subject})
137 if err != nil {
138 t.Fatalf("marshal memberUpdate: %v", err)
139 }
140 return eventstream.Event{Rkey: "evt", Nsid: knotdb.KnotMemberUpdateNSID, EventJson: payload}
141}
142
143func collabEvent(t *testing.T, op knotdb.AclOp, subject, repoDid string) eventstream.Event {
144 t.Helper()
145 payload, err := json.Marshal(knotdb.RepoCollaboratorUpdate{Op: op, Subject: subject, Repo: repoDid})
146 if err != nil {
147 t.Fatalf("marshal collaboratorUpdate: %v", err)
148 }
149 return eventstream.Event{Rkey: "evt", Nsid: knotdb.RepoCollaboratorUpdateNSID, EventJson: payload}
150}
151
152func TestIngestKnotMemberUpdate_DispatchesAddThenRemove(t *testing.T) {
153 acl := &recordingAcl{}
154 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
155
156 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil {
157 t.Fatalf("add: %v", err)
158 }
159 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpRemove, aclTestSubject)); err != nil {
160 t.Fatalf("remove: %v", err)
161 }
162
163 if len(acl.memberAdd) != 1 || acl.memberAdd[0] != (memberCall{aclTestHost, aclTestSubject}) {
164 t.Errorf("memberAdd = %v, want one add scoped to the source host", acl.memberAdd)
165 }
166 if len(acl.memberRemove) != 1 || acl.memberRemove[0] != (memberCall{aclTestHost, aclTestSubject}) {
167 t.Errorf("memberRemove = %v, want one remove scoped to the source host", acl.memberRemove)
168 }
169}
170
171func TestIngestKnotMemberUpdate_UnknownOpErrors(t *testing.T) {
172 acl := &recordingAcl{}
173 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
174 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOp("bogus"), aclTestSubject)); err == nil {
175 t.Fatal("an unknown op must be rejected")
176 }
177 if len(acl.memberAdd)+len(acl.memberRemove) != 0 {
178 t.Errorf("an unknown op must not reach the roster: %+v", acl)
179 }
180}
181
182func TestIngestKnotMemberUpdate_BadSubjectErrors(t *testing.T) {
183 acl := &recordingAcl{}
184 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
185 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, "not-a-did")); err == nil {
186 t.Fatal("a malformed subject DID must be rejected")
187 }
188 if len(acl.memberAdd) != 0 {
189 t.Errorf("a malformed subject must not reach the roster: %v", acl.memberAdd)
190 }
191}
192
193func TestIngestCollaboratorUpdate_DispatchesAddThenRemove(t *testing.T) {
194 ctx := context.Background()
195 d := aclTestDB(t)
196 seedAclRepo(t, d)
197 acl := &recordingAcl{}
198 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
199
200 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil {
201 t.Fatalf("add: %v", err)
202 }
203 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpRemove, aclTestSubject, aclTestRepoDid)); err != nil {
204 t.Fatalf("remove: %v", err)
205 }
206
207 if len(acl.collabAdd) != 1 || acl.collabAdd[0] != (collabCall{aclTestRepoDid, aclTestSubject}) {
208 t.Errorf("collabAdd = %v, want one add for the repo", acl.collabAdd)
209 }
210 if len(acl.collabRemove) != 1 || acl.collabRemove[0] != (collabCall{aclTestRepoDid, aclTestSubject}) {
211 t.Errorf("collabRemove = %v, want one remove for the repo", acl.collabRemove)
212 }
213}
214
215func TestIngestCollaboratorUpdate_UnindexedRepoSkips(t *testing.T) {
216 ctx := context.Background()
217 d := aclTestDB(t)
218 acl := &recordingAcl{}
219 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
220
221 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil {
222 t.Fatalf("add for unindexed repo must not error, got: %v", err)
223 }
224 if len(acl.collabAdd) != 0 {
225 t.Errorf("an add for an unindexed repo must not reach the roster: %v", acl.collabAdd)
226 }
227}
228
229func TestIngestCollaboratorUpdate_ForeignKnotDropped(t *testing.T) {
230 ctx := context.Background()
231 d := aclTestDB(t)
232 seedAclRepo(t, d)
233 acl := &recordingAcl{}
234 source := ec.Source{Kind: ec.KindKnot, Host: "barnacle.nel.pet"}
235
236 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil {
237 t.Fatalf("a foreign-knot collaboratorUpdate must be dropped, not error: %v", err)
238 }
239 if len(acl.collabAdd) != 0 {
240 t.Errorf("a knot that does not host the repo must not mutate its collaborators: %v", acl.collabAdd)
241 }
242
243 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpRemove, aclTestSubject, aclTestRepoDid)); err != nil {
244 t.Fatalf("a foreign-knot remove must be dropped, not error: %v", err)
245 }
246 if len(acl.collabRemove) != 0 {
247 t.Errorf("a knot that does not host the repo must not remove its collaborators: %v", acl.collabRemove)
248 }
249}
250
251func TestIngestCollaboratorUpdate_BadDidErrors(t *testing.T) {
252 ctx := context.Background()
253 d := aclTestDB(t)
254 acl := &recordingAcl{}
255 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
256 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, "not-a-did", aclTestRepoDid)); err == nil {
257 t.Fatal("a malformed subject DID must be rejected")
258 }
259 if len(acl.collabAdd) != 0 {
260 t.Errorf("a malformed subject must not reach the roster: %v", acl.collabAdd)
261 }
262}
263
264func TestIngestKnotMemberUpdate_RetriesTransientThenSucceeds(t *testing.T) {
265 acl := &flakyAcl{failsLeft: aclIngestAttempts - 1}
266 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
267
268 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil {
269 t.Fatalf("a transient store error within the retry budget must recover, got: %v", err)
270 }
271 if acl.calls != aclIngestAttempts {
272 t.Errorf("calls = %d, want %d; the write must retry until it lands", acl.calls, aclIngestAttempts)
273 }
274}
275
276func TestIngestKnotMemberUpdate_GivesUpAfterAttempts(t *testing.T) {
277 acl := &flakyAcl{failsLeft: aclIngestAttempts + 5}
278 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
279
280 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err == nil {
281 t.Fatal("a persistent store error must surface so the failure is logged")
282 }
283 if acl.calls != aclIngestAttempts {
284 t.Errorf("calls = %d, want %d; the retry must be bounded", acl.calls, aclIngestAttempts)
285 }
286 if acl.membersInvalid != 1 {
287 t.Errorf("membersInvalid = %d, want 1; a dropped delta must invalidate the scope so the next read reconciles instead of waiting out the TTL", acl.membersInvalid)
288 }
289}
290
291func TestIngestCollaboratorUpdate_InvalidatesScopeOnGiveUp(t *testing.T) {
292 ctx := context.Background()
293 d := aclTestDB(t)
294 seedAclRepo(t, d)
295 acl := &flakyAcl{failsLeft: aclIngestAttempts + 5}
296 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
297
298 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err == nil {
299 t.Fatal("a persistent store error must surface so the failure is logged")
300 }
301 if acl.collabsInvalid != 1 {
302 t.Errorf("collabsInvalid = %d, want 1; a dropped delta must invalidate the scope", acl.collabsInvalid)
303 }
304}
305
306func TestIngestKnotMemberUpdate_NoInvalidateOnSuccess(t *testing.T) {
307 acl := &flakyAcl{failsLeft: aclIngestAttempts - 1}
308 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
309
310 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil {
311 t.Fatalf("a recoverable delta must not error: %v", err)
312 }
313 if acl.membersInvalid != 0 {
314 t.Errorf("membersInvalid = %d, want 0; a delta that lands must not force a reconcile", acl.membersInvalid)
315 }
316}
317
318func TestIngestCollaboratorUpdate_StoreErrorPropagates(t *testing.T) {
319 ctx := context.Background()
320 d := aclTestDB(t)
321 if err := d.Close(); err != nil {
322 t.Fatalf("close: %v", err)
323 }
324 acl := &recordingAcl{}
325 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost}
326
327 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err == nil {
328 t.Fatal("a store error on the repo lookup must surface, not be swallowed as an unindexed-repo skip")
329 }
330 if len(acl.collabAdd) != 0 {
331 t.Errorf("a failed repo lookup must not reach the roster: %v", acl.collabAdd)
332 }
333}