Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotserver: firehose delete evt should do nothing

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 19, 2026, 12:09 PM +0300) commit 49d36837 parent 7171efe3 change-id npouwvqo
+140 -48
+33 -14
knotserver/db/db.go
··· 193 193 return d.storeRepoKeyRow(repoDid, nil, ownerDid, repoName, "web") 194 194 } 195 195 196 - func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) (err error) { 196 + func (d *DB) storeRepoKeyRow(repoDid string, signingKey []byte, ownerDid, repoName, keyType string) error { 197 197 tx, err := d.db.Begin() 198 198 if err != nil { 199 199 return err 200 200 } 201 - defer func() { 202 - if err != nil { 203 - tx.Rollback() 204 - return 205 - } 206 - err = tx.Commit() 207 - }() 201 + defer tx.Rollback() 208 202 209 - if _, err = tx.Exec( 203 + if _, err := tx.Exec( 210 204 `INSERT INTO repo_keys (repo_did, signing_key, owner_did, repo_name, key_type) VALUES (?, ?, ?, ?, ?)`, 211 205 repoDid, signingKey, ownerDid, repoName, keyType, 212 206 ); err != nil { 213 207 return err 214 208 } 215 209 216 - _, err = tx.Exec( 210 + if _, err := tx.Exec( 217 211 `INSERT INTO repo_aliases (owner_did, rkey, repo_did, rev) 218 212 VALUES (?, ?, ?, '0_' || strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 219 213 ON CONFLICT(owner_did, rkey) DO NOTHING`, 220 214 ownerDid, repoName, repoDid, 221 - ) 222 - return err 215 + ); err != nil { 216 + return err 217 + } 218 + 219 + return tx.Commit() 223 220 } 224 221 225 222 func (d *DB) DeleteRepoKey(repoDid string) error { 226 - _, err := d.db.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid) 227 - return err 223 + tx, err := d.db.Begin() 224 + if err != nil { 225 + return err 226 + } 227 + defer tx.Rollback() 228 + 229 + if _, err := tx.Exec(`DELETE FROM repo_aliases WHERE repo_did = ?`, repoDid); err != nil { 230 + return err 231 + } 232 + 233 + if _, err := tx.Exec(`DELETE FROM repo_keys WHERE repo_did = ?`, repoDid); err != nil { 234 + return err 235 + } 236 + 237 + return tx.Commit() 228 238 } 229 239 230 240 func (d *DB) RepoDidExists(repoDid string) (bool, error) { ··· 238 248 err := d.db.QueryRow( 239 249 `SELECT repo_did FROM repo_aliases WHERE owner_did = ? AND rkey = ?`, 240 250 ownerDid, rkey, 251 + ).Scan(&repoDid) 252 + return repoDid, err 253 + } 254 + 255 + func (d *DB) GetRepoDidByName(ownerDid, repoName string) (string, error) { 256 + var repoDid string 257 + err := d.db.QueryRow( 258 + `SELECT repo_did FROM repo_keys WHERE owner_did = ? AND repo_name = ?`, 259 + ownerDid, repoName, 241 260 ).Scan(&repoDid) 242 261 return repoDid, err 243 262 }
-8
knotserver/db/repo_aliases.go
··· 25 25 return err 26 26 } 27 27 28 - func (d *DB) DeleteRepoAlias(ownerDid, rkey string) error { 29 - _, err := d.db.Exec( 30 - `delete from repo_aliases where owner_did = ? and rkey = ?`, 31 - ownerDid, rkey, 32 - ) 33 - return err 34 - } 35 - 36 28 func (d *DB) ResolveAlias(ownerDid, rkey string) (*RepoAlias, error) { 37 29 var a RepoAlias 38 30 err := d.db.QueryRow(
-3
knotserver/ingester.go
··· 499 499 } 500 500 501 501 if event.Commit.Operation == jmodels.CommitOperationDelete { 502 - if err := h.db.DeleteRepoAlias(event.Did, rkey); err != nil { 503 - l.Warn("failed to delete repo alias", "err", err) 504 - } 505 502 return nil 506 503 } 507 504
+41 -8
knotserver/ingester_repo_test.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "log/slog" 7 + "os" 8 + "path/filepath" 7 9 "sync" 8 10 "testing" 9 11 ··· 12 14 "tangled.org/core/knotserver/config" 13 15 "tangled.org/core/knotserver/db" 14 16 "tangled.org/core/log" 17 + "tangled.org/core/rbac" 15 18 ) 16 19 17 20 type logRecord struct { ··· 71 74 72 75 func newProcessRepoFixture(t *testing.T) (*Knot, context.Context, *capturingHandler) { 73 76 t.Helper() 74 - d := newTestKnotDB(t) 77 + scanPath := t.TempDir() 78 + dbPath := filepath.Join(scanPath, "knot.db") 79 + d, err := db.Setup(context.Background(), dbPath) 80 + if err != nil { 81 + t.Fatalf("db.Setup: %v", err) 82 + } 83 + 84 + e, err := rbac.NewEnforcer(dbPath) 85 + if err != nil { 86 + t.Fatalf("rbac.NewEnforcer: %v", err) 87 + } 88 + if err := e.AddKnot(rbac.ThisServer); err != nil { 89 + t.Fatalf("AddKnot: %v", err) 90 + } 91 + 75 92 cap := newCapturingHandler() 76 93 l := slog.New(cap) 77 94 ctx := log.IntoContext(context.Background(), l) 78 95 79 96 c := &config.Config{ 80 97 Server: config.Server{Hostname: "knot.example"}, 98 + Repo: config.Repo{ScanPath: scanPath}, 81 99 } 82 100 return &Knot{ 83 101 c: c, 84 102 db: d, 103 + e: e, 85 104 l: l, 86 105 }, ctx, cap 87 106 } ··· 135 154 } 136 155 } 137 156 138 - func TestProcessRepo_DeleteRemovesAlias(t *testing.T) { 157 + func TestProcessRepo_DeleteIsNoOp(t *testing.T) { 139 158 h, ctx, _ := newProcessRepoFixture(t) 140 159 if err := h.db.StoreRepoKey("did:plc:repo1", []byte("k"), "did:plc:akshay", "foo"); err != nil { 141 160 t.Fatalf("StoreRepoKey: %v", err) ··· 145 164 }); err != nil { 146 165 t.Fatalf("UpsertRepoAlias: %v", err) 147 166 } 167 + if err := h.e.AddRepo("did:plc:akshay", rbac.ThisServer, "did:plc:repo1"); err != nil { 168 + t.Fatalf("AddRepo rbac: %v", err) 169 + } 170 + repoPath := filepath.Join(h.c.Repo.ScanPath, "did:plc:repo1") 171 + if err := os.MkdirAll(repoPath, 0o755); err != nil { 172 + t.Fatalf("MkdirAll: %v", err) 173 + } 148 174 149 175 ev := repoEvent(t, "did:plc:akshay", "bar", "3laaaaaaaaaac", tangled.Repo{}, jsmodels.CommitOperationDelete) 150 176 if err := h.processRepo(ctx, ev); err != nil { 151 177 t.Fatalf("processRepo: %v", err) 152 178 } 153 179 154 - if _, err := h.db.GetRepoDid("did:plc:akshay", "bar"); err == nil { 155 - t.Errorf("bar alias should have been deleted") 180 + if got, err := h.db.GetRepoDid("did:plc:akshay", "bar"); err != nil || got != "did:plc:repo1" { 181 + t.Errorf("bar alias should be untouched by firehose delete: got (%q, %v)", got, err) 156 182 } 157 - 158 - _, current, _ := h.db.CurrentRkey("did:plc:repo1") 159 - if current != "foo" { 160 - t.Errorf("current rkey after delete = %q, want foo", current) 183 + if got, err := h.db.GetRepoDid("did:plc:akshay", "foo"); err != nil || got != "did:plc:repo1" { 184 + t.Errorf("foo alias should be untouched by firehose delete: got (%q, %v)", got, err) 185 + } 186 + if exists, _ := h.db.RepoDidExists("did:plc:repo1"); !exists { 187 + t.Errorf("repo_keys row should be untouched by firehose delete") 188 + } 189 + if _, err := os.Stat(repoPath); err != nil { 190 + t.Errorf("repo dir should be untouched by firehose delete: %v", err) 191 + } 192 + if allowed, _ := h.e.IsRepoDeleteAllowed("did:plc:akshay", rbac.ThisServer, "did:plc:repo1"); !allowed { 193 + t.Errorf("rbac policies should be untouched by firehose delete") 161 194 } 162 195 } 163 196
+39 -1
knotserver/xrpc/create_repo.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "encoding/json" 6 7 "errors" 7 8 "fmt" ··· 103 104 return 104 105 105 106 default: 107 + removeOrphan := func(orphanDid string) error { 108 + orphanPath, _ := securejoin.SecureJoin(h.Config.Repo.ScanPath, orphanDid) 109 + if rmErr := os.RemoveAll(orphanPath); rmErr != nil { 110 + l.Warn("failed to remove orphan repo directory", "path", orphanPath, "error", rmErr.Error()) 111 + } 112 + if rbacErr := h.Enforcer.RemoveRepo(actorDid.String(), rbac.ThisServer, orphanDid); rbacErr != nil { 113 + l.Warn("failed to remove orphan rbac entry", "repoDid", orphanDid, "error", rbacErr.Error()) 114 + } 115 + return h.Db.DeleteRepoKey(orphanDid) 116 + } 117 + 106 118 existingDid, dbErr := h.Db.GetRepoDid(actorDid.String(), repoName) 119 + if dbErr != nil && !errors.Is(dbErr, sql.ErrNoRows) { 120 + l.Error("failed to look up repo alias", "error", dbErr.Error()) 121 + writeError(w, xrpcerr.GenericError(dbErr), http.StatusInternalServerError) 122 + return 123 + } 107 124 if dbErr == nil && existingDid != "" { 108 125 didRepoPath, _ := securejoin.SecureJoin(h.Config.Repo.ScanPath, existingDid) 109 126 if _, statErr := os.Stat(didRepoPath); statErr == nil { ··· 113 130 return 114 131 } 115 132 l.Warn("stale repo key found without directory, cleaning up", "repoDid", existingDid) 116 - if delErr := h.Db.DeleteRepoKey(existingDid); delErr != nil { 133 + if delErr := removeOrphan(existingDid); delErr != nil { 117 134 l.Error("failed to clean up stale repo key", "repoDid", existingDid, "error", delErr.Error()) 118 135 writeError(w, xrpcerr.GenericError(fmt.Errorf("failed to clean up stale state, retry later")), http.StatusInternalServerError) 119 136 return 137 + } 138 + } else { 139 + orphanDid, lookupErr := h.Db.GetRepoDidByName(actorDid.String(), repoName) 140 + if lookupErr != nil && !errors.Is(lookupErr, sql.ErrNoRows) { 141 + l.Error("failed to look up orphan repo key", "error", lookupErr.Error()) 142 + writeError(w, xrpcerr.GenericError(lookupErr), http.StatusInternalServerError) 143 + return 144 + } 145 + if lookupErr == nil && orphanDid != "" { 146 + orphanPath, _ := securejoin.SecureJoin(h.Config.Repo.ScanPath, orphanDid) 147 + if _, statErr := os.Stat(orphanPath); statErr == nil { 148 + l.Error("orphan repo_keys row but directory present, refusing to overwrite", "repoDid", orphanDid) 149 + writeError(w, xrpcerr.GenericError(fmt.Errorf("repository %q is in an inconsistent state, contact a knot admin", repoName)), http.StatusConflict) 150 + return 151 + } 152 + l.Warn("orphan repo_keys row without alias, cleaning up", "repoDid", orphanDid) 153 + if delErr := removeOrphan(orphanDid); delErr != nil { 154 + l.Error("failed to clean up orphan repo key", "repoDid", orphanDid, "error", delErr.Error()) 155 + writeError(w, xrpcerr.GenericError(fmt.Errorf("failed to clean up orphan state, retry later")), http.StatusInternalServerError) 156 + return 157 + } 120 158 } 121 159 } 122 160
+27 -14
knotserver/xrpc/delete_repo.go
··· 1 1 package xrpc 2 2 3 3 import ( 4 + "database/sql" 4 5 "encoding/json" 6 + "errors" 5 7 "fmt" 6 8 "net/http" 7 9 "os" ··· 9 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 10 12 "github.com/bluesky-social/indigo/atproto/syntax" 11 13 "github.com/bluesky-social/indigo/xrpc" 14 + securejoin "github.com/cyphar/filepath-securejoin" 12 15 "tangled.org/core/api/tangled" 13 16 "tangled.org/core/rbac" 14 17 xrpcerr "tangled.org/core/xrpc/errors" ··· 60 63 } 61 64 62 65 repoDid, err := x.Db.GetRepoDid(did, name) 66 + if errors.Is(err, sql.ErrNoRows) { 67 + repoDid, err = x.Db.GetRepoDidByName(did, name) 68 + if errors.Is(err, sql.ErrNoRows) { 69 + l.Info("repo already torn down or not found", "did", did, "name", name) 70 + w.WriteHeader(http.StatusOK) 71 + return 72 + } 73 + } 63 74 if err != nil { 64 - fail(xrpcerr.RepoNotFoundError) 75 + l.Error("failed to look up repo", "error", err.Error()) 76 + writeError(w, xrpcerr.GenericError(err), http.StatusInternalServerError) 65 77 return 66 78 } 67 - repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repoDid) 68 - if err != nil { 69 - fail(xrpcerr.RepoNotFoundError) 79 + 80 + repoPath, joinErr := securejoin.SecureJoin(x.Config.Repo.ScanPath, repoDid) 81 + if joinErr != nil { 82 + fail(xrpcerr.GenericError(joinErr)) 70 83 return 71 84 } 72 85 ··· 80 93 return 81 94 } 82 95 83 - err = os.RemoveAll(repoPath) 84 - if err != nil { 85 - l.Error("deleting repo", "error", err.Error()) 86 - writeError(w, xrpcerr.GenericError(err), http.StatusInternalServerError) 96 + if rmErr := os.RemoveAll(repoPath); rmErr != nil { 97 + l.Error("deleting repo", "error", rmErr.Error()) 98 + writeError(w, xrpcerr.GenericError(rmErr), http.StatusInternalServerError) 87 99 return 88 100 } 89 101 90 - err = x.Enforcer.RemoveRepo(did, rbac.ThisServer, repoDid) 91 - if err != nil { 92 - l.Error("failed to delete repo from enforcer", "error", err.Error()) 93 - writeError(w, xrpcerr.GenericError(err), http.StatusInternalServerError) 102 + if rbacErr := x.Enforcer.RemoveRepo(did, rbac.ThisServer, repoDid); rbacErr != nil { 103 + l.Error("failed to delete repo from enforcer", "error", rbacErr.Error()) 104 + writeError(w, xrpcerr.GenericError(rbacErr), http.StatusInternalServerError) 94 105 return 95 106 } 96 107 97 - if err := x.Db.DeleteRepoKey(repoDid); err != nil { 98 - l.Error("failed to delete repo key", "error", err.Error()) 108 + if delErr := x.Db.DeleteRepoKey(repoDid); delErr != nil { 109 + l.Error("failed to delete repo key", "error", delErr.Error()) 110 + writeError(w, xrpcerr.GenericError(delErr), http.StatusInternalServerError) 111 + return 99 112 } 100 113 101 114 w.WriteHeader(http.StatusOK)