Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotserver/pubkeys: dedup in the first place

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 22, 2026, 1:19 PM +0300) commit ac52b1e6 parent fdbbbbfb change-id rnprkmrv
+541 -74
+21
knotserver/db/db.go
··· 306 306 return nil, err 307 307 } 308 308 309 + if err := orm.RunMigration(conn, logger, "add-rkey-to-public-keys", func(tx *sql.Tx) error { 310 + _, mErr := tx.ExecContext(ctx, `ALTER TABLE public_keys ADD COLUMN rkey TEXT`) 311 + return mErr 312 + }); err != nil { 313 + return nil, err 314 + } 315 + 316 + if err := orm.RunMigration(conn, logger, "enforce-global-key-uniqueness", func(tx *sql.Tx) error { 317 + res, mErr := tx.ExecContext(ctx, `delete from public_keys where id not in (select min(id) from public_keys group by key)`) 318 + if mErr != nil { 319 + return mErr 320 + } 321 + if n, rErr := res.RowsAffected(); rErr == nil && n > 0 { 322 + logger.Warn("dropped duplicate public keys to enforce global key uniqueness", "deleted", n) 323 + } 324 + _, mErr = tx.ExecContext(ctx, `create unique index if not exists idx_public_keys_key on public_keys(key)`) 325 + return mErr 326 + }); err != nil { 327 + return nil, err 328 + } 329 + 309 330 return &DB{ 310 331 db: db, 311 332 logger: logger,
+74 -42
knotserver/db/pubkeys.go
··· 1 1 package db 2 2 3 3 import ( 4 + "database/sql" 5 + "log/slog" 4 6 "strconv" 5 7 "time" 6 8 9 + "github.com/bluesky-social/indigo/atproto/syntax" 7 10 "tangled.org/core/api/tangled" 8 11 ) 9 12 10 13 type PublicKey struct { 11 - Did string 14 + Did syntax.DID 15 + Rkey syntax.RecordKey 12 16 tangled.PublicKey 13 17 } 14 18 15 - func (d *DB) AddPublicKeyFromRecord(did string, recordIface map[string]interface{}) error { 16 - record := make(map[string]string) 17 - for k, v := range recordIface { 18 - if str, ok := v.(string); ok { 19 - record[k] = str 19 + func (d *DB) UpsertPublicKey(pk PublicKey) error { 20 + tx, err := d.db.Begin() 21 + if err != nil { 22 + return err 23 + } 24 + defer tx.Rollback() 25 + 26 + if pk.Rkey != "" { 27 + if _, err := tx.Exec(`delete from public_keys where did = ? and rkey = ?`, pk.Did, pk.Rkey); err != nil { 28 + return err 20 29 } 21 30 } 22 31 23 - pk := PublicKey{ 24 - Did: did, 32 + if err := insertPublicKey(tx, d.logger, pk); err != nil { 33 + return err 25 34 } 26 - pk.Key = record["key"] 27 - pk.CreatedAt = record["createdAt"] 28 35 29 - return d.AddPublicKey(pk) 36 + return tx.Commit() 30 37 } 31 38 32 - func (d *DB) AddPublicKey(pk PublicKey) error { 39 + func insertPublicKey(tx *sql.Tx, logger *slog.Logger, pk PublicKey) error { 40 + if pk.Key == "" { 41 + logger.Warn("skipping public key with empty key value", "did", pk.Did, "rkey", pk.Rkey) 42 + return nil 43 + } 44 + 33 45 if pk.CreatedAt == "" { 34 46 pk.CreatedAt = time.Now().Format(time.RFC3339) 35 47 } 36 48 37 - query := `insert or ignore into public_keys (did, key, created) values (?, ?, ?)` 38 - _, err := d.db.Exec(query, pk.Did, pk.Key, pk.CreatedAt) 39 - return err 49 + res, err := tx.Exec( 50 + `insert or ignore into public_keys (did, key, rkey, created) values (?, ?, ?, ?)`, 51 + pk.Did, pk.Key, pk.Rkey, pk.CreatedAt, 52 + ) 53 + if err != nil { 54 + return err 55 + } 56 + 57 + if rows, err := res.RowsAffected(); err == nil && rows == 0 { 58 + logger.Warn("public key not stored, already registered to another did", "did", pk.Did, "rkey", pk.Rkey) 59 + } 60 + 61 + return nil 40 62 } 41 63 42 - func (d *DB) RemovePublicKey(did string) error { 43 - query := `delete from public_keys where did = ?` 44 - _, err := d.db.Exec(query, did) 64 + func (d *DB) DeletePublicKeyByRkey(did syntax.DID, rkey syntax.RecordKey) error { 65 + if rkey == "" { 66 + return nil 67 + } 68 + 69 + query := `delete from public_keys where did = ? and rkey = ?` 70 + _, err := d.db.Exec(query, did, rkey) 45 71 return err 46 72 } 47 73 48 - func (pk *PublicKey) JSON() map[string]any { 49 - return map[string]any{ 50 - "did": pk.Did, 51 - "key": pk.Key, 52 - "createdAt": pk.CreatedAt, 74 + func (d *DB) ReplacePublicKeys(did syntax.DID, keys []PublicKey) error { 75 + tx, err := d.db.Begin() 76 + if err != nil { 77 + return err 53 78 } 54 - } 79 + defer tx.Rollback() 55 80 56 - func (d *DB) GetAllPublicKeys() ([]PublicKey, error) { 57 - var keys []PublicKey 81 + if _, err := tx.Exec(`delete from public_keys where did = ?`, did); err != nil { 82 + return err 83 + } 58 84 59 - rows, err := d.db.Query(`select key, did, created from public_keys`) 60 - if err != nil { 61 - return nil, err 85 + if err := insertPublicKeys(tx, d.logger, keys); err != nil { 86 + return err 62 87 } 63 - defer rows.Close() 64 88 65 - for rows.Next() { 66 - var publicKey PublicKey 67 - if err := rows.Scan(&publicKey.Key, &publicKey.Did, &publicKey.CreatedAt); err != nil { 68 - return nil, err 69 - } 70 - keys = append(keys, publicKey) 89 + return tx.Commit() 90 + } 91 + 92 + func insertPublicKeys(tx *sql.Tx, logger *slog.Logger, keys []PublicKey) error { 93 + if len(keys) == 0 { 94 + return nil 71 95 } 72 96 73 - if err := rows.Err(); err != nil { 74 - return nil, err 97 + if err := insertPublicKey(tx, logger, keys[0]); err != nil { 98 + return err 75 99 } 76 100 77 - return keys, nil 101 + return insertPublicKeys(tx, logger, keys[1:]) 102 + } 103 + 104 + func (pk *PublicKey) JSON() map[string]any { 105 + return map[string]any{ 106 + "did": pk.Did, 107 + "key": pk.Key, 108 + "createdAt": pk.CreatedAt, 109 + } 78 110 } 79 111 80 - func (d *DB) GetPublicKeys(did string) ([]PublicKey, error) { 112 + func (d *DB) GetAllPublicKeys() ([]PublicKey, error) { 81 113 var keys []PublicKey 82 114 83 - rows, err := d.db.Query(`select did, key, created from public_keys where did = ?`, did) 115 + rows, err := d.db.Query(`select key, did, created from public_keys`) 84 116 if err != nil { 85 117 return nil, err 86 118 } ··· 88 120 89 121 for rows.Next() { 90 122 var publicKey PublicKey 91 - if err := rows.Scan(&publicKey.Did, &publicKey.Key, &publicKey.CreatedAt); err != nil { 123 + if err := rows.Scan(&publicKey.Key, &publicKey.Did, &publicKey.CreatedAt); err != nil { 92 124 return nil, err 93 125 } 94 126 keys = append(keys, publicKey)
+136
knotserver/db/pubkeys_test.go
··· 1 + package db 2 + 3 + import ( 4 + "testing" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 8 + ) 9 + 10 + const ( 11 + didBoltless = "did:plc:boltless" 12 + didAkshay = "did:plc:akshay" 13 + keyShared = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAISharedSharedSharedSharedSharedSharedShar01" 14 + keyRotated = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIRotatedRotatedRotatedRotatedRotatedRot02" 15 + keyOther = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIOtherOtherOtherOtherOtherOtherOtherOth03" 16 + ) 17 + 18 + func TestUpsertPublicKey_GlobalUniqueness(t *testing.T) { 19 + d := newTestDB(t) 20 + addDid(t, d, didBoltless) 21 + addDid(t, d, didAkshay) 22 + 23 + if err := d.UpsertPublicKey(pubKey(didBoltless, "r-first", keyShared)); err != nil { 24 + t.Fatalf("first upsert: %v", err) 25 + } 26 + if err := d.UpsertPublicKey(pubKey(didAkshay, "r-second", keyShared)); err != nil { 27 + t.Fatalf("second upsert: %v", err) 28 + } 29 + 30 + owners := ownersByKey(t, d) 31 + if got := len(owners); got != 1 { 32 + t.Fatalf("stored %d copies of the key, want 1, owners=%v", got, owners) 33 + } 34 + if owners[keyShared] != didBoltless { 35 + t.Errorf("key registered to %q, want %q (first writer keeps it)", owners[keyShared], didBoltless) 36 + } 37 + } 38 + 39 + func TestUpsertPublicKey_RotatesAtSameRkey(t *testing.T) { 40 + d := newTestDB(t) 41 + addDid(t, d, didBoltless) 42 + 43 + if err := d.UpsertPublicKey(pubKey(didBoltless, "rotate", keyShared)); err != nil { 44 + t.Fatalf("upsert old: %v", err) 45 + } 46 + if err := d.UpsertPublicKey(pubKey(didBoltless, "rotate", keyRotated)); err != nil { 47 + t.Fatalf("upsert new: %v", err) 48 + } 49 + 50 + owners := ownersByKey(t, d) 51 + if _, ok := owners[keyShared]; ok { 52 + t.Errorf("old key %q survived rotation at the same rkey", keyShared) 53 + } 54 + if _, ok := owners[keyRotated]; !ok { 55 + t.Errorf("rotated key %q not stored", keyRotated) 56 + } 57 + if got := len(owners); got != 1 { 58 + t.Errorf("stored %d keys, want 1", got) 59 + } 60 + } 61 + 62 + func TestDeletePublicKeyByRkey(t *testing.T) { 63 + d := newTestDB(t) 64 + addDid(t, d, didBoltless) 65 + 66 + if err := d.UpsertPublicKey(pubKey(didBoltless, "keep", keyShared)); err != nil { 67 + t.Fatalf("upsert keep: %v", err) 68 + } 69 + if err := d.UpsertPublicKey(pubKey(didBoltless, "drop", keyOther)); err != nil { 70 + t.Fatalf("upsert drop: %v", err) 71 + } 72 + 73 + if err := d.DeletePublicKeyByRkey(didBoltless, ""); err != nil { 74 + t.Fatalf("delete with empty rkey: %v", err) 75 + } 76 + if got := len(ownersByKey(t, d)); got != 2 { 77 + t.Fatalf("empty rkey deleted %d rows, want a no-op leaving 2", 2-got) 78 + } 79 + 80 + if err := d.DeletePublicKeyByRkey(didBoltless, "drop"); err != nil { 81 + t.Fatalf("delete drop: %v", err) 82 + } 83 + 84 + owners := ownersByKey(t, d) 85 + if _, ok := owners[keyOther]; ok { 86 + t.Errorf("key at rkey %q was not deleted", "drop") 87 + } 88 + if _, ok := owners[keyShared]; !ok { 89 + t.Errorf("delete removed the wrong key, %q is gone", keyShared) 90 + } 91 + } 92 + 93 + func TestInsertPublicKey_SkipsEmptyKey(t *testing.T) { 94 + d := newTestDB(t) 95 + addDid(t, d, didBoltless) 96 + 97 + if err := d.UpsertPublicKey(pubKey(didBoltless, "empty", "")); err != nil { 98 + t.Fatalf("upsert empty key: %v", err) 99 + } 100 + 101 + if got := len(ownersByKey(t, d)); got != 0 { 102 + t.Errorf("stored %d rows for an empty key, want 0", got) 103 + } 104 + } 105 + 106 + func addDid(t *testing.T, d *DB, did string) { 107 + t.Helper() 108 + if err := AddDid(d, did); err != nil { 109 + t.Fatalf("AddDid(%q): %v", did, err) 110 + } 111 + } 112 + 113 + func pubKey(did syntax.DID, rkey syntax.RecordKey, key string) PublicKey { 114 + return PublicKey{ 115 + Did: did, 116 + Rkey: rkey, 117 + PublicKey: tangled.PublicKey{Key: key, CreatedAt: "2026-06-20T00:00:00Z"}, 118 + } 119 + } 120 + 121 + func ownersByKey(t *testing.T, d *DB) map[string]string { 122 + t.Helper() 123 + rows, err := d.GetAllPublicKeys() 124 + if err != nil { 125 + t.Fatalf("GetAllPublicKeys: %v", err) 126 + } 127 + return foldOwners(rows, map[string]string{}) 128 + } 129 + 130 + func foldOwners(rows []PublicKey, acc map[string]string) map[string]string { 131 + if len(rows) == 0 { 132 + return acc 133 + } 134 + acc[rows[0].Key] = rows[0].Did.String() 135 + return foldOwners(rows[1:], acc) 136 + }
+24 -15
knotserver/ingester.go
··· 26 26 ) 27 27 28 28 func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 29 - l := log.FromContext(ctx) 30 - raw := json.RawMessage(event.Commit.Record) 31 - did := event.Did 29 + l := log.FromContext(ctx).With("handler", "processPublicKey", "did", event.Did, "rkey", event.Commit.RKey) 30 + did := syntax.DID(event.Did) 31 + rkey := syntax.RecordKey(event.Commit.RKey) 32 32 33 - var record tangled.PublicKey 34 - if err := json.Unmarshal(raw, &record); err != nil { 35 - return fmt.Errorf("failed to unmarshal record: %w", err) 33 + switch event.Commit.Operation { 34 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 35 + var record tangled.PublicKey 36 + if err := json.Unmarshal(json.RawMessage(event.Commit.Record), &record); err != nil { 37 + return fmt.Errorf("failed to unmarshal record: %w", err) 38 + } 39 + 40 + pk := db.PublicKey{ 41 + Did: did, 42 + Rkey: rkey, 43 + PublicKey: record, 44 + } 45 + if err := h.db.UpsertPublicKey(pk); err != nil { 46 + return fmt.Errorf("failed to upsert public key: %w", err) 47 + } 48 + l.Info("upserted public key from firehose") 49 + case jmodels.CommitOperationDelete: 50 + if err := h.db.DeletePublicKeyByRkey(did, rkey); err != nil { 51 + return fmt.Errorf("failed to delete public key: %w", err) 52 + } 53 + l.Info("deleted public key from firehose") 36 54 } 37 55 38 - pk := db.PublicKey{ 39 - Did: did, 40 - PublicKey: record, 41 - } 42 - if err := h.db.AddPublicKey(pk); err != nil { 43 - l.Error("failed to add public key", "error", err) 44 - return fmt.Errorf("failed to add public key: %w", err) 45 - } 46 - l.Info("added public key from firehose", "did", did) 47 56 return nil 48 57 } 49 58
+83 -14
knotserver/keys/keys.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "log/slog" 6 7 7 8 comatproto "github.com/bluesky-social/indigo/api/atproto" 8 9 "github.com/bluesky-social/indigo/atproto/identity" ··· 13 14 "tangled.org/core/log" 14 15 ) 15 16 16 - func FetchAndStore(ctx context.Context, dir identity.Directory, store *db.DB, did string) error { 17 + const ( 18 + publicKeyPageSize = 100 19 + maxPublicKeyPages = 20 20 + ) 21 + 22 + func FetchAndStore(ctx context.Context, dir identity.Directory, store *db.DB, did syntax.DID) error { 17 23 l := log.FromContext(ctx) 18 24 19 - id, err := dir.LookupDID(ctx, syntax.DID(did)) 25 + id, err := dir.LookupDID(ctx, did) 20 26 if err != nil { 21 27 return fmt.Errorf("lookup did to fetch keys: %w", err) 22 28 } ··· 28 34 } 29 35 30 36 xrpcc := indigoxrpc.Client{Host: serviceEndpoint.URL} 31 - resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.PublicKeyNSID, "", 50, did, false) 37 + records, err := listAllPublicKeys(ctx, l, &xrpcc, did, "", maxPublicKeyPages) 32 38 if err != nil { 33 39 return fmt.Errorf("fetching public keys for did: %w", err) 34 40 } 35 41 36 - for _, record := range resp.Records { 37 - if record == nil { 38 - continue 39 - } 40 - key, ok := record.Value.Val.(*tangled.PublicKey) 41 - if !ok || key == nil { 42 - continue 43 - } 44 - if err := store.AddPublicKey(db.PublicKey{Did: did, PublicKey: *key}); err != nil { 45 - return fmt.Errorf("adding public key to db: %w", err) 46 - } 42 + keys := collectPublicKeys(l, did, records) 43 + if len(keys) == 0 { 44 + l.Warn("no public keys fetched, skipping replace so existing keys are not wiped by a transient empty response", "did", did) 45 + return nil 46 + } 47 + 48 + if err := store.ReplacePublicKeys(did, keys); err != nil { 49 + return fmt.Errorf("replacing public keys in db: %w", err) 47 50 } 48 51 return nil 49 52 } 53 + 54 + func listAllPublicKeys(ctx context.Context, l *slog.Logger, xrpcc *indigoxrpc.Client, did syntax.DID, cursor string, pagesLeft int) ([]*comatproto.RepoListRecords_Record, error) { 55 + if pagesLeft <= 0 { 56 + l.Warn("public key pagination hit page cap, remaining keys ignored", "did", did, "cap", maxPublicKeyPages) 57 + return nil, nil 58 + } 59 + 60 + resp, err := comatproto.RepoListRecords(ctx, xrpcc, tangled.PublicKeyNSID, cursor, publicKeyPageSize, did.String(), false) 61 + if err != nil { 62 + return nil, err 63 + } 64 + 65 + if resp.Cursor == nil || *resp.Cursor == "" || len(resp.Records) == 0 { 66 + return resp.Records, nil 67 + } 68 + 69 + rest, err := listAllPublicKeys(ctx, l, xrpcc, did, *resp.Cursor, pagesLeft-1) 70 + if err != nil { 71 + return nil, err 72 + } 73 + 74 + return append(resp.Records, rest...), nil 75 + } 76 + 77 + func collectPublicKeys(l *slog.Logger, did syntax.DID, records []*comatproto.RepoListRecords_Record) []db.PublicKey { 78 + return collectPublicKeysInto(l, did, records, nil) 79 + } 80 + 81 + func collectPublicKeysInto(l *slog.Logger, did syntax.DID, records []*comatproto.RepoListRecords_Record, acc []db.PublicKey) []db.PublicKey { 82 + if len(records) == 0 { 83 + return acc 84 + } 85 + 86 + return collectPublicKeysInto(l, did, records[1:], appendValidKey(l, did, acc, records[0])) 87 + } 88 + 89 + func appendValidKey(l *slog.Logger, did syntax.DID, acc []db.PublicKey, record *comatproto.RepoListRecords_Record) []db.PublicKey { 90 + if record == nil { 91 + return acc 92 + } 93 + 94 + key, ok := record.Value.Val.(*tangled.PublicKey) 95 + if !ok || key == nil { 96 + return acc 97 + } 98 + 99 + rkey, err := recordKeyFromURI(record.Uri) 100 + if err != nil { 101 + l.Warn("skipping public key with unparseable uri", "uri", record.Uri, "err", err) 102 + return acc 103 + } 104 + 105 + return append(acc, db.PublicKey{ 106 + Did: did, 107 + Rkey: rkey, 108 + PublicKey: *key, 109 + }) 110 + } 111 + 112 + func recordKeyFromURI(uri string) (syntax.RecordKey, error) { 113 + aturi, err := syntax.ParseATURI(uri) 114 + if err != nil { 115 + return "", err 116 + } 117 + return aturi.RecordKey(), nil 118 + }
+199
knotserver/keys/keys_test.go
··· 1 + package keys 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "net/http" 8 + "net/http/httptest" 9 + "path/filepath" 10 + "testing" 11 + 12 + comatproto "github.com/bluesky-social/indigo/api/atproto" 13 + "github.com/bluesky-social/indigo/atproto/identity" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + lexutil "github.com/bluesky-social/indigo/lex/util" 16 + "tangled.org/core/api/tangled" 17 + "tangled.org/core/knotserver/db" 18 + ) 19 + 20 + const ( 21 + didBoltless = "did:plc:boltless" 22 + keyAlpha = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIAlphaAlphaAlphaAlphaAlphaAlphaAlphaAlpha01" 23 + keyBravo = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIABravoBravoBravoBravoBravoBravoBravoBravo02" 24 + ) 25 + 26 + func TestFetchAndStore_EmptyResponseDoesNotWipe(t *testing.T) { 27 + store := newKeyStore(t) 28 + seedKey(t, store, didBoltless, "seed", keyAlpha) 29 + 30 + srv := pdsServer(t, map[string]*comatproto.RepoListRecords_Output{ 31 + "": page(""), 32 + }) 33 + defer srv.Close() 34 + 35 + if err := FetchAndStore(context.Background(), fakeDirectory{pdsURL: srv.URL}, store, didBoltless); err != nil { 36 + t.Fatalf("FetchAndStore: %v", err) 37 + } 38 + 39 + owners := ownersByKey(t, store) 40 + if _, ok := owners[keyAlpha]; !ok { 41 + t.Fatalf("seeded key was wiped by an empty fetch response, owners=%v", owners) 42 + } 43 + if got := len(owners); got != 1 { 44 + t.Errorf("stored %d keys, want 1", got) 45 + } 46 + } 47 + 48 + func TestFetchAndStore_ReplacesExistingKeys(t *testing.T) { 49 + store := newKeyStore(t) 50 + seedKey(t, store, didBoltless, "stale", keyAlpha) 51 + 52 + srv := pdsServer(t, map[string]*comatproto.RepoListRecords_Output{ 53 + "": page("", pubkeyRecord(didBoltless, "fresh", keyBravo)), 54 + }) 55 + defer srv.Close() 56 + 57 + if err := FetchAndStore(context.Background(), fakeDirectory{pdsURL: srv.URL}, store, didBoltless); err != nil { 58 + t.Fatalf("FetchAndStore: %v", err) 59 + } 60 + 61 + owners := ownersByKey(t, store) 62 + if _, ok := owners[keyAlpha]; ok { 63 + t.Errorf("stale key %q survived a full replace", keyAlpha) 64 + } 65 + if _, ok := owners[keyBravo]; !ok { 66 + t.Errorf("fresh key %q was not stored", keyBravo) 67 + } 68 + if got := len(owners); got != 1 { 69 + t.Errorf("stored %d keys, want 1", got) 70 + } 71 + } 72 + 73 + func TestFetchAndStore_PaginatesAcrossPages(t *testing.T) { 74 + store := newKeyStore(t) 75 + if err := db.AddDid(store, didBoltless); err != nil { 76 + t.Fatalf("AddDid: %v", err) 77 + } 78 + 79 + srv := pdsServer(t, map[string]*comatproto.RepoListRecords_Output{ 80 + "": page("next", pubkeyRecord(didBoltless, "r1", keyAlpha)), 81 + "next": page("", pubkeyRecord(didBoltless, "r2", keyBravo)), 82 + }) 83 + defer srv.Close() 84 + 85 + if err := FetchAndStore(context.Background(), fakeDirectory{pdsURL: srv.URL}, store, didBoltless); err != nil { 86 + t.Fatalf("FetchAndStore: %v", err) 87 + } 88 + 89 + owners := ownersByKey(t, store) 90 + if _, ok := owners[keyAlpha]; !ok { 91 + t.Errorf("first-page key %q missing", keyAlpha) 92 + } 93 + if _, ok := owners[keyBravo]; !ok { 94 + t.Errorf("second-page key %q missing", keyBravo) 95 + } 96 + if got := len(owners); got != 2 { 97 + t.Errorf("stored %d keys, want 2", got) 98 + } 99 + } 100 + 101 + func newKeyStore(t *testing.T) *db.DB { 102 + t.Helper() 103 + store, err := db.Setup(context.Background(), filepath.Join(t.TempDir(), "test.db")) 104 + if err != nil { 105 + t.Fatalf("db.Setup: %v", err) 106 + } 107 + return store 108 + } 109 + 110 + func seedKey(t *testing.T, store *db.DB, did syntax.DID, rkey syntax.RecordKey, key string) { 111 + t.Helper() 112 + if err := db.AddDid(store, did.String()); err != nil { 113 + t.Fatalf("AddDid: %v", err) 114 + } 115 + if err := store.UpsertPublicKey(db.PublicKey{ 116 + Did: did, 117 + Rkey: rkey, 118 + PublicKey: tangled.PublicKey{Key: key, CreatedAt: "2026-06-20T00:00:00Z"}, 119 + }); err != nil { 120 + t.Fatalf("UpsertPublicKey: %v", err) 121 + } 122 + } 123 + 124 + func ownersByKey(t *testing.T, store *db.DB) map[string]string { 125 + t.Helper() 126 + rows, err := store.GetAllPublicKeys() 127 + if err != nil { 128 + t.Fatalf("GetAllPublicKeys: %v", err) 129 + } 130 + return foldOwners(rows, map[string]string{}) 131 + } 132 + 133 + func foldOwners(rows []db.PublicKey, acc map[string]string) map[string]string { 134 + if len(rows) == 0 { 135 + return acc 136 + } 137 + acc[rows[0].Key] = rows[0].Did.String() 138 + return foldOwners(rows[1:], acc) 139 + } 140 + 141 + func page(cursor string, records ...*comatproto.RepoListRecords_Record) *comatproto.RepoListRecords_Output { 142 + out := &comatproto.RepoListRecords_Output{Records: records} 143 + if cursor != "" { 144 + out.Cursor = &cursor 145 + } 146 + return out 147 + } 148 + 149 + func pubkeyRecord(did, rkey, key string) *comatproto.RepoListRecords_Record { 150 + return &comatproto.RepoListRecords_Record{ 151 + Uri: "at://" + did + "/" + tangled.PublicKeyNSID + "/" + rkey, 152 + Cid: "bafyreib2rxk3rybk3aobmv5cjuql3bm2twh4jo5uxgr3rxbd3xrnp5z5cy", 153 + Value: &lexutil.LexiconTypeDecoder{ 154 + Val: &tangled.PublicKey{Key: key, CreatedAt: "2026-06-20T00:00:00Z"}, 155 + }, 156 + } 157 + } 158 + 159 + func pdsServer(t *testing.T, pages map[string]*comatproto.RepoListRecords_Output) *httptest.Server { 160 + t.Helper() 161 + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 162 + cursor := r.URL.Query().Get("cursor") 163 + out, ok := pages[cursor] 164 + if !ok { 165 + t.Errorf("listRecords requested with unexpected cursor %q", cursor) 166 + http.Error(w, "no such page", http.StatusInternalServerError) 167 + return 168 + } 169 + w.Header().Set("Content-Type", "application/json") 170 + if err := json.NewEncoder(w).Encode(out); err != nil { 171 + t.Errorf("encoding listRecords response: %v", err) 172 + } 173 + })) 174 + } 175 + 176 + type fakeDirectory struct { 177 + pdsURL string 178 + } 179 + 180 + func (f fakeDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { 181 + return &identity.Identity{ 182 + DID: did, 183 + Services: map[string]identity.ServiceEndpoint{ 184 + "atproto_pds": {Type: "AtprotoPersonalDataServer", URL: f.pdsURL}, 185 + }, 186 + }, nil 187 + } 188 + 189 + func (f fakeDirectory) LookupHandle(ctx context.Context, handle syntax.Handle) (*identity.Identity, error) { 190 + return nil, errors.New("LookupHandle unused in tests") 191 + } 192 + 193 + func (f fakeDirectory) Lookup(ctx context.Context, atid syntax.AtIdentifier) (*identity.Identity, error) { 194 + return nil, errors.New("Lookup unused in tests") 195 + } 196 + 197 + func (f fakeDirectory) Purge(ctx context.Context, atid syntax.AtIdentifier) error { 198 + return nil 199 + }
+2 -1
knotserver/router.go
··· 9 9 "strings" 10 10 "sync" 11 11 12 + "github.com/bluesky-social/indigo/atproto/syntax" 12 13 "github.com/go-chi/chi/v5" 13 14 "tangled.org/core/idresolver" 14 15 "tangled.org/core/jetstream" ··· 217 218 return fmt.Errorf("failed to add owner to RBAC: %w", err) 218 219 } 219 220 220 - err = keys.FetchAndStore(ctx, h.resolver.Directory(), h.db, cfgOwner) 221 + err = keys.FetchAndStore(ctx, h.resolver.Directory(), h.db, syntax.DID(cfgOwner)) 221 222 if err != nil { 222 223 h.l.Error("fetching and adding owners public keys", "error", err, "did", cfgOwner) 223 224 }
+1 -1
knotserver/xrpc/acl_saga.go
··· 188 188 kctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), keyFetchTimeout) 189 189 go func() { 190 190 defer cancel() 191 - if err := keys.FetchAndStore(kctx, h.Resolver.Directory(), h.Db, subject.String()); err != nil { 191 + if err := keys.FetchAndStore(kctx, h.Resolver.Directory(), h.Db, subject); err != nil { 192 192 l.Warn("failed to fetch subject public keys, continuing", "subject", subject, "error", err) 193 193 } 194 194 }()
+1 -1
knotserver/xrpc/list_keys.go
··· 31 31 publicKeys := make([]*tangled.KnotListKeys_PublicKey, 0, len(keys)) 32 32 for _, key := range keys { 33 33 publicKeys = append(publicKeys, &tangled.KnotListKeys_PublicKey{ 34 - Did: key.Did, 34 + Did: key.Did.String(), 35 35 Key: key.Key, 36 36 CreatedAt: key.CreatedAt, 37 37 })