Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

knotserver: backfill members from pds on startup

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (May 27, 2026, 10:34 AM +0300) commit 2959ca57 parent d6564738 change-id yyswynqx
+138 -2
+132
knotserver/backfill.go
··· 1 + package knotserver 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "time" 8 + 9 + comatproto "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/bluesky-social/indigo/xrpc" 12 + 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/idresolver" 15 + "tangled.org/core/knotserver/db" 16 + "tangled.org/core/rbac" 17 + ) 18 + 19 + const ( 20 + knotMembersBackfillMigration = "backfill-knot-members-from-pds-v2" 21 + knotMembersBackfillPerOwner = 30 * time.Second 22 + ) 23 + 24 + func BackfillKnotMembers( 25 + ctx context.Context, 26 + d *db.DB, 27 + e *rbac.Enforcer, 28 + resolver *idresolver.Resolver, 29 + hostname string, 30 + logger *slog.Logger, 31 + ) error { 32 + l := logger.With("migration", knotMembersBackfillMigration) 33 + 34 + applied, err := d.IsMigrationApplied(knotMembersBackfillMigration) 35 + if err != nil { 36 + return fmt.Errorf("check migration applied: %w", err) 37 + } 38 + if applied { 39 + return nil 40 + } 41 + 42 + owners, err := e.GetKnotUsersByRole("server:owner", rbac.ThisServer) 43 + if err != nil { 44 + return fmt.Errorf("list owners: %w", err) 45 + } 46 + 47 + var rows []db.KnotMember 48 + for _, owner := range owners { 49 + ownerCtx, cancel := context.WithTimeout(ctx, knotMembersBackfillPerOwner) 50 + ownerRows, err := fetchOwnerKnotMembers(ownerCtx, resolver, hostname, owner, l) 51 + cancel() 52 + if err != nil { 53 + l.Warn("skipping owner during backfill", "owner", owner, "err", err) 54 + continue 55 + } 56 + rows = append(rows, ownerRows...) 57 + } 58 + 59 + for _, m := range rows { 60 + if err := e.AddKnotMember(rbac.ThisServer, m.Subject.String()); err != nil { 61 + return fmt.Errorf("grant ACL for %s: %w", m.Subject, err) 62 + } 63 + } 64 + 65 + if err := d.ApplyKnotMembersBackfill(ctx, rows, knotMembersBackfillMigration); err != nil { 66 + return fmt.Errorf("apply backfill: %w", err) 67 + } 68 + 69 + l.Info("backfilled knot members", "count", len(rows), "owners", len(owners)) 70 + return nil 71 + } 72 + 73 + func fetchOwnerKnotMembers( 74 + ctx context.Context, 75 + resolver *idresolver.Resolver, 76 + hostname string, 77 + owner string, 78 + l *slog.Logger, 79 + ) ([]db.KnotMember, error) { 80 + ownerDid, err := syntax.ParseDID(owner) 81 + if err != nil { 82 + return nil, fmt.Errorf("invalid owner DID %q: %w", owner, err) 83 + } 84 + 85 + ident, err := resolver.ResolveIdent(ctx, owner) 86 + if err != nil { 87 + return nil, fmt.Errorf("resolve %s: %w", owner, err) 88 + } 89 + client := &xrpc.Client{Host: ident.PDSEndpoint()} 90 + 91 + var ( 92 + rows []db.KnotMember 93 + cursor string 94 + ) 95 + for { 96 + out, err := comatproto.RepoListRecords(ctx, client, tangled.KnotMemberNSID, cursor, 100, owner, false) 97 + if err != nil { 98 + return nil, fmt.Errorf("list records: %w", err) 99 + } 100 + for _, rec := range out.Records { 101 + m, ok := rec.Value.Val.(*tangled.KnotMember) 102 + if !ok || m.Domain != hostname { 103 + continue 104 + } 105 + subject, err := syntax.ParseDID(m.Subject) 106 + if err != nil { 107 + l.Warn("invalid subject DID in record, skipping", "uri", rec.Uri, "err", err) 108 + continue 109 + } 110 + uri, err := syntax.ParseATURI(rec.Uri) 111 + if err != nil { 112 + l.Warn("invalid AT URI in record, skipping", "uri", rec.Uri, "err", err) 113 + continue 114 + } 115 + rkey := uri.RecordKey().String() 116 + if rkey == "" { 117 + l.Warn("empty rkey in AT URI, skipping", "uri", rec.Uri) 118 + continue 119 + } 120 + rows = append(rows, db.KnotMember{ 121 + Did: ownerDid, 122 + Rkey: rkey, 123 + Subject: subject, 124 + }) 125 + } 126 + if out.Cursor == nil || *out.Cursor == "" || *out.Cursor == cursor { 127 + break 128 + } 129 + cursor = *out.Cursor 130 + } 131 + return rows, nil 132 + }
+2 -2
knotserver/db/db.go
··· 19 19 logger *slog.Logger 20 20 } 21 21 22 - type Querier interface { 22 + type DBTX interface { 23 23 QueryRow(query string, args ...any) *sql.Row 24 24 Exec(query string, args ...any) (sql.Result, error) 25 25 } ··· 293 293 return GetRepoKeyOwner(d.db, repoDid) 294 294 } 295 295 296 - func GetRepoKeyOwner(q Querier, repoDid string) (ownerDid string, repoName string, err error) { 296 + func GetRepoKeyOwner(q DBTX, repoDid string) (ownerDid string, repoName string, err error) { 297 297 err = q.QueryRow( 298 298 `SELECT owner_did, rkey FROM repo_aliases 299 299 WHERE repo_did = ?
+4
knotserver/server.go
··· 93 93 94 94 resolver := idresolver.DefaultResolver(c.Server.PlcUrl) 95 95 96 + if err := BackfillKnotMembers(ctx, db, e, resolver, c.Server.Hostname, logger); err != nil { 97 + logger.Warn("knot members backfill failed, continuing", "err", err) 98 + } 99 + 96 100 go migrateReposOnStartup(ctx, c, db, e, &notifier, log.SubLogger(logger, "migrate")) 97 101 98 102 mux, err := Setup(ctx, c, db, e, jc, &notifier, resolver)