Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

appview/state: ingest knot ACL events into roster

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 16, 2026, 9:04 PM +0300) commit b3ee1d8d parent 929c6330 change-id wunwsysq
+457 -5
+1 -1
appview/db/vouch.go
··· 343 343 } 344 344 345 345 // priority: 346 - // 1. collaborator invites sent 346 + // 1. collaborator invites sent - NOTE with knot-owned events not mentioning *who* is doing the adding of collab, we can't know who to suggest a vouch to. 347 347 // 2. knot member invites sent 348 348 // 3. PR authors on FOO's repositories 349 349 // 4. issue authors on FOO's repositories
+122 -3
appview/state/knotstream.go
··· 16 16 "tangled.org/core/api/tangled" 17 17 "tangled.org/core/appview/config" 18 18 "tangled.org/core/appview/db" 19 + "tangled.org/core/appview/knotacl" 19 20 "tangled.org/core/appview/knotcompat" 20 21 "tangled.org/core/appview/models" 21 22 "tangled.org/core/appview/sites" ··· 33 34 "github.com/posthog/posthog-go" 34 35 ) 35 36 36 - func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 37 + type aclRoster interface { 38 + AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error 39 + RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error 40 + AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error 41 + RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error 42 + InvalidateMembers(host string) 43 + InvalidateCollaborators(host, repoDid string) 44 + } 45 + 46 + func Knotstream(ctx context.Context, c *config.Config, d *db.DB, acl *knotacl.Service, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 37 47 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null")) 38 48 if err != nil { 39 49 return nil, err ··· 47 57 return bootstrapStream( 48 58 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr, 49 59 c.Knotstream, c.Core.Dev, 50 - knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 60 + knotIngester(d, acl, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 51 61 ), nil 52 62 } 53 63 ··· 65 75 return &repos[0], nil 66 76 } 67 77 68 - func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 78 + func knotIngester(d *db.DB, acl aclRoster, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 69 79 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 70 80 switch msg.Nsid { 71 81 case tangled.GitRefUpdateNSID: ··· 74 84 return ingestPipeline(d, source, msg) 75 85 case knotdb.RepoDIDAssignNSID: 76 86 return ingestDIDAssign(d, enforcer, source, msg, ctx) 87 + case knotdb.KnotMemberUpdateNSID: 88 + return ingestKnotMemberUpdate(acl, source, msg) 89 + case knotdb.RepoCollaboratorUpdateNSID: 90 + return ingestCollaboratorUpdate(ctx, d, acl, source, msg) 77 91 } 78 92 79 93 return nil 80 94 } 95 + } 96 + 97 + const ( 98 + aclIngestAttempts = 3 99 + aclIngestBackoff = 50 * time.Millisecond 100 + ) 101 + 102 + func withAclRetry(attempts int, backoff time.Duration, op func() error) error { 103 + if err := op(); err == nil || attempts <= 1 { 104 + return err 105 + } 106 + time.Sleep(backoff) 107 + return withAclRetry(attempts-1, backoff, op) 108 + } 109 + 110 + func ingestKnotMemberUpdate(acl aclRoster, source ec.Source, msg eventstream.Event) error { 111 + var rec knotdb.KnotMemberUpdate 112 + if err := json.Unmarshal(msg.EventJson, &rec); err != nil { 113 + return fmt.Errorf("unmarshal memberUpdate: %w", err) 114 + } 115 + 116 + subject, err := syntax.ParseDID(rec.Subject) 117 + if err != nil { 118 + return fmt.Errorf("memberUpdate bad subject %q: %w", rec.Subject, err) 119 + } 120 + 121 + cursor := knotacl.Cursor(msg.Created) 122 + switch rec.Op { 123 + case knotdb.AclOpAdd: 124 + err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 125 + return acl.AddKnotMember(source.Host, subject, cursor) 126 + }) 127 + case knotdb.AclOpRemove: 128 + err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 129 + return acl.RemoveKnotMember(source.Host, subject, cursor) 130 + }) 131 + default: 132 + return fmt.Errorf("memberUpdate unknown op %q", rec.Op) 133 + } 134 + 135 + if err != nil { 136 + acl.InvalidateMembers(source.Host) 137 + } 138 + return err 139 + } 140 + 141 + func ingestCollaboratorUpdate(ctx context.Context, d *db.DB, acl aclRoster, source ec.Source, msg eventstream.Event) error { 142 + var rec knotdb.RepoCollaboratorUpdate 143 + if err := json.Unmarshal(msg.EventJson, &rec); err != nil { 144 + return fmt.Errorf("unmarshal collaboratorUpdate: %w", err) 145 + } 146 + 147 + subject, err := syntax.ParseDID(rec.Subject) 148 + if err != nil { 149 + return fmt.Errorf("collaboratorUpdate bad subject %q: %w", rec.Subject, err) 150 + } 151 + repoDid, err := syntax.ParseDID(rec.Repo) 152 + if err != nil { 153 + return fmt.Errorf("collaboratorUpdate bad repo %q: %w", rec.Repo, err) 154 + } 155 + 156 + cursor := knotacl.Cursor(msg.Created) 157 + switch rec.Op { 158 + case knotdb.AclOpAdd: 159 + err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 160 + owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject) 161 + if err != nil || !owned { 162 + return err 163 + } 164 + return acl.AddCollaborator(repoDid, subject, cursor) 165 + }) 166 + case knotdb.AclOpRemove: 167 + err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 168 + owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject) 169 + if err != nil || !owned { 170 + return err 171 + } 172 + return acl.RemoveCollaborator(repoDid, subject, cursor) 173 + }) 174 + default: 175 + return fmt.Errorf("collaboratorUpdate unknown op %q", rec.Op) 176 + } 177 + 178 + if err != nil { 179 + acl.InvalidateCollaborators(source.Host, repoDid.String()) 180 + } 181 + return err 182 + } 183 + 184 + func repoOwnedBySource(ctx context.Context, d *db.DB, source ec.Source, repoDid, subject syntax.DID) (bool, error) { 185 + repo, err := db.GetRepoByDid(d, repoDid.String()) 186 + if errors.Is(err, sql.ErrNoRows) { 187 + log.FromContext(ctx).Warn("collaboratorUpdate for unindexed repo, skipping until reconcile", 188 + "repo_did", repoDid, "subject", subject) 189 + return false, nil 190 + } 191 + if err != nil { 192 + return false, err 193 + } 194 + if repo.Knot != source.Host { 195 + log.FromContext(ctx).Warn("collaboratorUpdate for a repo this knot does not host, dropping", 196 + "repo_did", repoDid, "subject", subject, "claimed_by", source.Host, "owner", repo.Knot) 197 + return false, nil 198 + } 199 + return true, nil 81 200 } 82 201 83 202 // TODO(boltless): remove this. knotmirror should do all sort of indexing
+333
appview/state/knotstream_test.go
··· 1 + package state 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "path/filepath" 8 + "testing" 9 + 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + 12 + "tangled.org/core/appview/db" 13 + "tangled.org/core/appview/knotacl" 14 + "tangled.org/core/appview/models" 15 + ec "tangled.org/core/eventconsumer" 16 + "tangled.org/core/eventstream" 17 + knotdb "tangled.org/core/knotserver/db" 18 + ) 19 + 20 + const ( 21 + aclTestHost = "knot.nel.pet" 22 + aclTestRepoDid = "did:plc:limpet" 23 + aclTestOwner = "did:plc:akshay" 24 + aclTestSubject = "did:plc:boltless" 25 + ) 26 + 27 + type memberCall struct { 28 + host string 29 + subject string 30 + } 31 + 32 + type collabCall struct { 33 + repoDid string 34 + subject string 35 + } 36 + 37 + type recordingAcl struct { 38 + memberAdd []memberCall 39 + memberRemove []memberCall 40 + collabAdd []collabCall 41 + collabRemove []collabCall 42 + membersInvalid []string 43 + collabsInvalid []collabCall 44 + } 45 + 46 + func (r *recordingAcl) AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error { 47 + r.memberAdd = append(r.memberAdd, memberCall{host, subject.String()}) 48 + return nil 49 + } 50 + 51 + func (r *recordingAcl) RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error { 52 + r.memberRemove = append(r.memberRemove, memberCall{host, subject.String()}) 53 + return nil 54 + } 55 + 56 + func (r *recordingAcl) AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error { 57 + r.collabAdd = append(r.collabAdd, collabCall{repoDid.String(), subject.String()}) 58 + return nil 59 + } 60 + 61 + func (r *recordingAcl) RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error { 62 + r.collabRemove = append(r.collabRemove, collabCall{repoDid.String(), subject.String()}) 63 + return nil 64 + } 65 + 66 + func (r *recordingAcl) InvalidateMembers(host string) { 67 + r.membersInvalid = append(r.membersInvalid, host) 68 + } 69 + 70 + func (r *recordingAcl) InvalidateCollaborators(host, repoDid string) { 71 + r.collabsInvalid = append(r.collabsInvalid, collabCall{repoDid, host}) 72 + } 73 + 74 + type flakyAcl struct { 75 + failsLeft int 76 + calls int 77 + membersInvalid int 78 + collabsInvalid int 79 + } 80 + 81 + func (a *flakyAcl) try() error { 82 + a.calls++ 83 + if a.failsLeft > 0 { 84 + a.failsLeft-- 85 + return errors.New("transient store error") 86 + } 87 + return nil 88 + } 89 + 90 + func (a *flakyAcl) AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error { 91 + return a.try() 92 + } 93 + func (a *flakyAcl) RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error { 94 + return a.try() 95 + } 96 + func (a *flakyAcl) AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error { 97 + return a.try() 98 + } 99 + func (a *flakyAcl) RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error { 100 + return a.try() 101 + } 102 + func (a *flakyAcl) InvalidateMembers(host string) { a.membersInvalid++ } 103 + func (a *flakyAcl) InvalidateCollaborators(host, repo string) { a.collabsInvalid++ } 104 + 105 + func aclTestDB(t *testing.T) *db.DB { 106 + t.Helper() 107 + d, err := db.Make(context.Background(), filepath.Join(t.TempDir(), "appview.db")) 108 + if err != nil { 109 + t.Fatalf("db.Make: %v", err) 110 + } 111 + t.Cleanup(func() { d.Close() }) 112 + return d 113 + } 114 + 115 + func seedAclRepo(t *testing.T, d *db.DB) { 116 + t.Helper() 117 + tx, err := d.Begin() 118 + if err != nil { 119 + t.Fatalf("begin: %v", err) 120 + } 121 + if err := db.AddRepo(tx, &models.Repo{ 122 + Did: aclTestOwner, 123 + Knot: aclTestHost, 124 + RepoDid: aclTestRepoDid, 125 + Name: "anemone", 126 + }); err != nil { 127 + t.Fatalf("AddRepo: %v", err) 128 + } 129 + if err := tx.Commit(); err != nil { 130 + t.Fatalf("commit: %v", err) 131 + } 132 + } 133 + 134 + func memberEvent(t *testing.T, op knotdb.AclOp, subject string) eventstream.Event { 135 + t.Helper() 136 + payload, err := json.Marshal(knotdb.KnotMemberUpdate{Op: op, Subject: subject}) 137 + if err != nil { 138 + t.Fatalf("marshal memberUpdate: %v", err) 139 + } 140 + return eventstream.Event{Rkey: "evt", Nsid: knotdb.KnotMemberUpdateNSID, EventJson: payload} 141 + } 142 + 143 + func collabEvent(t *testing.T, op knotdb.AclOp, subject, repoDid string) eventstream.Event { 144 + t.Helper() 145 + payload, err := json.Marshal(knotdb.RepoCollaboratorUpdate{Op: op, Subject: subject, Repo: repoDid}) 146 + if err != nil { 147 + t.Fatalf("marshal collaboratorUpdate: %v", err) 148 + } 149 + return eventstream.Event{Rkey: "evt", Nsid: knotdb.RepoCollaboratorUpdateNSID, EventJson: payload} 150 + } 151 + 152 + func TestIngestKnotMemberUpdate_DispatchesAddThenRemove(t *testing.T) { 153 + acl := &recordingAcl{} 154 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 155 + 156 + if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil { 157 + t.Fatalf("add: %v", err) 158 + } 159 + if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpRemove, aclTestSubject)); err != nil { 160 + t.Fatalf("remove: %v", err) 161 + } 162 + 163 + if len(acl.memberAdd) != 1 || acl.memberAdd[0] != (memberCall{aclTestHost, aclTestSubject}) { 164 + t.Errorf("memberAdd = %v, want one add scoped to the source host", acl.memberAdd) 165 + } 166 + if len(acl.memberRemove) != 1 || acl.memberRemove[0] != (memberCall{aclTestHost, aclTestSubject}) { 167 + t.Errorf("memberRemove = %v, want one remove scoped to the source host", acl.memberRemove) 168 + } 169 + } 170 + 171 + func TestIngestKnotMemberUpdate_UnknownOpErrors(t *testing.T) { 172 + acl := &recordingAcl{} 173 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 174 + if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOp("bogus"), aclTestSubject)); err == nil { 175 + t.Fatal("an unknown op must be rejected") 176 + } 177 + if len(acl.memberAdd)+len(acl.memberRemove) != 0 { 178 + t.Errorf("an unknown op must not reach the roster: %+v", acl) 179 + } 180 + } 181 + 182 + func TestIngestKnotMemberUpdate_BadSubjectErrors(t *testing.T) { 183 + acl := &recordingAcl{} 184 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 185 + if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, "not-a-did")); err == nil { 186 + t.Fatal("a malformed subject DID must be rejected") 187 + } 188 + if len(acl.memberAdd) != 0 { 189 + t.Errorf("a malformed subject must not reach the roster: %v", acl.memberAdd) 190 + } 191 + } 192 + 193 + func TestIngestCollaboratorUpdate_DispatchesAddThenRemove(t *testing.T) { 194 + ctx := context.Background() 195 + d := aclTestDB(t) 196 + seedAclRepo(t, d) 197 + acl := &recordingAcl{} 198 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 199 + 200 + if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil { 201 + t.Fatalf("add: %v", err) 202 + } 203 + if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpRemove, aclTestSubject, aclTestRepoDid)); err != nil { 204 + t.Fatalf("remove: %v", err) 205 + } 206 + 207 + if len(acl.collabAdd) != 1 || acl.collabAdd[0] != (collabCall{aclTestRepoDid, aclTestSubject}) { 208 + t.Errorf("collabAdd = %v, want one add for the repo", acl.collabAdd) 209 + } 210 + if len(acl.collabRemove) != 1 || acl.collabRemove[0] != (collabCall{aclTestRepoDid, aclTestSubject}) { 211 + t.Errorf("collabRemove = %v, want one remove for the repo", acl.collabRemove) 212 + } 213 + } 214 + 215 + func TestIngestCollaboratorUpdate_UnindexedRepoSkips(t *testing.T) { 216 + ctx := context.Background() 217 + d := aclTestDB(t) 218 + acl := &recordingAcl{} 219 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 220 + 221 + if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil { 222 + t.Fatalf("add for unindexed repo must not error, got: %v", err) 223 + } 224 + if len(acl.collabAdd) != 0 { 225 + t.Errorf("an add for an unindexed repo must not reach the roster: %v", acl.collabAdd) 226 + } 227 + } 228 + 229 + func TestIngestCollaboratorUpdate_ForeignKnotDropped(t *testing.T) { 230 + ctx := context.Background() 231 + d := aclTestDB(t) 232 + seedAclRepo(t, d) 233 + acl := &recordingAcl{} 234 + source := ec.Source{Kind: ec.KindKnot, Host: "barnacle.nel.pet"} 235 + 236 + if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err != nil { 237 + t.Fatalf("a foreign-knot collaboratorUpdate must be dropped, not error: %v", err) 238 + } 239 + if len(acl.collabAdd) != 0 { 240 + t.Errorf("a knot that does not host the repo must not mutate its collaborators: %v", acl.collabAdd) 241 + } 242 + 243 + if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpRemove, aclTestSubject, aclTestRepoDid)); err != nil { 244 + t.Fatalf("a foreign-knot remove must be dropped, not error: %v", err) 245 + } 246 + if len(acl.collabRemove) != 0 { 247 + t.Errorf("a knot that does not host the repo must not remove its collaborators: %v", acl.collabRemove) 248 + } 249 + } 250 + 251 + func TestIngestCollaboratorUpdate_BadDidErrors(t *testing.T) { 252 + ctx := context.Background() 253 + d := aclTestDB(t) 254 + acl := &recordingAcl{} 255 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 256 + if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, "not-a-did", aclTestRepoDid)); err == nil { 257 + t.Fatal("a malformed subject DID must be rejected") 258 + } 259 + if len(acl.collabAdd) != 0 { 260 + t.Errorf("a malformed subject must not reach the roster: %v", acl.collabAdd) 261 + } 262 + } 263 + 264 + func TestIngestKnotMemberUpdate_RetriesTransientThenSucceeds(t *testing.T) { 265 + acl := &flakyAcl{failsLeft: aclIngestAttempts - 1} 266 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 267 + 268 + if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil { 269 + t.Fatalf("a transient store error within the retry budget must recover, got: %v", err) 270 + } 271 + if acl.calls != aclIngestAttempts { 272 + t.Errorf("calls = %d, want %d; the write must retry until it lands", acl.calls, aclIngestAttempts) 273 + } 274 + } 275 + 276 + func TestIngestKnotMemberUpdate_GivesUpAfterAttempts(t *testing.T) { 277 + acl := &flakyAcl{failsLeft: aclIngestAttempts + 5} 278 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 279 + 280 + if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err == nil { 281 + t.Fatal("a persistent store error must surface so the failure is logged") 282 + } 283 + if acl.calls != aclIngestAttempts { 284 + t.Errorf("calls = %d, want %d; the retry must be bounded", acl.calls, aclIngestAttempts) 285 + } 286 + if acl.membersInvalid != 1 { 287 + t.Errorf("membersInvalid = %d, want 1; a dropped delta must invalidate the scope so the next read reconciles instead of waiting out the TTL", acl.membersInvalid) 288 + } 289 + } 290 + 291 + func TestIngestCollaboratorUpdate_InvalidatesScopeOnGiveUp(t *testing.T) { 292 + ctx := context.Background() 293 + d := aclTestDB(t) 294 + seedAclRepo(t, d) 295 + acl := &flakyAcl{failsLeft: aclIngestAttempts + 5} 296 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 297 + 298 + if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err == nil { 299 + t.Fatal("a persistent store error must surface so the failure is logged") 300 + } 301 + if acl.collabsInvalid != 1 { 302 + t.Errorf("collabsInvalid = %d, want 1; a dropped delta must invalidate the scope", acl.collabsInvalid) 303 + } 304 + } 305 + 306 + func TestIngestKnotMemberUpdate_NoInvalidateOnSuccess(t *testing.T) { 307 + acl := &flakyAcl{failsLeft: aclIngestAttempts - 1} 308 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 309 + 310 + if err := ingestKnotMemberUpdate(acl, source, memberEvent(t, knotdb.AclOpAdd, aclTestSubject)); err != nil { 311 + t.Fatalf("a recoverable delta must not error: %v", err) 312 + } 313 + if acl.membersInvalid != 0 { 314 + t.Errorf("membersInvalid = %d, want 0; a delta that lands must not force a reconcile", acl.membersInvalid) 315 + } 316 + } 317 + 318 + func TestIngestCollaboratorUpdate_StoreErrorPropagates(t *testing.T) { 319 + ctx := context.Background() 320 + d := aclTestDB(t) 321 + if err := d.Close(); err != nil { 322 + t.Fatalf("close: %v", err) 323 + } 324 + acl := &recordingAcl{} 325 + source := ec.Source{Kind: ec.KindKnot, Host: aclTestHost} 326 + 327 + if err := ingestCollaboratorUpdate(ctx, d, acl, source, collabEvent(t, knotdb.AclOpAdd, aclTestSubject, aclTestRepoDid)); err == nil { 328 + t.Fatal("a store error on the repo lookup must surface, not be swallowed as an unindexed-repo skip") 329 + } 330 + if len(acl.collabAdd) != 0 { 331 + t.Errorf("a failed repo lookup must not reach the roster: %v", acl.collabAdd) 332 + } 333 + }
+1 -1
appview/state/state.go
··· 215 215 } 216 216 } 217 217 218 - knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 218 + knotstream, err := Knotstream(ctx, config, d, aclService, enforcer, posthog, notifier, cfClient) 219 219 if err != nil { 220 220 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 221 221 }