Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "log/slog"
7 "os"
8 "path/filepath"
9 "sync"
10 "testing"
11
12 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/knotserver/config"
15 "tangled.org/core/knotserver/db"
16 "tangled.org/core/log"
17 "tangled.org/core/rbac"
18)
19
// logRecord is a flattened copy of one slog record as stored by
// capturingHandler.
type logRecord struct {
	// Level is the level of the captured record.
	Level slog.Level
	// Msg is the message of the captured record.
	Msg string
	// Attrs maps attribute keys to their values as returned by
	// slog.Value.Any.
	Attrs map[string]any
}
25
26type capturingHandler struct {
27 mu *sync.Mutex
28 records *[]logRecord
29 attrs []slog.Attr
30}
31
32func newCapturingHandler() *capturingHandler {
33 return &capturingHandler{
34 mu: &sync.Mutex{},
35 records: &[]logRecord{},
36 }
37}
38
39func (h *capturingHandler) Enabled(_ context.Context, _ slog.Level) bool { return true }
40
41func (h *capturingHandler) Handle(_ context.Context, r slog.Record) error {
42 rec := logRecord{Level: r.Level, Msg: r.Message, Attrs: map[string]any{}}
43 for _, a := range h.attrs {
44 rec.Attrs[a.Key] = a.Value.Any()
45 }
46 r.Attrs(func(a slog.Attr) bool {
47 rec.Attrs[a.Key] = a.Value.Any()
48 return true
49 })
50 h.mu.Lock()
51 *h.records = append(*h.records, rec)
52 h.mu.Unlock()
53 return nil
54}
55
56func (h *capturingHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
57 merged := make([]slog.Attr, 0, len(h.attrs)+len(attrs))
58 merged = append(merged, h.attrs...)
59 merged = append(merged, attrs...)
60 return &capturingHandler{mu: h.mu, records: h.records, attrs: merged}
61}
62
63func (h *capturingHandler) WithGroup(string) slog.Handler {
64 panic("capturingHandler: WithGroup not supported")
65}
66
67func (h *capturingHandler) snapshot() []logRecord {
68 h.mu.Lock()
69 defer h.mu.Unlock()
70 out := make([]logRecord, len(*h.records))
71 copy(out, *h.records)
72 return out
73}
74
75func newProcessRepoFixture(t *testing.T) (*Knot, context.Context, *capturingHandler) {
76 t.Helper()
77 scanPath := t.TempDir()
78 dbPath := filepath.Join(scanPath, "knot.db")
79 d, err := db.Setup(context.Background(), dbPath)
80 if err != nil {
81 t.Fatalf("db.Setup: %v", err)
82 }
83
84 e, err := rbac.NewEnforcer(dbPath)
85 if err != nil {
86 t.Fatalf("rbac.NewEnforcer: %v", err)
87 }
88 if err := e.AddKnot(rbac.ThisServer); err != nil {
89 t.Fatalf("AddKnot: %v", err)
90 }
91
92 cap := newCapturingHandler()
93 l := slog.New(cap)
94 ctx := log.IntoContext(context.Background(), l)
95
96 c := &config.Config{
97 Server: config.Server{Hostname: "knot.example"},
98 Repo: config.Repo{ScanPath: scanPath},
99 }
100 return &Knot{
101 c: c,
102 db: d,
103 e: e,
104 l: l,
105 }, ctx, cap
106}
107
108func repoEvent(t *testing.T, authorDid, rkey, rev string, record tangled.Repo, op string) *jsmodels.Event {
109 t.Helper()
110 raw, err := json.Marshal(record)
111 if err != nil {
112 t.Fatalf("marshal record: %v", err)
113 }
114 return &jsmodels.Event{
115 Did: authorDid,
116 Kind: jsmodels.EventKindCommit,
117 Commit: &jsmodels.Commit{
118 Operation: op,
119 Collection: tangled.RepoNSID,
120 RKey: rkey,
121 Rev: rev,
122 Record: raw,
123 },
124 }
125}
126
// ptr returns a pointer to a fresh copy of s. It exists so that string
// literals can be assigned to *string fields.
func ptr(s string) *string {
	p := new(string)
	*p = s
	return p
}
128
129func TestProcessRepo_CreateRegistersAlias(t *testing.T) {
130 h, ctx, _ := newProcessRepoFixture(t)
131 if err := h.db.StoreRepoKey("did:plc:repo1", []byte("k"), "did:plc:akshay", "foo"); err != nil {
132 t.Fatalf("StoreRepoKey: %v", err)
133 }
134
135 ev := repoEvent(t, "did:plc:akshay", "bar", "3laaaaaaaaaab", tangled.Repo{
136 Knot: "knot.example",
137 RepoDid: ptr("did:plc:repo1"),
138 }, jsmodels.CommitOperationCreate)
139 if err := h.processRepo(ctx, ev); err != nil {
140 t.Fatalf("processRepo: %v", err)
141 }
142
143 _, current, err := h.db.CurrentRkey("did:plc:repo1")
144 if err != nil {
145 t.Fatalf("CurrentRkey: %v", err)
146 }
147 if current != "bar" {
148 t.Errorf("current rkey = %q, want bar (highest rev alias)", current)
149 }
150
151 oldDid, err := h.db.GetRepoDid("did:plc:akshay", "foo")
152 if err != nil || oldDid != "did:plc:repo1" {
153 t.Errorf("old rkey foo should still resolve: got (%q, %v)", oldDid, err)
154 }
155}
156
157func TestProcessRepo_DeleteIsNoOp(t *testing.T) {
158 h, ctx, _ := newProcessRepoFixture(t)
159 if err := h.db.StoreRepoKey("did:plc:repo1", []byte("k"), "did:plc:akshay", "foo"); err != nil {
160 t.Fatalf("StoreRepoKey: %v", err)
161 }
162 if err := h.db.UpsertRepoAlias(db.RepoAlias{
163 OwnerDid: "did:plc:akshay", Rkey: "bar", RepoDid: "did:plc:repo1", Rev: "3laaaaaaaaaab",
164 }); err != nil {
165 t.Fatalf("UpsertRepoAlias: %v", err)
166 }
167 if err := h.e.AddRepo("did:plc:akshay", rbac.ThisServer, "did:plc:repo1"); err != nil {
168 t.Fatalf("AddRepo rbac: %v", err)
169 }
170 repoPath := filepath.Join(h.c.Repo.ScanPath, "did:plc:repo1")
171 if err := os.MkdirAll(repoPath, 0o755); err != nil {
172 t.Fatalf("MkdirAll: %v", err)
173 }
174
175 ev := repoEvent(t, "did:plc:akshay", "bar", "3laaaaaaaaaac", tangled.Repo{}, jsmodels.CommitOperationDelete)
176 if err := h.processRepo(ctx, ev); err != nil {
177 t.Fatalf("processRepo: %v", err)
178 }
179
180 if got, err := h.db.GetRepoDid("did:plc:akshay", "bar"); err != nil || got != "did:plc:repo1" {
181 t.Errorf("bar alias should be untouched by firehose delete: got (%q, %v)", got, err)
182 }
183 if got, err := h.db.GetRepoDid("did:plc:akshay", "foo"); err != nil || got != "did:plc:repo1" {
184 t.Errorf("foo alias should be untouched by firehose delete: got (%q, %v)", got, err)
185 }
186 if exists, _ := h.db.RepoDidExists("did:plc:repo1"); !exists {
187 t.Errorf("repo_keys row should be untouched by firehose delete")
188 }
189 if _, err := os.Stat(repoPath); err != nil {
190 t.Errorf("repo dir should be untouched by firehose delete: %v", err)
191 }
192 if allowed, _ := h.e.IsRepoDeleteAllowed("did:plc:akshay", rbac.ThisServer, "did:plc:repo1"); !allowed {
193 t.Errorf("rbac policies should be untouched by firehose delete")
194 }
195}
196
197func TestProcessRepo_MalformedJSONReturnsError(t *testing.T) {
198 h, ctx, _ := newProcessRepoFixture(t)
199
200 ev := &jsmodels.Event{
201 Did: "did:plc:akshay",
202 Kind: jsmodels.EventKindCommit,
203 Commit: &jsmodels.Commit{
204 Operation: jsmodels.CommitOperationCreate,
205 Collection: tangled.RepoNSID,
206 RKey: "rkey1",
207 Record: []byte("{not valid json"),
208 },
209 }
210 if err := h.processRepo(ctx, ev); err == nil {
211 t.Fatalf("processRepo returned nil, want unmarshal error")
212 }
213}
214
215func TestProcessRepo_NotOwnedRejected(t *testing.T) {
216 h, ctx, _ := newProcessRepoFixture(t)
217 if err := h.db.StoreRepoKey("did:plc:repo1", []byte("k"), "did:plc:akshay", "foo"); err != nil {
218 t.Fatalf("StoreRepoKey: %v", err)
219 }
220
221 ev := repoEvent(t, "did:plc:mallory", "pwned", "3laaaaaaaaaab", tangled.Repo{
222 Knot: "knot.example",
223 RepoDid: ptr("did:plc:repo1"),
224 }, jsmodels.CommitOperationCreate)
225 if err := h.processRepo(ctx, ev); err != nil {
226 t.Fatalf("processRepo: %v", err)
227 }
228
229 _, current, _ := h.db.CurrentRkey("did:plc:repo1")
230 if current != "foo" {
231 t.Errorf("current rkey = %q, want foo (mallory's event must be rejected)", current)
232 }
233 if _, err := h.db.GetRepoDid("did:plc:mallory", "pwned"); err == nil {
234 t.Errorf("mallory should not be able to register an alias on alice's repo")
235 }
236}
237
238func TestProcessRepo_WrongKnotIgnored(t *testing.T) {
239 h, ctx, _ := newProcessRepoFixture(t)
240 if err := h.db.StoreRepoKey("did:plc:repo1", []byte("k"), "did:plc:akshay", "foo"); err != nil {
241 t.Fatalf("StoreRepoKey: %v", err)
242 }
243
244 ev := repoEvent(t, "did:plc:akshay", "bar", "3laaaaaaaaaab", tangled.Repo{
245 Knot: "other.example",
246 RepoDid: ptr("did:plc:repo1"),
247 }, jsmodels.CommitOperationCreate)
248 if err := h.processRepo(ctx, ev); err != nil {
249 t.Fatalf("processRepo: %v", err)
250 }
251
252 _, current, _ := h.db.CurrentRkey("did:plc:repo1")
253 if current != "foo" {
254 t.Errorf("current rkey = %q, want foo (foreign-knot event must be ignored)", current)
255 }
256}