Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 comatproto "github.com/bluesky-social/indigo/api/atproto"
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/indigo/xrpc"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/idresolver"
15 "tangled.org/core/knotserver/db"
16 "tangled.org/core/rbac"
17)
18
19const (
20 knotMembersBackfillMigration = "backfill-knot-members-from-pds-v2"
21 knotMembersBackfillPerOwner = 30 * time.Second
22)
23
24func BackfillKnotMembers(
25 ctx context.Context,
26 d *db.DB,
27 e *rbac.Enforcer,
28 resolver *idresolver.Resolver,
29 hostname string,
30 logger *slog.Logger,
31) error {
32 l := logger.With("migration", knotMembersBackfillMigration)
33
34 applied, err := d.IsMigrationApplied(knotMembersBackfillMigration)
35 if err != nil {
36 return fmt.Errorf("check migration applied: %w", err)
37 }
38 if applied {
39 return nil
40 }
41
42 owners, err := e.GetKnotUsersByRole("server:owner", rbac.ThisServer)
43 if err != nil {
44 return fmt.Errorf("list owners: %w", err)
45 }
46
47 var rows []db.KnotMember
48 for _, owner := range owners {
49 ownerCtx, cancel := context.WithTimeout(ctx, knotMembersBackfillPerOwner)
50 ownerRows, err := fetchOwnerKnotMembers(ownerCtx, resolver, hostname, owner, l)
51 cancel()
52 if err != nil {
53 l.Warn("skipping owner during backfill", "owner", owner, "err", err)
54 continue
55 }
56 rows = append(rows, ownerRows...)
57 }
58
59 for _, m := range rows {
60 if err := e.AddKnotMember(rbac.ThisServer, m.Subject.String()); err != nil {
61 return fmt.Errorf("grant ACL for %s: %w", m.Subject, err)
62 }
63 }
64
65 if err := d.ApplyKnotMembersBackfill(ctx, rows, knotMembersBackfillMigration); err != nil {
66 return fmt.Errorf("apply backfill: %w", err)
67 }
68
69 l.Info("backfilled knot members", "count", len(rows), "owners", len(owners))
70 return nil
71}
72
73func fetchOwnerKnotMembers(
74 ctx context.Context,
75 resolver *idresolver.Resolver,
76 hostname string,
77 owner string,
78 l *slog.Logger,
79) ([]db.KnotMember, error) {
80 ownerDid, err := syntax.ParseDID(owner)
81 if err != nil {
82 return nil, fmt.Errorf("invalid owner DID %q: %w", owner, err)
83 }
84
85 ident, err := resolver.ResolveIdent(ctx, owner)
86 if err != nil {
87 return nil, fmt.Errorf("resolve %s: %w", owner, err)
88 }
89 client := &xrpc.Client{Host: ident.PDSEndpoint()}
90
91 var (
92 rows []db.KnotMember
93 cursor string
94 )
95 for {
96 out, err := comatproto.RepoListRecords(ctx, client, tangled.KnotMemberNSID, cursor, 100, owner, false)
97 if err != nil {
98 return nil, fmt.Errorf("list records: %w", err)
99 }
100 for _, rec := range out.Records {
101 m, ok := rec.Value.Val.(*tangled.KnotMember)
102 if !ok || m.Domain != hostname {
103 continue
104 }
105 subject, err := syntax.ParseDID(m.Subject)
106 if err != nil {
107 l.Warn("invalid subject DID in record, skipping", "uri", rec.Uri, "err", err)
108 continue
109 }
110 uri, err := syntax.ParseATURI(rec.Uri)
111 if err != nil {
112 l.Warn("invalid AT URI in record, skipping", "uri", rec.Uri, "err", err)
113 continue
114 }
115 rkey := uri.RecordKey().String()
116 if rkey == "" {
117 l.Warn("empty rkey in AT URI, skipping", "uri", rec.Uri)
118 continue
119 }
120 rows = append(rows, db.KnotMember{
121 Did: ownerDid,
122 Rkey: rkey,
123 Subject: subject,
124 })
125 }
126 if out.Cursor == nil || *out.Cursor == "" || *out.Cursor == cursor {
127 break
128 }
129 cursor = *out.Cursor
130 }
131 return rows, nil
132}