Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "path/filepath" 8 "testing" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 12 "tangled.org/core/appview/db" 13 "tangled.org/core/appview/knotacl" 14 "tangled.org/core/appview/models" 15 ec "tangled.org/core/eventconsumer" 16 "tangled.org/core/eventstream" 17 knotdb "tangled.org/core/knotserver/db" 18) 19 20const ( 21 aclTestHost = "knot.nel.pet" 22 aclTestRepoDid = "did:plc:limpet" 23 aclTestOwner = "did:plc:akshay" 24 aclTestSubject = "did:plc:boltless" 25) 26 27type memberCall struct { 28 host string 29 subject string 30} 31 32type collabCall struct { 33 repoDid string 34 subject string 35} 36 37type recordingAcl struct { 38 memberAdd []memberCall 39 memberRemove []memberCall 40 collabAdd []collabCall 41 collabRemove []collabCall 42 membersInvalid []string 43 collabsInvalid []collabCall 44} 45 46func (r *recordingAcl) AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error { 47 r.memberAdd = append(r.memberAdd, memberCall{host, subject.String()}) 48 return nil 49} 50 51func (r *recordingAcl) RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error { 52 r.memberRemove = append(r.memberRemove, memberCall{host, subject.String()}) 53 return nil 54} 55 56func (r *recordingAcl) AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error { 57 r.collabAdd = append(r.collabAdd, collabCall{repoDid.String(), subject.String()}) 58 return nil 59} 60 61func (r *recordingAcl) RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error { 62 r.collabRemove = append(r.collabRemove, collabCall{repoDid.String(), subject.String()}) 63 return nil 64} 65 66func (r *recordingAcl) InvalidateMembers(host string) { 67 r.membersInvalid = append(r.membersInvalid, host) 68} 69 70func (r *recordingAcl) InvalidateCollaborators(host, repoDid string) { 71 r.collabsInvalid = append(r.collabsInvalid, collabCall{repoDid, host}) 72} 73 74type flakyAcl struct { 75 failsLeft int 76 calls int 77 membersInvalid int 78 collabsInvalid int 79} 80 81func (a *flakyAcl) try() error { 82 a.calls++ 83 if a.failsLeft > 0 { 84 a.failsLeft-- 85 return errors.New("transient store error") 86 } 87 return nil 88} 89 90func (a *flakyAcl) AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error { 91 return a.try() 92} 93func (a *flakyAcl) RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error { 94 return a.try() 95} 96func (a *flakyAcl) AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error { 97 return a.try() 98} 99func (a *flakyAcl) RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error { 100 return a.try() 101} 102func (a *flakyAcl) InvalidateMembers(host string) { a.membersInvalid++ } 103func (a *flakyAcl) InvalidateCollaborators(host, repo string) { a.collabsInvalid++ } 104 105func aclTestDB(t *testing.T) *db.DB { 106 t.Helper() 107 d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "appview.db")) 108 if err != nil { 109 t.Fatalf("db.Make: %v", err) 110 } 111 t.Cleanup(func() { d.Close() }) 112 return d 113} 114 115func seedAclRepo(t *testing.T, d *db.DB) { 116 t.Helper() 117 tx, err := d.Begin() 118 if err != nil { 119 t.Fatalf("begin: %v", err) 120 } 121 if err := db.AddRepo(tx, &models.Repo{ 122 Did: aclTestOwner, 123 Knot: aclTestHost, 124 RepoDid: aclTestRepoDid, 125 Name: "anemone", 126 }); err != nil { 127 t.Fatalf("AddRepo: %v", err) 128 } 129 if err := tx.Commit(); err != nil { 130 t.Fatalf("commit: %v", err) 131 } 132} 133 134func memberEvent(t *testing.T, op knotdb.AclOp, subject string) eventstream.Event { 135 t.Helper() 136 payload, err := json.Marshal(knotdb.KnotMemberUpdate{Op: op, Subject: subject}) 137 if err != nil { 138 t.Fatalf("marshal memberUpdate: %v", err) 139 } 140 return eventstream.Event{Rkey: "evt", Nsid: knotdb.KnotMemberUpdateNSID, EventJson: payload} 141} 142 143func collabEvent(t *testing.T, op knotdb.AclOp, subject, repoDid string) eventstream.Event { 144 t.Helper() 145 payload, err := json.Marshal(knotdb.RepoCollaboratorUpdate{Op: op, Subject: subject, Repo: repoDid}) 146 if err != nil { 147 t.Fatalf("marshal collaboratorUpdate: %v", err) 148 } 149 return eventstream.Event{Rkey: "evt", Nsid: knotdb.RepoCollaboratorUpdateNSID, EventJson: payload} 150} 151 152func TestIngestKnotMemberUpdate_DispatchesAddThenRemove(t *testing.T) { 153 acl := &recordingAcl{} 154 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 155 156 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil { 157 t.Fatalf("add: %v", err) 158 } 159 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpRemove, aclTestSubject)); err != nil { 160 t.Fatalf("remove: %v", err) 161 } 162 163 if len(acl.memberAdd) != 1 || acl.memberAdd[0] != (memberCall{aclTestHost, aclTestSubject}) { 164 t.Errorf("memberAdd = %v, want one add scoped to the source host", acl.memberAdd) 165 } 166 if len(acl.memberRemove) != 1 || acl.memberRemove[0] != (memberCall{aclTestHost, aclTestSubject}) { 167 t.Errorf("memberRemove = %v, want one remove scoped to the source host", acl.memberRemove) 168 } 169} 170 171func TestIngestKnotMemberUpdate_UnknownOpErrors(t *testing.T) { 172 acl := &recordingAcl{} 173 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 174 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOp("bogus"), aclTestSubject)); err == nil { 175 t.Fatal("an unknown op must be rejected") 176 } 177 if len(acl.memberAdd)+len(acl.memberRemove) != 0 { 178 t.Errorf("an unknown op must not reach the roster: %+v", acl) 179 } 180} 181 182func TestIngestKnotMemberUpdate_BadSubjectErrors(t *testing.T) { 183 acl := &recordingAcl{} 184 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 185 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, "not-a-did")); err == nil { 186 t.Fatal("a malformed subject DID must be rejected") 187 } 188 if len(acl.memberAdd) != 0 { 189 t.Errorf("a malformed subject must not reach the roster: %v", acl.memberAdd) 190 } 191} 192 193func TestIngestCollaboratorUpdate_DispatchesAddThenRemove(t *testing.T) { 194 ctx := context.Background() 195 d := aclTestDB(t) 196 seedAclRepo(t, d) 197 acl := &recordingAcl{} 198 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 199 200 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil { 201 t.Fatalf("add: %v", err) 202 } 203 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpRemove, aclTestSubject, aclTestRepoDid)); err != nil { 204 t.Fatalf("remove: %v", err) 205 } 206 207 if len(acl.collabAdd) != 1 || acl.collabAdd[0] != (collabCall{aclTestRepoDid, aclTestSubject}) { 208 t.Errorf("collabAdd = %v, want one add for the repo", acl.collabAdd) 209 } 210 if len(acl.collabRemove) != 1 || acl.collabRemove[0] != (collabCall{aclTestRepoDid, aclTestSubject}) { 211 t.Errorf("collabRemove = %v, want one remove for the repo", acl.collabRemove) 212 } 213} 214 215func TestIngestCollaboratorUpdate_UnindexedRepoSkips(t *testing.T) { 216 ctx := context.Background() 217 d := aclTestDB(t) 218 acl := &recordingAcl{} 219 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 220 221 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil { 222 t.Fatalf("add for unindexed repo must not error, got: %v", err) 223 } 224 if len(acl.collabAdd) != 0 { 225 t.Errorf("an add for an unindexed repo must not reach the roster: %v", acl.collabAdd) 226 } 227} 228 229func TestIngestCollaboratorUpdate_ForeignKnotDropped(t *testing.T) { 230 ctx := context.Background() 231 d := aclTestDB(t) 232 seedAclRepo(t, d) 233 acl := &recordingAcl{} 234 source := ec.Source{Kind: ec.KindKnot, Host: "barnacle.nel.pet"} 235 236 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil { 237 t.Fatalf("a foreign-knot collaboratorUpdate must be dropped, not error: %v", err) 238 } 239 if len(acl.collabAdd) != 0 { 240 t.Errorf("a knot that does not host the repo must not mutate its collaborators: %v", acl.collabAdd) 241 } 242 243 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpRemove, aclTestSubject, aclTestRepoDid)); err != nil { 244 t.Fatalf("a foreign-knot remove must be dropped, not error: %v", err) 245 } 246 if len(acl.collabRemove) != 0 { 247 t.Errorf("a knot that does not host the repo must not remove its collaborators: %v", acl.collabRemove) 248 } 249} 250 251func TestIngestCollaboratorUpdate_BadDidErrors(t *testing.T) { 252 ctx := context.Background() 253 d := aclTestDB(t) 254 acl := &recordingAcl{} 255 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 256 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, "not-a-did", aclTestRepoDid)); err == nil { 257 t.Fatal("a malformed subject DID must be rejected") 258 } 259 if len(acl.collabAdd) != 0 { 260 t.Errorf("a malformed subject must not reach the roster: %v", acl.collabAdd) 261 } 262} 263 264func TestIngestKnotMemberUpdate_RetriesTransientThenSucceeds(t *testing.T) { 265 acl := &flakyAcl{failsLeft: aclIngestAttempts - 1} 266 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 267 268 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil { 269 t.Fatalf("a transient store error within the retry budget must recover, got: %v", err) 270 } 271 if acl.calls != aclIngestAttempts { 272 t.Errorf("calls = %d, want %d; the write must retry until it lands", acl.calls, aclIngestAttempts) 273 } 274} 275 276func TestIngestKnotMemberUpdate_GivesUpAfterAttempts(t *testing.T) { 277 acl := &flakyAcl{failsLeft: aclIngestAttempts + 5} 278 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 279 280 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err == nil { 281 t.Fatal("a persistent store error must surface so the failure is logged") 282 } 283 if acl.calls != aclIngestAttempts { 284 t.Errorf("calls = %d, want %d; the retry must be bounded", acl.calls, aclIngestAttempts) 285 } 286 if acl.membersInvalid != 1 { 287 t.Errorf("membersInvalid = %d, want 1; a dropped delta must invalidate the scope so the next read reconciles instead of waiting out the TTL", acl.membersInvalid) 288 } 289} 290 291func TestIngestCollaboratorUpdate_InvalidatesScopeOnGiveUp(t *testing.T) { 292 ctx := context.Background() 293 d := aclTestDB(t) 294 seedAclRepo(t, d) 295 acl := &flakyAcl{failsLeft: aclIngestAttempts + 5} 296 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 297 298 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err == nil { 299 t.Fatal("a persistent store error must surface so the failure is logged") 300 } 301 if acl.collabsInvalid != 1 { 302 t.Errorf("collabsInvalid = %d, want 1; a dropped delta must invalidate the scope", acl.collabsInvalid) 303 } 304} 305 306func TestIngestKnotMemberUpdate_NoInvalidateOnSuccess(t *testing.T) { 307 acl := &flakyAcl{failsLeft: aclIngestAttempts - 1} 308 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 309 310 if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil { 311 t.Fatalf("a recoverable delta must not error: %v", err) 312 } 313 if acl.membersInvalid != 0 { 314 t.Errorf("membersInvalid = %d, want 0; a delta that lands must not force a reconcile", acl.membersInvalid) 315 } 316} 317 318func TestIngestCollaboratorUpdate_StoreErrorPropagates(t *testing.T) { 319 ctx := context.Background() 320 d := aclTestDB(t) 321 if err := d.Close(); err != nil { 322 t.Fatalf("close: %v", err) 323 } 324 acl := &recordingAcl{} 325 source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 326 327 if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err == nil { 328 t.Fatal("a store error on the repo lookup must surface, not be swallowed as an unindexed-repo skip") 329 } 330 if len(acl.collabAdd) != 0 { 331 t.Errorf("a failed repo lookup must not reach the roster: %v", acl.collabAdd) 332 } 333}