Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "time" 8 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/xrpc" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/idresolver" 15 "tangled.org/core/knotserver/db" 16 "tangled.org/core/rbac" 17) 18 19const ( 20 knotMembersBackfillMigration = "backfill-knot-members-from-pds-v2" 21 knotMembersBackfillPerOwner = 30 * time.Second 22) 23 24func BackfillKnotMembers( 25 ctx context.Context, 26 d *db.DB, 27 e *rbac.Enforcer, 28 resolver *idresolver.Resolver, 29 hostname string, 30 logger *slog.Logger, 31) error { 32 l := logger.With("migration", knotMembersBackfillMigration) 33 34 applied, err := d.IsMigrationApplied(knotMembersBackfillMigration) 35 if err != nil { 36 return fmt.Errorf("check migration applied: %w", err) 37 } 38 if applied { 39 return nil 40 } 41 42 owners, err := e.GetKnotUsersByRole("server:owner", rbac.ThisServer) 43 if err != nil { 44 return fmt.Errorf("list owners: %w", err) 45 } 46 47 var rows []db.KnotMember 48 for _, owner := range owners { 49 ownerCtx, cancel := context.WithTimeout(ctx, knotMembersBackfillPerOwner) 50 ownerRows, err := fetchOwnerKnotMembers(ownerCtx, resolver, hostname, owner, l) 51 cancel() 52 if err != nil { 53 l.Warn("skipping owner during backfill", "owner", owner, "err", err) 54 continue 55 } 56 rows = append(rows, ownerRows...) 57 } 58 59 for _, m := range rows { 60 if err := e.AddKnotMember(rbac.ThisServer, m.Subject.String()); err != nil { 61 return fmt.Errorf("grant ACL for %s: %w", m.Subject, err) 62 } 63 } 64 65 if err := d.ApplyKnotMembersBackfill(ctx, rows, knotMembersBackfillMigration); err != nil { 66 return fmt.Errorf("apply backfill: %w", err) 67 } 68 69 l.Info("backfilled knot members", "count", len(rows), "owners", len(owners)) 70 return nil 71} 72 73func fetchOwnerKnotMembers( 74 ctx context.Context, 75 resolver *idresolver.Resolver, 76 hostname string, 77 owner string, 78 l *slog.Logger, 79) ([]db.KnotMember, error) { 80 ownerDid, err := syntax.ParseDID(owner) 81 if err != nil { 82 return nil, fmt.Errorf("invalid owner DID %q: %w", owner, err) 83 } 84 85 ident, err := resolver.ResolveIdent(ctx, owner) 86 if err != nil { 87 return nil, fmt.Errorf("resolve %s: %w", owner, err) 88 } 89 client := &xrpc.Client{Host: ident.PDSEndpoint()} 90 91 var ( 92 rows []db.KnotMember 93 cursor string 94 ) 95 for { 96 out, err := comatproto.RepoListRecords(ctx, client, tangled.KnotMemberNSID, cursor, 100, owner, false) 97 if err != nil { 98 return nil, fmt.Errorf("list records: %w", err) 99 } 100 for _, rec := range out.Records { 101 m, ok := rec.Value.Val.(*tangled.KnotMember) 102 if !ok || m.Domain != hostname { 103 continue 104 } 105 subject, err := syntax.ParseDID(m.Subject) 106 if err != nil { 107 l.Warn("invalid subject DID in record, skipping", "uri", rec.Uri, "err", err) 108 continue 109 } 110 uri, err := syntax.ParseATURI(rec.Uri) 111 if err != nil { 112 l.Warn("invalid AT URI in record, skipping", "uri", rec.Uri, "err", err) 113 continue 114 } 115 rkey := uri.RecordKey().String() 116 if rkey == "" { 117 l.Warn("empty rkey in AT URI, skipping", "uri", rec.Uri) 118 continue 119 } 120 rows = append(rows, db.KnotMember{ 121 Did: ownerDid, 122 Rkey: rkey, 123 Subject: subject, 124 }) 125 } 126 if out.Cursor == nil || *out.Cursor == "" || *out.Cursor == cursor { 127 break 128 } 129 cursor = *out.Cursor 130 } 131 return rows, nil 132}