Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

appview: kt+sp members ingestion

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 19, 2026, 12:59 PM +0300) commit b2f2af4e parent 49d36837 change-id xvxmnzry
+631 -180
+16
appview/db/db.go
··· 1961 1961 return err 1962 1962 }) 1963 1963 1964 + orm.RunMigration(conn, logger, "add-knot-members-table", func(tx *sql.Tx) error { 1965 + _, err := tx.Exec(` 1966 + create table if not exists knot_members ( 1967 + id integer primary key autoincrement, 1968 + did text not null, 1969 + rkey text not null, 1970 + domain text not null, 1971 + subject text not null, 1972 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 1973 + unique (did, domain, subject) 1974 + ); 1975 + create index if not exists idx_knot_members_did_rkey on knot_members(did, rkey); 1976 + `) 1977 + return err 1978 + }) 1979 + 1964 1980 return &DB{ 1965 1981 db, 1966 1982 logger,
+88
appview/db/registration.go
··· 92 92 _, err := e.Exec(` 93 93 insert into registrations (domain, did) 94 94 values (?, ?) 95 + on conflict (domain, did) do nothing 95 96 `, domain, did) 96 97 return err 97 98 } ··· 114 115 _, err := e.Exec(query, args...) 115 116 return err 116 117 } 118 + 119 + func AddKnotMember(e Execer, member models.KnotMember) error { 120 + _, err := e.Exec( 121 + `insert into knot_members (did, rkey, domain, subject) 122 + values (?, ?, ?, ?) 123 + on conflict (did, domain, subject) do update set rkey = excluded.rkey`, 124 + member.Did, 125 + member.Rkey, 126 + member.Domain, 127 + member.Subject, 128 + ) 129 + return err 130 + } 131 + 132 + func RemoveKnotMember(e Execer, filters ...orm.Filter) error { 133 + if len(filters) == 0 { 134 + return fmt.Errorf("RemoveKnotMember requires at least one filter") 135 + } 136 + 137 + var conditions []string 138 + var args []any 139 + for _, filter := range filters { 140 + conditions = append(conditions, filter.Condition()) 141 + args = append(args, filter.Arg()...) 142 + } 143 + 144 + query := fmt.Sprintf(`delete from knot_members where %s`, strings.Join(conditions, " and ")) 145 + 146 + _, err := e.Exec(query, args...) 147 + return err 148 + } 149 + 150 + func GetKnotMembers(e Execer, filters ...orm.Filter) ([]models.KnotMember, error) { 151 + var members []models.KnotMember 152 + 153 + var conditions []string 154 + var args []any 155 + for _, filter := range filters { 156 + conditions = append(conditions, filter.Condition()) 157 + args = append(args, filter.Arg()...) 158 + } 159 + 160 + whereClause := "" 161 + if conditions != nil { 162 + whereClause = " where " + strings.Join(conditions, " and ") 163 + } 164 + 165 + query := fmt.Sprintf( 166 + `select id, did, rkey, domain, subject, created 167 + from knot_members 168 + %s 169 + order by created 170 + `, 171 + whereClause, 172 + ) 173 + 174 + rows, err := e.Query(query, args...) 175 + if err != nil { 176 + return nil, err 177 + } 178 + defer rows.Close() 179 + 180 + for rows.Next() { 181 + var member models.KnotMember 182 + var createdAt string 183 + 184 + if err := rows.Scan( 185 + &member.Id, 186 + &member.Did, 187 + &member.Rkey, 188 + &member.Domain, 189 + &member.Subject, 190 + &createdAt, 191 + ); err != nil { 192 + return nil, err 193 + } 194 + 195 + member.Created, err = time.Parse(time.RFC3339, createdAt) 196 + if err != nil { 197 + member.Created = time.Now() 198 + } 199 + 200 + members = append(members, member) 201 + } 202 + 203 + return members, nil 204 + }
+7 -8
appview/db/spindle.go
··· 83 83 return spindles, nil 84 84 } 85 85 86 - // if there is an existing spindle with the same instance, this returns an error 87 86 func AddSpindle(e Execer, spindle models.Spindle) error { 88 87 _, err := e.Exec( 89 - `insert into spindles (owner, instance) values (?, ?)`, 88 + `insert into spindles (owner, instance) values (?, ?) 89 + on conflict (owner, instance) do nothing`, 90 90 spindle.Owner, 91 91 spindle.Instance, 92 92 ) ··· 147 147 } 148 148 149 149 func RemoveSpindleMember(e Execer, filters ...orm.Filter) error { 150 + if len(filters) == 0 { 151 + return fmt.Errorf("RemoveSpindleMember requires at least one filter") 152 + } 153 + 150 154 var conditions []string 151 155 var args []any 152 156 for _, filter := range filters { ··· 154 158 args = append(args, filter.Arg()...) 155 159 } 156 160 157 - whereClause := "" 158 - if conditions != nil { 159 - whereClause = " where " + strings.Join(conditions, " and ") 160 - } 161 - 162 - query := fmt.Sprintf(`delete from spindle_members %s`, whereClause) 161 + query := fmt.Sprintf(`delete from spindle_members where %s`, strings.Join(conditions, " and ")) 163 162 164 163 _, err := e.Exec(query, args...) 165 164 return err
+333 -59
appview/ingester.go
··· 38 38 ) 39 39 40 40 type Ingester struct { 41 + Ctx context.Context 41 42 Db *db.DB 42 43 Enforcer *rbac.Enforcer 43 44 IdResolver *idresolver.Resolver ··· 87 88 case tangled.SpindleNSID: 88 89 err = i.ingestSpindle(ctx, e) 89 90 case tangled.KnotMemberNSID: 90 - err = i.ingestKnotMember(e) 91 + err = i.ingestKnotMember(ctx, e) 91 92 case tangled.KnotNSID: 92 - err = i.ingestKnot(e) 93 + err = i.ingestKnot(ctx, e) 93 94 case tangled.StringNSID: 94 95 err = i.ingestString(e) 95 96 case tangled.RepoIssueNSID: ··· 640 641 l = l.With("nsid", e.Commit.Collection) 641 642 642 643 switch e.Commit.Operation { 643 - case jmodels.CommitOperationCreate: 644 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 644 645 raw := json.RawMessage(e.Commit.Record) 645 646 record := tangled.SpindleMember{} 646 647 err = json.Unmarshal(raw, &record) ··· 651 652 652 653 // only spindle owner can invite to spindles 653 654 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 654 - if err != nil || !ok { 655 - return fmt.Errorf("failed to enforce permissions: %w", err) 655 + if err != nil { 656 + return fmt.Errorf("failed to check invite permission: %w", err) 657 + } 658 + if !ok { 659 + if verifyErr := i.verifySpindle(ctx, record.Instance, did); verifyErr != nil { 660 + return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 661 + } 662 + ok, err = i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 663 + if err != nil { 664 + return fmt.Errorf("failed to re-check invite permission: %w", err) 665 + } 666 + if !ok { 667 + return fmt.Errorf("invite denied for did %s on spindle %s", did, record.Instance) 668 + } 656 669 } 657 670 658 671 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) ··· 661 674 } 662 675 663 676 if memberId.Handle.IsInvalidHandle() { 664 - return err 677 + return fmt.Errorf("invalid handle for member %s", record.Subject) 665 678 } 666 679 667 - err = db.AddSpindleMember(i.Db, models.SpindleMember{ 680 + existing, err := db.GetSpindleMembers(i.Db, 681 + orm.FilterEq("did", did), 682 + orm.FilterEq("rkey", e.Commit.RKey), 683 + ) 684 + if err != nil { 685 + return fmt.Errorf("failed to look up existing member: %w", err) 686 + } 687 + if len(existing) > 1 { 688 + return fmt.Errorf("multiple spindle members with rkey %s", e.Commit.RKey) 689 + } 690 + 691 + tx, err := i.Db.Begin() 692 + if err != nil { 693 + return fmt.Errorf("failed to start txn: %w", err) 694 + } 695 + committed := false 696 + defer func() { 697 + if committed { 698 + return 699 + } 700 + tx.Rollback() 701 + i.Enforcer.E.LoadPolicy() 702 + }() 703 + 704 + if len(existing) == 1 { 705 + prev := existing[0] 706 + if prev.Instance != record.Instance || prev.Subject != memberId.DID { 707 + if err = db.RemoveSpindleMember(tx, 708 + orm.FilterEq("did", did), 709 + orm.FilterEq("rkey", e.Commit.RKey), 710 + ); err != nil { 711 + return fmt.Errorf("failed to remove stale row: %w", err) 712 + } 713 + if err = i.Enforcer.RemoveSpindleMember(prev.Instance, prev.Subject.String()); err != nil { 714 + return fmt.Errorf("failed to remove stale ACL: %w", err) 715 + } 716 + } 717 + } 718 + 719 + if err = db.AddSpindleMember(tx, models.SpindleMember{ 668 720 Did: syntax.DID(did), 669 721 Rkey: e.Commit.RKey, 670 722 Instance: record.Instance, 671 723 Subject: memberId.DID, 672 - }) 673 - if !ok { 724 + }); err != nil { 674 725 return fmt.Errorf("failed to add to db: %w", err) 675 726 } 676 727 677 - err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 678 - if err != nil { 728 + if err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()); err != nil { 679 729 return fmt.Errorf("failed to update ACLs: %w", err) 680 730 } 681 731 682 - l.Info("added spindle member") 732 + if err = tx.Commit(); err != nil { 733 + return fmt.Errorf("failed to commit txn: %w", err) 734 + } 735 + 736 + if err = i.Enforcer.E.SavePolicy(); err != nil { 737 + return fmt.Errorf("failed to save ACLs: %w", err) 738 + } 739 + committed = true 740 + 741 + l.Info("upserted spindle member") 683 742 case jmodels.CommitOperationDelete: 684 743 rkey := e.Commit.RKey 685 744 ··· 698 757 if err != nil { 699 758 return fmt.Errorf("failed to start txn: %w", err) 700 759 } 760 + committed := false 761 + defer func() { 762 + if committed { 763 + return 764 + } 765 + tx.Rollback() 766 + i.Enforcer.E.LoadPolicy() 767 + }() 701 768 702 769 // remove record by rkey && update enforcer 703 770 if err = db.RemoveSpindleMember( ··· 721 788 if err = i.Enforcer.E.SavePolicy(); err != nil { 722 789 return fmt.Errorf("failed to save ACLs: %w", err) 723 790 } 791 + committed = true 724 792 725 793 l.Info("removed spindle member") 726 794 } ··· 736 804 l = l.With("nsid", e.Commit.Collection) 737 805 738 806 switch e.Commit.Operation { 739 - case jmodels.CommitOperationCreate: 807 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 740 808 raw := json.RawMessage(e.Commit.Record) 741 809 record := tangled.Spindle{} 742 810 err = json.Unmarshal(raw, &record) ··· 756 824 return err 757 825 } 758 826 759 - err = retry.Do( 760 - func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 761 - retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 762 - retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 763 - ) 764 - if err != nil { 765 - l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 766 - return err 767 - } 768 - 769 - _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 770 - if err != nil { 771 - return fmt.Errorf("failed to mark verified: %w", err) 827 + if err := i.verifySpindle(ctx, instance, did); err != nil { 828 + l.Warn("failed to verify spindle", "instance", instance, "did", did, "err", err) 772 829 } 773 830 774 831 return nil ··· 886 943 return nil 887 944 } 888 945 889 - func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 946 + func (i *Ingester) ingestKnotMember(ctx context.Context, e *jmodels.Event) error { 890 947 did := e.Did 891 948 var err error 892 949 ··· 894 951 l = l.With("nsid", e.Commit.Collection) 895 952 896 953 switch e.Commit.Operation { 897 - case jmodels.CommitOperationCreate: 954 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 898 955 raw := json.RawMessage(e.Commit.Record) 899 956 record := tangled.KnotMember{} 900 957 err = json.Unmarshal(raw, &record) ··· 905 962 906 963 // only knot owner can invite to knots 907 964 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 908 - if err != nil || !ok { 909 - return fmt.Errorf("failed to enforce permissions: %w", err) 965 + if err != nil { 966 + return fmt.Errorf("failed to check invite permission: %w", err) 967 + } 968 + if !ok { 969 + if verifyErr := i.verifyKnot(ctx, record.Domain, did); verifyErr != nil { 970 + return fmt.Errorf("invite denied and verify failed: %w", verifyErr) 971 + } 972 + ok, err = i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 973 + if err != nil { 974 + return fmt.Errorf("failed to re-check invite permission: %w", err) 975 + } 976 + if !ok { 977 + return fmt.Errorf("invite denied for did %s on knot %s", did, record.Domain) 978 + } 910 979 } 911 980 912 - memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 981 + memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 913 982 if err != nil { 914 983 return err 915 984 } 916 985 917 986 if memberId.Handle.IsInvalidHandle() { 918 - return err 987 + return fmt.Errorf("invalid handle for member %s", record.Subject) 919 988 } 920 989 921 - err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 990 + existing, err := db.GetKnotMembers(i.Db, 991 + orm.FilterEq("did", did), 992 + orm.FilterEq("rkey", e.Commit.RKey), 993 + ) 922 994 if err != nil { 995 + return fmt.Errorf("failed to look up existing member: %w", err) 996 + } 997 + if len(existing) > 1 { 998 + return fmt.Errorf("multiple knot members with rkey %s", e.Commit.RKey) 999 + } 1000 + 1001 + tx, err := i.Db.Begin() 1002 + if err != nil { 1003 + return fmt.Errorf("failed to start txn: %w", err) 1004 + } 1005 + committed := false 1006 + defer func() { 1007 + if committed { 1008 + return 1009 + } 1010 + tx.Rollback() 1011 + i.Enforcer.E.LoadPolicy() 1012 + }() 1013 + 1014 + if len(existing) == 1 { 1015 + prev := existing[0] 1016 + if prev.Domain != record.Domain || prev.Subject != memberId.DID { 1017 + if err = db.RemoveKnotMember(tx, 1018 + orm.FilterEq("did", did), 1019 + orm.FilterEq("rkey", e.Commit.RKey), 1020 + ); err != nil { 1021 + return fmt.Errorf("failed to remove stale row: %w", err) 1022 + } 1023 + if err = i.Enforcer.RemoveKnotMember(prev.Domain, prev.Subject.String()); err != nil { 1024 + return fmt.Errorf("failed to remove stale ACL: %w", err) 1025 + } 1026 + } 1027 + } 1028 + 1029 + if err = db.AddKnotMember(tx, models.KnotMember{ 1030 + Did: syntax.DID(did), 1031 + Rkey: e.Commit.RKey, 1032 + Domain: record.Domain, 1033 + Subject: memberId.DID, 1034 + }); err != nil { 1035 + return fmt.Errorf("failed to add to db: %w", err) 1036 + } 1037 + 1038 + if err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()); err != nil { 923 1039 return fmt.Errorf("failed to update ACLs: %w", err) 924 1040 } 925 1041 926 - l.Info("added knot member") 1042 + if err = tx.Commit(); err != nil { 1043 + return fmt.Errorf("failed to commit txn: %w", err) 1044 + } 1045 + 1046 + if err = i.Enforcer.E.SavePolicy(); err != nil { 1047 + return fmt.Errorf("failed to save ACLs: %w", err) 1048 + } 1049 + committed = true 1050 + 1051 + l.Info("upserted knot member") 927 1052 case jmodels.CommitOperationDelete: 928 - // we don't store knot members in a table (like we do for spindle) 929 - // and we can't remove this just yet. possibly fixed if we switch 930 - // to either: 931 - // 1. a knot_members table like with spindle and store the rkey 932 - // 2. use the knot host as the rkey 933 - // 934 - // TODO: implement member deletion 935 - l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 1053 + rkey := e.Commit.RKey 1054 + 1055 + members, err := db.GetKnotMembers( 1056 + i.Db, 1057 + orm.FilterEq("did", did), 1058 + orm.FilterEq("rkey", rkey), 1059 + ) 1060 + if err != nil { 1061 + return fmt.Errorf("failed to look up knot member with rkey %s: %w", rkey, err) 1062 + } 1063 + if len(members) == 0 { 1064 + l.Info("knot member already removed", "rkey", rkey) 1065 + return nil 1066 + } 1067 + if len(members) > 1 { 1068 + return fmt.Errorf("multiple knot members with rkey %s", rkey) 1069 + } 1070 + member := members[0] 1071 + 1072 + tx, err := i.Db.Begin() 1073 + if err != nil { 1074 + return fmt.Errorf("failed to start txn: %w", err) 1075 + } 1076 + committed := false 1077 + defer func() { 1078 + if committed { 1079 + return 1080 + } 1081 + tx.Rollback() 1082 + i.Enforcer.E.LoadPolicy() 1083 + }() 1084 + 1085 + if err = db.RemoveKnotMember( 1086 + tx, 1087 + orm.FilterEq("did", did), 1088 + orm.FilterEq("rkey", rkey), 1089 + ); err != nil { 1090 + return fmt.Errorf("failed to remove from db: %w", err) 1091 + } 1092 + 1093 + if err = i.Enforcer.RemoveKnotMember(member.Domain, member.Subject.String()); err != nil { 1094 + return fmt.Errorf("failed to update ACLs: %w", err) 1095 + } 1096 + 1097 + if err = tx.Commit(); err != nil { 1098 + return fmt.Errorf("failed to commit txn: %w", err) 1099 + } 1100 + 1101 + if err = i.Enforcer.E.SavePolicy(); err != nil { 1102 + return fmt.Errorf("failed to save ACLs: %w", err) 1103 + } 1104 + committed = true 1105 + 1106 + l.Info("removed knot member") 936 1107 } 937 1108 938 1109 return nil 939 1110 } 940 1111 941 - func (i *Ingester) ingestKnot(e *jmodels.Event) error { 1112 + func (i *Ingester) ingestKnot(ctx context.Context, e *jmodels.Event) error { 942 1113 did := e.Did 943 1114 var err error 944 1115 ··· 946 1117 l = l.With("nsid", e.Commit.Collection) 947 1118 948 1119 switch e.Commit.Operation { 949 - case jmodels.CommitOperationCreate: 1120 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 950 1121 raw := json.RawMessage(e.Commit.Record) 951 1122 record := tangled.Knot{} 952 1123 err = json.Unmarshal(raw, &record) ··· 963 1134 return err 964 1135 } 965 1136 966 - err = retry.Do( 967 - func() error { 968 - return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 969 - }, 970 - retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 971 - retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 972 - ) 973 - if err != nil { 974 - l.Error("failed to verify knot after retries", "err", err, "domain", domain) 975 - return err 976 - } 977 - 978 - err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 979 - if err != nil { 980 - return fmt.Errorf("failed to mark verified: %w", err) 1137 + if err := i.verifyKnot(ctx, domain, did); err != nil { 1138 + l.Warn("failed to verify knot", "domain", domain, "did", did, "err", err) 981 1139 } 982 1140 983 1141 return nil ··· 1008 1166 i.Enforcer.E.LoadPolicy() 1009 1167 }() 1010 1168 1169 + err = db.RemoveKnotMember( 1170 + tx, 1171 + orm.FilterEq("did", did), 1172 + orm.FilterEq("domain", domain), 1173 + ) 1174 + if err != nil { 1175 + return err 1176 + } 1177 + 1011 1178 err = db.DeleteKnot( 1012 1179 tx, 1013 1180 orm.FilterEq("did", did), ··· 1037 1204 1038 1205 return nil 1039 1206 } 1207 + 1208 + const ( 1209 + verifyAttempts = 4 1210 + verifyMinDelay = 1 * time.Second 1211 + verifyMaxDelay = 5 * time.Second 1212 + ) 1213 + 1214 + func (i *Ingester) verifyKnot(ctx context.Context, domain, did string) error { 1215 + regs, err := db.GetRegistrations(i.Db, 1216 + orm.FilterEq("domain", domain), 1217 + orm.FilterEq("did", did), 1218 + ) 1219 + if err != nil { 1220 + return fmt.Errorf("look up registration: %w", err) 1221 + } 1222 + if len(regs) != 1 { 1223 + return fmt.Errorf("no registration for %s by %s", domain, did) 1224 + } 1225 + if regs[0].Registered != nil { 1226 + return nil 1227 + } 1228 + 1229 + err = retry.Do( 1230 + func() error { return serververify.RunVerification(ctx, domain, did, i.Config.Core.Dev) }, 1231 + retry.Context(ctx), 1232 + retry.Attempts(verifyAttempts), 1233 + retry.Delay(verifyMinDelay), 1234 + retry.MaxDelay(verifyMaxDelay), 1235 + retry.DelayType(retry.BackOffDelay), 1236 + retry.LastErrorOnly(true), 1237 + ) 1238 + if err != nil { 1239 + return fmt.Errorf("verify: %w", err) 1240 + } 1241 + return serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 1242 + } 1243 + 1244 + func (i *Ingester) verifySpindle(ctx context.Context, instance, did string) error { 1245 + spindles, err := db.GetSpindles(ctx, i.Db, 1246 + orm.FilterEq("instance", instance), 1247 + orm.FilterEq("owner", did), 1248 + ) 1249 + if err != nil { 1250 + return fmt.Errorf("look up spindle: %w", err) 1251 + } 1252 + if len(spindles) != 1 { 1253 + return fmt.Errorf("no spindle for %s by %s", instance, did) 1254 + } 1255 + if spindles[0].Verified != nil { 1256 + return nil 1257 + } 1258 + 1259 + err = retry.Do( 1260 + func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 1261 + retry.Context(ctx), 1262 + retry.Attempts(verifyAttempts), 1263 + retry.Delay(verifyMinDelay), 1264 + retry.MaxDelay(verifyMaxDelay), 1265 + retry.DelayType(retry.BackOffDelay), 1266 + retry.LastErrorOnly(true), 1267 + ) 1268 + if err != nil { 1269 + return fmt.Errorf("verify: %w", err) 1270 + } 1271 + _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 1272 + return err 1273 + } 1274 + 1275 + const sweepConcurrency = 4 1276 + 1277 + func (i *Ingester) SweepPendingVerifications() { 1278 + l := i.Logger.With("handler", "SweepPendingVerifications") 1279 + 1280 + var g errgroup.Group 1281 + g.SetLimit(sweepConcurrency) 1282 + 1283 + regs, err := db.GetRegistrations(i.Db, orm.FilterIs("registered", nil)) 1284 + if err != nil { 1285 + l.Error("failed to list unverified knots", "err", err) 1286 + } else { 1287 + for _, reg := range regs { 1288 + g.Go(func() error { 1289 + if err := i.verifyKnot(i.Ctx, reg.Domain, reg.ByDid); err != nil { 1290 + l.Warn("verify knot failed", "domain", reg.Domain, "did", reg.ByDid, "err", err) 1291 + } 1292 + return nil 1293 + }) 1294 + } 1295 + } 1296 + 1297 + spindles, err := db.GetSpindles(i.Ctx, i.Db, orm.FilterIs("verified", nil)) 1298 + if err != nil { 1299 + l.Error("failed to list unverified spindles", "err", err) 1300 + g.Wait() 1301 + return 1302 + } 1303 + for _, s := range spindles { 1304 + g.Go(func() error { 1305 + if err := i.verifySpindle(i.Ctx, s.Instance, s.Owner.String()); err != nil { 1306 + l.Warn("verify spindle failed", "instance", s.Instance, "owner", s.Owner, "err", err) 1307 + } 1308 + return nil 1309 + }) 1310 + } 1311 + g.Wait() 1312 + } 1313 + 1040 1314 func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 1041 1315 did := e.Did 1042 1316 rkey := e.Commit.RKey
+31 -1
appview/ingester_repo.go
··· 262 262 return fmt.Errorf("failed to fetch repo for delete: %w", err) 263 263 } 264 264 265 - if err := db.RemoveRepo(i.Db, e.Did, e.Commit.RKey); err != nil { 265 + if i.Enforcer == nil { 266 + return fmt.Errorf("ingester has no RBAC enforcer configured") 267 + } 268 + 269 + tx, err := i.Db.Begin() 270 + if err != nil { 271 + return fmt.Errorf("failed to start txn: %w", err) 272 + } 273 + committed := false 274 + defer func() { 275 + if committed { 276 + return 277 + } 278 + tx.Rollback() 279 + i.Enforcer.E.LoadPolicy() 280 + }() 281 + 282 + if err := db.RemoveRepo(tx, e.Did, e.Commit.RKey); err != nil { 266 283 return fmt.Errorf("failed to delete repo: %w", err) 267 284 } 285 + 286 + if err := i.Enforcer.WipeRepoPolicies(repo.Knot, repo.RepoIdentifier()); err != nil { 287 + return fmt.Errorf("failed to wipe repo permissions: %w", err) 288 + } 289 + 290 + if err := tx.Commit(); err != nil { 291 + return fmt.Errorf("failed to commit txn: %w", err) 292 + } 293 + 294 + if err := i.Enforcer.E.SavePolicy(); err != nil { 295 + return fmt.Errorf("failed to save ACLs: %w", err) 296 + } 297 + committed = true 268 298 269 299 i.Notifier.DeleteRepo(ctx, repo) 270 300 l.Info("deleted repo row")
+35
appview/ingester_repo_test.go
··· 175 175 } 176 176 } 177 177 178 + func assertNoRepoPolicies(t *testing.T, ing *Ingester, knot, repo string) { 179 + t.Helper() 180 + for _, perm := range []string{"repo:settings", "repo:push", "repo:owner", "repo:delete", "repo:invite", "repo:collaborator"} { 181 + policies, err := ing.Enforcer.E.GetFilteredPolicy(1, knot, repo, perm) 182 + if err != nil { 183 + t.Fatalf("GetFilteredPolicy(%q): %v", perm, err) 184 + } 185 + if len(policies) != 0 { 186 + t.Fatalf("expected no %s policies for %s, got %v", perm, repo, policies) 187 + } 188 + } 189 + } 190 + 178 191 func TestIngestRepo_CreateInsertsNewRow(t *testing.T) { 179 192 ing, spy := newTestIngester(t) 180 193 ··· 418 431 if !errors.Is(err, sql.ErrNoRows) { 419 432 t.Errorf("expected row to be deleted, got err = %v", err) 420 433 } 434 + } 435 + 436 + func TestIngestRepo_DeleteWipesRbac(t *testing.T) { 437 + ing, _ := newTestIngester(t) 438 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1") 439 + if err := ing.ensureRepoOwnerPermissions("did:plc:akshay", "knot.example", "did:plc:repo1"); err != nil { 440 + t.Fatalf("ensureRepoOwnerPermissions: %v", err) 441 + } 442 + if err := ing.Enforcer.AddCollaborator("did:plc:boltless", "knot.example", "did:plc:repo1"); err != nil { 443 + t.Fatalf("AddCollaborator: %v", err) 444 + } 445 + if err := ing.Enforcer.E.SavePolicy(); err != nil { 446 + t.Fatalf("SavePolicy: %v", err) 447 + } 448 + assertRepoOwnerPermissions(t, ing, "did:plc:akshay", "knot.example", "did:plc:repo1") 449 + 450 + e := makeDeleteEvent("did:plc:akshay", "foo") 451 + if err := ingestAcceptingOwner(t, ing, e); err != nil { 452 + t.Fatalf("ingestRepo: %v", err) 453 + } 454 + 455 + assertNoRepoPolicies(t, ing, "knot.example", "did:plc:repo1") 421 456 } 422 457 423 458 func TestIngestRepo_MalformedRecord(t *testing.T) {
+88 -70
appview/knots/knots.go
··· 1 1 package knots 2 2 3 3 import ( 4 + "context" 4 5 "errors" 5 6 "fmt" 6 7 "log/slog" ··· 26 27 "tangled.org/core/tid" 27 28 28 29 comatproto "github.com/bluesky-social/indigo/api/atproto" 30 + "github.com/bluesky-social/indigo/atproto/atclient" 29 31 lexutil "github.com/bluesky-social/indigo/lex/util" 30 32 ) 31 33 ··· 165 167 fail() 166 168 return 167 169 } 168 - defer func() { 169 - tx.Rollback() 170 - k.Enforcer.E.LoadPolicy() 171 - }() 170 + defer tx.Rollback() 172 171 173 - err = db.AddKnot(tx, domain, user.Did) 174 - if err != nil { 172 + if err := db.AddKnot(tx, domain, user.Did); err != nil { 175 173 l.Error("failed to insert", "err", err) 176 174 fail() 177 175 return 178 176 } 179 177 180 - err = k.Enforcer.AddKnot(domain) 181 - if err != nil { 182 - l.Error("failed to create knot", "err", err) 183 - fail() 184 - return 185 - } 186 - 187 - // create record on pds 188 178 client, err := k.OAuth.AuthorizedClient(r) 189 179 if err != nil { 190 180 l.Error("failed to authorize client", "err", err) ··· 198 188 exCid = ex.Cid 199 189 } 200 190 201 - // re-announce by registering under same rkey 202 191 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 203 192 Collection: tangled.KnotNSID, 204 193 Repo: user.Did, ··· 210 199 }, 211 200 SwapRecord: exCid, 212 201 }) 213 - 214 202 if err != nil { 215 203 l.Error("failed to put record", "err", err) 216 204 fail() 217 205 return 218 206 } 219 207 220 - err = tx.Commit() 221 - if err != nil { 208 + if err := tx.Commit(); err != nil { 222 209 l.Error("failed to commit transaction", "err", err) 223 210 fail() 224 211 return 225 212 } 226 213 227 - err = k.Enforcer.E.SavePolicy() 228 - if err != nil { 229 - l.Error("failed to update ACL", "err", err) 230 - k.Pages.HxRefresh(w) 231 - return 232 - } 214 + go k.Knotstream.AddSource(r.Context(), eventconsumer.NewKnotSource(domain)) 233 215 234 - // begin verification 235 - err = serververify.RunVerification(r.Context(), domain, user.Did, k.Config.Core.Dev) 236 - if err != nil { 237 - l.Error("verification failed", "err", err) 238 - k.Pages.HxRefresh(w) 239 - return 240 - } 241 - 242 - err = serververify.MarkKnotVerified(k.Db, k.Enforcer, domain, user.Did) 243 - if err != nil { 244 - l.Error("failed to mark verified", "err", err) 245 - k.Pages.HxRefresh(w) 246 - return 247 - } 248 - 249 - // add this knot to knotstream 250 - go k.Knotstream.AddSource( 251 - r.Context(), 252 - eventconsumer.NewKnotSource(domain), 253 - ) 254 - 255 - // ok 256 216 k.Pages.HxRefresh(w) 257 217 } 258 218 ··· 553 513 return 554 514 } 555 515 556 - // write to pds 557 516 client, err := k.OAuth.AuthorizedClient(r) 558 517 if err != nil { 559 518 l.Error("failed to authorize client", "err", err) 560 519 fail() 561 520 return 562 521 } 522 + 523 + if err = k.Enforcer.AddKnotMember(domain, memberId.DID.String()); err != nil { 524 + l.Error("failed to add member to ACLs", "err", err) 525 + fail() 526 + return 527 + } 528 + committed := false 529 + defer func() { 530 + if committed { 531 + return 532 + } 533 + k.Enforcer.E.LoadPolicy() 534 + }() 563 535 564 536 rkey := tid.TID() 565 - 566 537 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 567 538 Collection: tangled.KnotMemberNSID, 568 539 Repo: user.Did, ··· 581 552 return 582 553 } 583 554 584 - err = k.Enforcer.AddKnotMember(domain, memberId.DID.String()) 585 - if err != nil { 586 - l.Error("failed to add member to ACLs", "err", err) 587 - fail() 588 - return 589 - } 590 - 591 - err = k.Enforcer.E.SavePolicy() 592 - if err != nil { 555 + if err = k.Enforcer.E.SavePolicy(); err != nil { 593 556 l.Error("failed to save ACL policy", "err", err) 594 557 fail() 595 558 return 596 559 } 560 + committed = true 597 561 598 - // success 599 562 k.Pages.HxRedirect(w, fmt.Sprintf("/settings/knots/%s", domain)) 600 563 } 601 564 ··· 649 612 return 650 613 } 651 614 652 - // remove from enforcer 653 - err = k.Enforcer.RemoveKnotMember(domain, memberId.DID.String()) 615 + client, err := k.OAuth.AuthorizedClient(r) 654 616 if err != nil { 655 - l.Error("failed to update ACLs", "err", err) 617 + l.Error("failed to authorize client", "err", err) 656 618 fail() 657 619 return 658 620 } 659 621 660 - client, err := k.OAuth.AuthorizedClient(r) 622 + rkey, err := lookupKnotMemberRkey(r.Context(), k.Db, client, user.Did, domain, memberId.DID.String()) 661 623 if err != nil { 662 - l.Error("failed to authorize client", "err", err) 624 + l.Warn("failed to look up member rkey", "err", err) 625 + } 626 + 627 + if err = k.Enforcer.RemoveKnotMember(domain, memberId.DID.String()); err != nil { 628 + l.Error("failed to update ACLs", "err", err) 663 629 fail() 664 630 return 665 631 } 632 + committed := false 633 + defer func() { 634 + if committed { 635 + return 636 + } 637 + k.Enforcer.E.LoadPolicy() 638 + }() 666 639 667 - // TODO: We need to track the rkey for knot members to delete the record 668 - // For now, just remove from ACLs 669 - _ = client 640 + if rkey != "" { 641 + _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 642 + Collection: tangled.KnotMemberNSID, 643 + Repo: user.Did, 644 + Rkey: rkey, 645 + }) 646 + if err != nil { 647 + l.Error("failed to delete record from PDS", "err", err) 648 + k.Pages.Notice(w, noticeId, "Failed to delete record from PDS, try again later.") 649 + return 650 + } 651 + } 670 652 671 - // commit everything 672 - err = k.Enforcer.E.SavePolicy() 673 - if err != nil { 653 + if err = k.Enforcer.E.SavePolicy(); err != nil { 674 654 l.Error("failed to save ACLs", "err", err) 675 655 fail() 676 656 return 677 657 } 658 + committed = true 678 659 679 - // ok 680 660 k.Pages.HxRefresh(w) 681 661 } 662 + 663 + func lookupKnotMemberRkey(ctx context.Context, d *db.DB, client *atclient.APIClient, ownerDid, domain, subject string) (string, error) { 664 + members, err := db.GetKnotMembers( 665 + d, 666 + orm.FilterEq("did", ownerDid), 667 + orm.FilterEq("domain", domain), 668 + orm.FilterEq("subject", subject), 669 + ) 670 + if err != nil { 671 + return "", fmt.Errorf("db lookup: %w", err) 672 + } 673 + if len(members) >= 1 { 674 + return members[0].Rkey, nil 675 + } 676 + return findKnotMemberRkey(ctx, client, ownerDid, domain, subject, "") 677 + } 678 + 679 + func findKnotMemberRkey(ctx context.Context, client *atclient.APIClient, repo, domain, subject, cursor string) (string, error) { 680 + out, err := comatproto.RepoListRecords(ctx, client, tangled.KnotMemberNSID, cursor, 100, repo, false) 681 + if err != nil { 682 + return "", err 683 + } 684 + for _, rec := range out.Records { 685 + m, ok := rec.Value.Val.(*tangled.KnotMember) 686 + if !ok { 687 + continue 688 + } 689 + if m.Domain != domain || m.Subject != subject { 690 + continue 691 + } 692 + parts := strings.Split(rec.Uri, "/") 693 + return parts[len(parts)-1], nil 694 + } 695 + if out.Cursor == nil || *out.Cursor == "" || *out.Cursor == cursor { 696 + return "", nil 697 + } 698 + return findKnotMemberRkey(ctx, client, repo, domain, subject, *out.Cursor) 699 + }
+16
appview/models/knot_member.go
··· 1 + package models 2 + 3 + import ( 4 + "time" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + ) 8 + 9 + type KnotMember struct { 10 + Id int64 11 + Did syntax.DID 12 + Rkey string 13 + Domain string 14 + Subject syntax.DID 15 + Created time.Time 16 + }
+10
appview/serververify/verify.go
··· 69 69 if err != nil { 70 70 return 0, fmt.Errorf("failed to create txn: %w", err) 71 71 } 72 + committed := false 72 73 defer func() { 74 + if committed { 75 + return 76 + } 73 77 tx.Rollback() 74 78 e.E.LoadPolicy() 75 79 }() ··· 98 102 if err != nil { 99 103 return 0, fmt.Errorf("failed to update ACL: %w", err) 100 104 } 105 + committed = true 101 106 102 107 return rowId, nil 103 108 } ··· 108 113 if err != nil { 109 114 return fmt.Errorf("failed to start tx: %w", err) 110 115 } 116 + committed := false 111 117 defer func() { 118 + if committed { 119 + return 120 + } 112 121 tx.Rollback() 113 122 e.E.LoadPolicy() 114 123 }() ··· 144 153 if err != nil { 145 154 return fmt.Errorf("failed to update ACLs: %w", err) 146 155 } 156 + committed = true 147 157 148 158 return nil 149 159 }
+4 -42
appview/spindles/spindles.go
··· 170 170 fail() 171 171 return 172 172 } 173 - defer func() { 174 - tx.Rollback() 175 - s.Enforcer.E.LoadPolicy() 176 - }() 173 + defer tx.Rollback() 177 174 178 - err = db.AddSpindle(tx, models.Spindle{ 175 + if err := db.AddSpindle(tx, models.Spindle{ 179 176 Owner: syntax.DID(user.Did), 180 177 Instance: instance, 181 - }) 182 - if err != nil { 178 + }); err != nil { 183 179 l.Error("failed to insert", "err", err) 184 180 fail() 185 181 return 186 182 } 187 183 188 - err = s.Enforcer.AddSpindle(instance) 189 - if err != nil { 190 - l.Error("failed to create spindle", "err", err) 191 - fail() 192 - return 193 - } 194 - 195 - // create record on pds 196 184 client, err := s.OAuth.AuthorizedClient(r) 197 185 if err != nil { 198 186 l.Error("failed to authorize client", "err", err) ··· 206 194 exCid = ex.Cid 207 195 } 208 196 209 - // re-announce by registering under same rkey 210 197 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 211 198 Collection: tangled.SpindleNSID, 212 199 Repo: user.Did, ··· 218 205 }, 219 206 SwapRecord: exCid, 220 207 }) 221 - 222 208 if err != nil { 223 209 l.Error("failed to put record", "err", err) 224 210 fail() 225 211 return 226 212 } 227 213 228 - err = tx.Commit() 229 - if err != nil { 214 + if err := tx.Commit(); err != nil { 230 215 l.Error("failed to commit transaction", "err", err) 231 216 fail() 232 217 return 233 218 } 234 219 235 - err = s.Enforcer.E.SavePolicy() 236 - if err != nil { 237 - l.Error("failed to update ACL", "err", err) 238 - s.Pages.HxRefresh(w) 239 - return 240 - } 241 - 242 - // begin verification 243 - err = serververify.RunVerification(r.Context(), instance, user.Did, s.Config.Core.Dev) 244 - if err != nil { 245 - l.Error("verification failed", "err", err) 246 - s.Pages.HxRefresh(w) 247 - return 248 - } 249 - 250 - _, err = serververify.MarkSpindleVerified(s.Db, s.Enforcer, instance, user.Did) 251 - if err != nil { 252 - l.Error("failed to mark verified", "err", err) 253 - s.Pages.HxRefresh(w) 254 - return 255 - } 256 - 257 - // ok 258 220 s.Pages.HxRefresh(w) 259 221 } 260 222
+3
appview/state/state.go
··· 174 174 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 175 175 176 176 ingester := appview.Ingester{ 177 + Ctx: ctx, 177 178 Db: d, 178 179 Enforcer: enforcer, 179 180 IdResolver: res, ··· 188 189 if err != nil { 189 190 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 190 191 } 192 + 193 + go ingester.SweepPendingVerifications() 191 194 192 195 var cfClient *cloudflare.Client 193 196 if config.Cloudflare.ApiToken != "" {