Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotserver,spindle: member acl via firehose ingester

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 27, 2026, 10:34 AM +0300) commit d6564738 parent 38475e4b change-id rmxpplov
+351 -200
+10 -46
appview/knots/knots.go
··· 551 551 return 552 552 } 553 553 554 - if err = k.Enforcer.AddKnotMember(domain, memberId.DID.String()); err != nil { 555 - l.Error("failed to add member to ACLs", "err", err) 556 - fail() 557 - return 558 - } 559 - committed := false 560 - defer func() { 561 - if committed { 562 - return 563 - } 564 - k.Enforcer.E.LoadPolicy() 565 - }() 566 - 567 554 rkey := tid.TID() 568 555 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 569 556 Collection: tangled.KnotMemberNSID, ··· 582 569 k.Pages.Notice(w, noticeId, "Failed to add record to PDS, try again later.") 583 570 return 584 571 } 585 - 586 - if err = k.Enforcer.E.SavePolicy(); err != nil { 587 - l.Error("failed to save ACL policy", "err", err) 588 - fail() 589 - return 590 - } 591 - committed = true 592 572 593 573 k.Pages.HxRedirect(w, fmt.Sprintf("/settings/knots/%s", domain)) 594 574 } ··· 655 635 l.Warn("failed to look up member rkey", "err", err) 656 636 } 657 637 658 - if err = k.Enforcer.RemoveKnotMember(domain, memberId.DID.String()); err != nil { 659 - l.Error("failed to update ACLs", "err", err) 638 + if rkey == "" { 639 + l.Error("no member record found to remove") 660 640 fail() 661 641 return 662 642 } 663 - committed := false 664 - defer func() { 665 - if committed { 666 - return 667 - } 668 - k.Enforcer.E.LoadPolicy() 669 - }() 670 643 671 - if rkey != "" { 672 - _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 673 - Collection: tangled.KnotMemberNSID, 674 - Repo: user.Did, 675 - Rkey: rkey, 676 - }) 677 - if err != nil { 678 - l.Error("failed to delete record from PDS", "err", err) 679 - k.Pages.Notice(w, noticeId, "Failed to delete record from PDS, try again later.") 680 - return 681 - } 682 - } 683 - 684 - if err = k.Enforcer.E.SavePolicy(); err != nil { 685 - l.Error("failed to save ACLs", "err", err) 686 - fail() 644 + _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 645 + Collection: tangled.KnotMemberNSID, 646 + Repo: user.Did, 647 + Rkey: rkey, 648 + }) 649 + if err != nil { 650 + l.Error("failed to delete record from PDS", "err", err) 651 + k.Pages.Notice(w, noticeId, "Failed to delete record from PDS, try again later.") 687 652 return 688 653 } 689 - committed = true 690 654 691 655 k.Pages.HxRefresh(w) 692 656 }
-87
appview/spindles/spindles.go
··· 489 489 return 490 490 } 491 491 492 - tx, err := s.Db.Begin() 493 - if err != nil { 494 - l.Error("failed to start txn", "err", err) 495 - fail() 496 - return 497 - } 498 - defer func() { 499 - tx.Rollback() 500 - s.Enforcer.E.LoadPolicy() 501 - }() 502 - 503 492 rkey := tid.TID() 504 493 505 - // add member to db 506 - if err = db.AddSpindleMember(tx, models.SpindleMember{ 507 - Did: syntax.DID(user.Did), 508 - Rkey: rkey, 509 - Instance: instance, 510 - Subject: memberId.DID, 511 - }); err != nil { 512 - l.Error("failed to add spindle member", "err", err) 513 - fail() 514 - return 515 - } 516 - 517 - if err = s.Enforcer.AddSpindleMember(instance, memberId.DID.String()); err != nil { 518 - l.Error("failed to add member to ACLs") 519 - fail() 520 - return 521 - } 522 - 523 494 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 524 495 Collection: tangled.SpindleMemberNSID, 525 496 Repo: user.Did, ··· 535 506 if err != nil { 536 507 l.Error("failed to add record to PDS", "err", err) 537 508 s.Pages.Notice(w, noticeId, "Failed to add record to PDS, try again later.") 538 - return 539 - } 540 - 541 - if err = tx.Commit(); err != nil { 542 - l.Error("failed to commit txn", "err", err) 543 - fail() 544 - return 545 - } 546 - 547 - if err = s.Enforcer.E.SavePolicy(); err != nil { 548 - l.Error("failed to add member to ACLs", "err", err) 549 - fail() 550 509 return 551 510 } 552 511 ··· 607 566 return 608 567 } 609 568 610 - tx, err := s.Db.Begin() 611 - if err != nil { 612 - l.Error("failed to start txn", "err", err) 613 - fail() 614 - return 615 - } 616 - defer func() { 617 - tx.Rollback() 618 - s.Enforcer.E.LoadPolicy() 619 - }() 620 - 621 - // get the record from the DB first: 622 569 members, err := db.GetSpindleMembers( 623 570 s.Db, 624 571 orm.FilterEq("did", user.Did), ··· 631 578 return 632 579 } 633 580 634 - // remove from db 635 - if err = db.RemoveSpindleMember( 636 - tx, 637 - orm.FilterEq("did", user.Did), 638 - orm.FilterEq("instance", instance), 639 - orm.FilterEq("subject", memberId.DID), 640 - ); err != nil { 641 - l.Error("failed to remove spindle member", "err", err) 642 - fail() 643 - return 644 - } 645 - 646 - // remove from enforcer 647 - if err = s.Enforcer.RemoveSpindleMember(instance, memberId.DID.String()); err != nil { 648 - l.Error("failed to update ACLs", "err", err) 649 - fail() 650 - return 651 - } 652 - 653 581 client, err := s.OAuth.AuthorizedClient(r) 654 582 if err != nil { 655 583 l.Error("failed to authorize client", "err", err) ··· 657 585 return 658 586 } 659 587 660 - // remove from pds 661 588 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 662 589 Collection: tangled.SpindleMemberNSID, 663 590 Repo: user.Did, 664 591 Rkey: members[0].Rkey, 665 592 }) 666 593 if err != nil { 667 - // non-fatal 668 594 l.Error("failed to delete record", "err", err) 669 - } 670 - 671 - // commit everything 672 - if err = tx.Commit(); err != nil { 673 - l.Error("failed to commit txn", "err", err) 674 595 fail() 675 596 return 676 597 } 677 598 678 - // commit everything 679 - if err = s.Enforcer.E.SavePolicy(); err != nil { 680 - l.Error("failed to save ACLs", "err", err) 681 - fail() 682 - return 683 - } 684 - 685 - // ok 686 599 s.Pages.HxRefresh(w) 687 600 }
+174 -29
knotserver/ingester.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "encoding/json" 7 + "errors" 6 8 "fmt" 7 9 "io" 8 10 "net/http" ··· 49 51 } 50 52 51 53 func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 52 - l := log.FromContext(ctx) 53 - raw := json.RawMessage(event.Commit.Record) 54 54 did := event.Did 55 + rkey := event.Commit.RKey 56 + l := log.FromContext(ctx).With("handler", "processKnotMember", "did", did, "rkey", rkey) 57 + 58 + switch event.Commit.Operation { 59 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 60 + raw := json.RawMessage(event.Commit.Record) 61 + var record tangled.KnotMember 62 + if err := json.Unmarshal(raw, &record); err != nil { 63 + return fmt.Errorf("failed to unmarshal record: %w", err) 64 + } 55 65 56 - var record tangled.KnotMember 57 - if err := json.Unmarshal(raw, &record); err != nil { 58 - return fmt.Errorf("failed to unmarshal record: %w", err) 59 - } 66 + if record.Domain != h.c.Server.Hostname { 67 + return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 68 + } 69 + 70 + subject, err := syntax.ParseDID(record.Subject) 71 + if err != nil { 72 + return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err) 73 + } 74 + 75 + ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 76 + if err != nil { 77 + return fmt.Errorf("failed to enforce permissions: %w", err) 78 + } 79 + if !ok { 80 + return fmt.Errorf("permission denied for %s", did) 81 + } 82 + 83 + sqlTx, err := h.db.BeginTx(ctx, nil) 84 + if err != nil { 85 + return fmt.Errorf("failed to start txn: %w", err) 86 + } 87 + committed := false 88 + defer func() { 89 + if !committed { 90 + sqlTx.Rollback() 91 + } 92 + }() 93 + 94 + existing, err := db.GetKnotMember(sqlTx, did, rkey) 95 + if err != nil && !errors.Is(err, sql.ErrNoRows) { 96 + return fmt.Errorf("failed to look up existing member: %w", err) 97 + } 98 + 99 + var staleSubject string 100 + if existing != nil && existing.Subject != subject { 101 + staleSubject = existing.Subject.String() 102 + if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil { 103 + return fmt.Errorf("failed to remove stale member row: %w", err) 104 + } 105 + } 106 + 107 + if err := db.AddKnotMember(sqlTx, db.KnotMember{ 108 + Did: syntax.DID(did), 109 + Rkey: rkey, 110 + Subject: subject, 111 + }); err != nil { 112 + return fmt.Errorf("failed to persist member row: %w", err) 113 + } 114 + 115 + if err := db.AddDid(sqlTx, subject.String()); err != nil { 116 + return fmt.Errorf("failed to add did: %w", err) 117 + } 118 + 119 + dropStaleAcl := false 120 + var staleDidDropped bool 121 + if staleSubject != "" { 122 + remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject) 123 + if err != nil { 124 + return fmt.Errorf("failed to count stale subject rows: %w", err) 125 + } 126 + if remaining == 0 { 127 + dropStaleAcl = true 128 + stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer) 129 + if err != nil { 130 + return fmt.Errorf("failed to check residual policies for stale subject: %w", err) 131 + } 132 + if !stillNeeded { 133 + if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 134 + return fmt.Errorf("failed to remove stale did: %w", err) 135 + } 136 + staleDidDropped = true 137 + } 138 + } 139 + l.Info("replaced stale knot member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped) 140 + } 141 + 142 + if err := sqlTx.Commit(); err != nil { 143 + return fmt.Errorf("failed to commit txn: %w", err) 144 + } 145 + committed = true 146 + 147 + if dropStaleAcl { 148 + if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil { 149 + l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err) 150 + } 151 + } 152 + if _, err := h.e.TryAddKnotMember(rbac.ThisServer, subject.String()); err != nil { 153 + l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err) 154 + } 155 + 156 + if staleDidDropped { 157 + h.jc.RemoveDid(staleSubject) 158 + } 159 + h.jc.AddDid(subject.String()) 160 + l.Info("added member from firehose", "member", subject) 161 + 162 + if err := h.fetchAndAddKeys(ctx, subject.String()); err != nil { 163 + return fmt.Errorf("failed to fetch and add keys: %w", err) 164 + } 165 + return nil 166 + 167 + case jmodels.CommitOperationDelete: 168 + sqlTx, err := h.db.BeginTx(ctx, nil) 169 + if err != nil { 170 + return fmt.Errorf("failed to start txn: %w", err) 171 + } 172 + committed := false 173 + defer func() { 174 + if !committed { 175 + sqlTx.Rollback() 176 + } 177 + }() 178 + 179 + member, err := db.GetKnotMember(sqlTx, did, rkey) 180 + if errors.Is(err, sql.ErrNoRows) { 181 + l.Info("knot member already removed") 182 + return nil 183 + } 184 + if err != nil { 185 + return fmt.Errorf("failed to look up knot member: %w", err) 186 + } 60 187 61 - if record.Domain != h.c.Server.Hostname { 62 - l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 63 - return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 64 - } 188 + staleSubject := member.Subject.String() 189 + 190 + if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil { 191 + return fmt.Errorf("failed to remove member row: %w", err) 192 + } 193 + 194 + remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject) 195 + if err != nil { 196 + return fmt.Errorf("failed to count remaining member rows: %w", err) 197 + } 65 198 66 - ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 67 - if err != nil || !ok { 68 - l.Error("failed to add member", "did", did) 69 - return fmt.Errorf("failed to enforce permissions: %w", err) 70 - } 199 + dropAcl := false 200 + var staleDidDropped bool 201 + if remaining == 0 { 202 + dropAcl = true 203 + stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer) 204 + if err != nil { 205 + return fmt.Errorf("failed to check residual policies: %w", err) 206 + } 207 + if !stillNeeded { 208 + if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 209 + return fmt.Errorf("failed to remove did: %w", err) 210 + } 211 + staleDidDropped = true 212 + } 213 + } 71 214 72 - if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 73 - l.Error("failed to add member", "error", err) 74 - return fmt.Errorf("failed to add member: %w", err) 75 - } 76 - l.Info("added member from firehose", "member", record.Subject) 215 + if err := sqlTx.Commit(); err != nil { 216 + return fmt.Errorf("failed to commit txn: %w", err) 217 + } 218 + committed = true 77 219 78 - if err := h.db.AddDid(record.Subject); err != nil { 79 - l.Error("failed to add did", "error", err) 80 - return fmt.Errorf("failed to add did: %w", err) 81 - } 82 - h.jc.AddDid(record.Subject) 220 + if dropAcl { 221 + if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil { 222 + l.Error("post-commit: failed to remove ACL", "subject", staleSubject, "err", err) 223 + } 224 + } 83 225 84 - if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 85 - return fmt.Errorf("failed to fetch and add keys: %w", err) 226 + if staleDidDropped { 227 + h.jc.RemoveDid(staleSubject) 228 + } 229 + l.Info("removed knot member from firehose", "member", member.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped) 230 + return nil 86 231 } 87 232 88 233 return nil ··· 437 582 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 438 583 } 439 584 440 - if err := h.db.AddDid(subjectId.DID.String()); err != nil { 585 + if err := db.AddDid(h.db, subjectId.DID.String()); err != nil { 441 586 return err 442 587 } 443 588 h.jc.AddDid(subjectId.DID.String()) ··· 575 720 if err != nil { 576 721 args := []any{"kind", event.Kind, "err", err} 577 722 if event.Kind == jmodels.EventKindCommit { 578 - args = append(args, "nsid", event.Commit.Collection) 723 + args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey) 579 724 } 580 725 h.l.Warn("failed to process event, skipping", args...) 581 726 }
+1 -1
rbac/rbac.go
··· 43 43 return nil, err 44 44 } 45 45 46 - db, err := sql.Open("sqlite3", path+"?_foreign_keys=1") 46 + db, err := sql.Open("sqlite3", path+"?_foreign_keys=1&_journal_mode=WAL&_busy_timeout=5000") 47 47 if err != nil { 48 48 return nil, err 49 49 }
+31
rbac/util.go
··· 72 72 return len(gPolicies) > 0, nil 73 73 } 74 74 75 + func (e *Enforcer) wouldHaveAnyPolicyExcludingGrouping(user, role, domain string) (bool, error) { 76 + pPolicies, err := e.E.GetFilteredNamedPolicy("p", 0, user) 77 + if err != nil { 78 + return false, err 79 + } 80 + if len(pPolicies) > 0 { 81 + return true, nil 82 + } 83 + gPolicies, err := e.E.GetFilteredNamedGroupingPolicy("g", 0, user) 84 + if err != nil { 85 + return false, err 86 + } 87 + for _, gp := range gPolicies { 88 + if len(gp) < 3 { 89 + return true, nil 90 + } 91 + if gp[1] != role || gp[2] != domain { 92 + return true, nil 93 + } 94 + } 95 + return false, nil 96 + } 97 + 98 + func (e *Enforcer) WouldHaveAnyPolicyExcludingKnotMember(user, domain string) (bool, error) { 99 + return e.wouldHaveAnyPolicyExcludingGrouping(user, "server:member", domain) 100 + } 101 + 102 + func (e *Enforcer) WouldHaveAnyPolicyExcludingSpindleMember(user, domain string) (bool, error) { 103 + return e.wouldHaveAnyPolicyExcludingGrouping(user, "server:member", intoSpindle(domain)) 104 + } 105 + 75 106 func checkRepoFormat(repo string) error { 76 107 // sanity check, repo must be of the form ownerDid/repo 77 108 if parts := strings.SplitN(repo, "/", 2); !strings.HasPrefix(parts[0], "did:") {
+135 -37
spindle/ingester.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "encoding/json" 7 + "errors" 6 8 "fmt" 7 - "time" 8 9 9 10 "tangled.org/core/api/tangled" 10 11 "tangled.org/core/spindle/db" ··· 33 34 } 34 35 35 36 if err != nil { 36 - s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err) 37 + s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey, "err", err) 37 38 } 38 39 39 40 lastTimeUs := e.TimeUS + 1 ··· 76 77 }, true 77 78 } 78 79 79 - func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 80 - var err error 80 + func (s *Spindle) ingestMember(ctx context.Context, e *models.Event) error { 81 81 did := e.Did 82 82 rkey := e.Commit.RKey 83 - 84 - l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 83 + l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID, "did", did, "rkey", rkey) 85 84 86 85 switch e.Commit.Operation { 87 86 case models.CommitOperationCreate, models.CommitOperationUpdate: 88 87 raw := e.Commit.Record 89 88 record := tangled.SpindleMember{} 90 - err = json.Unmarshal(raw, &record) 91 - if err != nil { 92 - l.Error("invalid record", "error", err) 93 - return err 89 + if err := json.Unmarshal(raw, &record); err != nil { 90 + return fmt.Errorf("invalid record: %w", err) 94 91 } 95 92 96 93 domain := s.cfg.Server.Hostname 97 94 recordInstance := record.Instance 98 95 99 96 if recordInstance != domain { 100 - l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 101 97 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 102 98 } 103 99 100 + subject, err := syntax.ParseDID(record.Subject) 101 + if err != nil { 102 + return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err) 103 + } 104 + 104 105 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 105 - if err != nil || !ok { 106 - l.Error("failed to add member", "did", did, "error", err) 106 + if err != nil { 107 107 return fmt.Errorf("failed to enforce permissions: %w", err) 108 108 } 109 + if !ok { 110 + return fmt.Errorf("permission denied for %s", did) 111 + } 109 112 110 - if err := db.AddSpindleMember(s.db, db.SpindleMember{ 113 + sqlTx, err := s.db.BeginTx(ctx, nil) 114 + if err != nil { 115 + return fmt.Errorf("failed to start txn: %w", err) 116 + } 117 + committed := false 118 + defer func() { 119 + if !committed { 120 + sqlTx.Rollback() 121 + } 122 + }() 123 + 124 + existing, err := db.GetSpindleMember(sqlTx, did, rkey) 125 + if err != nil && !errors.Is(err, sql.ErrNoRows) { 126 + return fmt.Errorf("failed to look up existing member: %w", err) 127 + } 128 + 129 + var staleSubject string 130 + if existing != nil && existing.Subject != subject { 131 + staleSubject = existing.Subject.String() 132 + if err := db.RemoveSpindleMember(sqlTx, did, rkey); err != nil { 133 + return fmt.Errorf("failed to remove stale member row: %w", err) 134 + } 135 + } 136 + 137 + if err := db.AddSpindleMember(sqlTx, db.SpindleMember{ 111 138 Did: syntax.DID(did), 112 139 Rkey: rkey, 113 140 Instance: recordInstance, 114 - Subject: syntax.DID(record.Subject), 115 - Created: time.Now(), 141 + Subject: subject, 116 142 }); err != nil { 117 - l.Error("failed to add member", "error", err) 118 143 return fmt.Errorf("failed to add member: %w", err) 119 144 } 120 145 121 - if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 122 - l.Error("failed to add member", "error", err) 123 - return fmt.Errorf("failed to add member: %w", err) 146 + if err := db.AddDid(sqlTx, subject.String()); err != nil { 147 + return fmt.Errorf("failed to add did: %w", err) 124 148 } 125 - l.Info("added member from firehose", "member", record.Subject) 126 149 127 - if err := s.db.AddDid(record.Subject); err != nil { 128 - l.Error("failed to add did", "error", err) 129 - return fmt.Errorf("failed to add did: %w", err) 150 + dropStaleAcl := false 151 + var staleDidDropped bool 152 + if staleSubject != "" { 153 + remaining, err := db.CountSpindleMembersBySubject(sqlTx, staleSubject) 154 + if err != nil { 155 + return fmt.Errorf("failed to count stale subject rows: %w", err) 156 + } 157 + if remaining == 0 { 158 + dropStaleAcl = true 159 + stillNeeded, err := s.e.WouldHaveAnyPolicyExcludingSpindleMember(staleSubject, rbacDomain) 160 + if err != nil { 161 + return fmt.Errorf("failed to check residual policies for stale subject: %w", err) 162 + } 163 + if !stillNeeded { 164 + if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 165 + return fmt.Errorf("failed to remove stale did: %w", err) 166 + } 167 + staleDidDropped = true 168 + } 169 + } 170 + l.Info("replaced stale spindle member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped) 130 171 } 131 - s.jc.AddDid(record.Subject) 172 + 173 + if err := sqlTx.Commit(); err != nil { 174 + return fmt.Errorf("failed to commit txn: %w", err) 175 + } 176 + committed = true 132 177 178 + if dropStaleAcl { 179 + if _, err := s.e.TryRemoveSpindleMember(rbacDomain, staleSubject); err != nil { 180 + l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err) 181 + } 182 + } 183 + if _, err := s.e.TryAddSpindleMember(rbacDomain, subject.String()); err != nil { 184 + l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err) 185 + } 186 + 187 + if staleDidDropped { 188 + s.jc.RemoveDid(staleSubject) 189 + } 190 + s.jc.AddDid(subject.String()) 191 + l.Info("added member from firehose", "member", subject) 133 192 return nil 134 193 135 194 case models.CommitOperationDelete: 136 - record, err := db.GetSpindleMember(s.db, did, rkey) 195 + sqlTx, err := s.db.BeginTx(ctx, nil) 196 + if err != nil { 197 + return fmt.Errorf("failed to start txn: %w", err) 198 + } 199 + committed := false 200 + defer func() { 201 + if !committed { 202 + sqlTx.Rollback() 203 + } 204 + }() 205 + 206 + record, err := db.GetSpindleMember(sqlTx, did, rkey) 207 + if errors.Is(err, sql.ErrNoRows) { 208 + l.Info("spindle member already removed") 209 + return nil 210 + } 137 211 if err != nil { 138 - l.Error("failed to find member", "error", err) 139 212 return fmt.Errorf("failed to find member: %w", err) 140 213 } 141 214 142 - if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 143 - l.Error("failed to remove member", "error", err) 215 + staleSubject := record.Subject.String() 216 + 217 + if err := db.RemoveSpindleMember(sqlTx, did, rkey); err != nil { 144 218 return fmt.Errorf("failed to remove member: %w", err) 145 219 } 146 220 147 - if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 148 - l.Error("failed to add member", "error", err) 149 - return fmt.Errorf("failed to add member: %w", err) 221 + remaining, err := db.CountSpindleMembersBySubject(sqlTx, staleSubject) 222 + if err != nil { 223 + return fmt.Errorf("failed to count remaining member rows: %w", err) 224 + } 225 + 226 + dropAcl := false 227 + var staleDidDropped bool 228 + if remaining == 0 { 229 + dropAcl = true 230 + stillNeeded, err := s.e.WouldHaveAnyPolicyExcludingSpindleMember(staleSubject, rbacDomain) 231 + if err != nil { 232 + return fmt.Errorf("failed to check residual policies: %w", err) 233 + } 234 + if !stillNeeded { 235 + if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 236 + return fmt.Errorf("failed to remove did: %w", err) 237 + } 238 + staleDidDropped = true 239 + } 240 + } 241 + 242 + if err := sqlTx.Commit(); err != nil { 243 + return fmt.Errorf("failed to commit txn: %w", err) 150 244 } 151 - l.Info("added member from firehose", "member", record.Subject) 245 + committed = true 152 246 153 - if err := s.db.RemoveDid(record.Subject.String()); err != nil { 154 - l.Error("failed to add did", "error", err) 155 - return fmt.Errorf("failed to add did: %w", err) 247 + if dropAcl { 248 + if _, err := s.e.TryRemoveSpindleMember(rbacDomain, staleSubject); err != nil { 249 + l.Error("post-commit: failed to remove member ACL", "subject", staleSubject, "err", err) 250 + } 156 251 } 157 - s.jc.RemoveDid(record.Subject.String()) 158 252 253 + if staleDidDropped { 254 + s.jc.RemoveDid(staleSubject) 255 + } 256 + l.Info("removed member from firehose", "member", record.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped) 159 257 } 160 258 return nil 161 259 }