Monorepo for Tangled
tangled.org
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "slices"
11 "strings"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/models"
18 "tangled.org/core/appview/repoverify"
19 "tangled.org/core/orm"
20)
21
22func (i *Ingester) ingestRepo(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
23 l = l.With("handler", "ingestRepo")
24
25 switch e.Commit.Operation {
26 case jmodels.CommitOperationCreate:
27 return i.ingestRepoCreate(ctx, e, l)
28 case jmodels.CommitOperationUpdate:
29 return i.ingestRepoUpdate(ctx, e, l)
30 case jmodels.CommitOperationDelete:
31 return i.ingestRepoDelete(ctx, e, l)
32 default:
33 l.Info("unknown repo operation")
34 return nil
35 }
36}
37
38func (i *Ingester) ingestRepoCreate(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
39 l = l.With("handler", "ingestRepoCreate")
40
41 record := tangled.Repo{}
42 if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
43 l.Error("invalid record", "err", err)
44 return err
45 }
46
47 if record.RepoDid == nil || *record.RepoDid == "" {
48 l.Info("skipping repo create from non-DID-migrated knot")
49 return nil
50 }
51 repoDid := *record.RepoDid
52
53 proceed, err := i.verifyOwnership(ctx, l, repoDid, e.Did, record.Knot)
54 if err != nil {
55 return err
56 }
57 if !proceed {
58 return nil
59 }
60
61 existing, err := db.GetRepo(i.Db,
62 orm.FilterEq("did", e.Did),
63 orm.FilterEq("rkey", e.Commit.RKey),
64 )
65 if err == nil {
66 l.Info("repo row already exists, skipping create", "did", e.Did, "rkey", e.Commit.RKey)
67 if err := i.ensureRepoOwnerPermissions(e.Did, existing.Knot, existing.RepoIdentifier()); err != nil {
68 return fmt.Errorf("failed to ensure repo owner permissions: %w", err)
69 }
70 return nil
71 }
72 if !errors.Is(err, sql.ErrNoRows) {
73 return fmt.Errorf("failed to check existing repo: %w", err)
74 }
75
76 prev, err := db.GetRepoByDid(i.Db, repoDid)
77 if err != nil && !errors.Is(err, sql.ErrNoRows) {
78 return fmt.Errorf("failed to check existing repoDid: %w", err)
79 }
80
81 if prev != nil {
82 l.Info("repoDid exists under different rkey, renaming",
83 "oldRkey", prev.Rkey, "newRkey", e.Commit.RKey)
84
85 oldRepo := *prev
86
87 tx, txErr := i.Db.Begin()
88 if txErr != nil {
89 return fmt.Errorf("failed to begin rename tx: %w", txErr)
90 }
91 defer tx.Rollback()
92
93 newName := derefString(record.Name)
94 if newName == "" {
95 newName = e.Commit.RKey
96 }
97
98 if err := db.RenameRepo(tx, e.Did, prev.Rkey, e.Commit.RKey, newName); err != nil {
99 return fmt.Errorf("failed to rename repo: %w", err)
100 }
101 if err := db.RecordRepoRename(tx, e.Did, prev.Rkey, repoDid); err != nil {
102 return fmt.Errorf("failed to record rename history: %w", err)
103 }
104
105 renamed := *prev
106 renamed.Rkey = e.Commit.RKey
107 renamed.Name = newName
108 desired := repoFromRecord(&renamed, &record)
109 if repoMetadataChanged(&renamed, &desired) {
110 if err := applyRepoMetadata(tx, &renamed, desired); err != nil {
111 return fmt.Errorf("failed to apply metadata after rename: %w", err)
112 }
113 }
114
115 if err := tx.Commit(); err != nil {
116 return fmt.Errorf("failed to commit rename tx: %w", err)
117 }
118
119 newRepo, err := db.GetRepo(i.Db,
120 orm.FilterEq("did", e.Did),
121 orm.FilterEq("rkey", e.Commit.RKey),
122 )
123 if err != nil {
124 l.Warn("failed to fetch repo after rename for notification", "err", err)
125 return nil
126 }
127 if err := i.ensureRepoOwnerPermissions(e.Did, newRepo.Knot, newRepo.RepoIdentifier()); err != nil {
128 return fmt.Errorf("failed to ensure repo owner permissions: %w", err)
129 }
130 i.Notifier.RenameRepo(ctx, syntax.DID(e.Did), &oldRepo, newRepo)
131 return nil
132 }
133
134 rkey := e.Commit.RKey
135 name := derefString(record.Name)
136 if name == "" {
137 name = rkey
138 }
139
140 repo := &models.Repo{
141 Did: e.Did,
142 Name: name,
143 Knot: record.Knot,
144 Rkey: rkey,
145 Description: derefString(record.Description),
146 Website: derefString(record.Website),
147 Topics: append([]string(nil), record.Topics...),
148 Source: derefString(record.Source),
149 Spindle: derefString(record.Spindle),
150 Labels: append([]string(nil), record.Labels...),
151 RepoDid: repoDid,
152 }
153
154 tx, err := i.Db.Begin()
155 if err != nil {
156 return fmt.Errorf("failed to begin insert tx: %w", err)
157 }
158 defer tx.Rollback()
159
160 if err := db.AddRepo(tx, repo); err != nil {
161 return fmt.Errorf("failed to insert repo: %w", err)
162 }
163 if err := tx.Commit(); err != nil {
164 return fmt.Errorf("failed to commit insert tx: %w", err)
165 }
166
167 if err := i.ensureRepoOwnerPermissions(e.Did, repo.Knot, repo.RepoIdentifier()); err != nil {
168 return fmt.Errorf("failed to ensure repo owner permissions: %w", err)
169 }
170
171 i.Notifier.NewRepo(ctx, repo)
172 return nil
173}
174
175func (i *Ingester) ensureRepoOwnerPermissions(ownerDid, knot, repo string) error {
176 if i.Enforcer == nil {
177 return fmt.Errorf("ingester has no RBAC enforcer configured")
178 }
179 if err := i.Enforcer.AddRepo(ownerDid, knot, repo); err != nil {
180 return err
181 }
182 return i.Enforcer.E.SavePolicy()
183}
184
185func (i *Ingester) ingestRepoUpdate(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
186 l = l.With("handler", "ingestRepoUpdate")
187
188 record := tangled.Repo{}
189 if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
190 l.Error("invalid record", "err", err)
191 return err
192 }
193
194 if record.RepoDid == nil || *record.RepoDid == "" {
195 l.Info("skipping repo update from non-DID-migrated knot")
196 return nil
197 }
198
199 proceed, err := i.verifyOwnership(ctx, l, *record.RepoDid, e.Did, record.Knot)
200 if err != nil {
201 return err
202 }
203 if !proceed {
204 return nil
205 }
206
207 current, err := db.GetRepo(i.Db,
208 orm.FilterEq("did", e.Did),
209 orm.FilterEq("rkey", e.Commit.RKey),
210 )
211 if err != nil {
212 if errors.Is(err, sql.ErrNoRows) {
213 l.Info("skipping repo update for unknown row")
214 return nil
215 }
216 return fmt.Errorf("failed to fetch repo for ingest: %w", err)
217 }
218
219 if current.RepoDid != "" && current.RepoDid != *record.RepoDid {
220 l.Warn("rejecting repo update: repoDid is immutable",
221 "currentRepoDid", current.RepoDid,
222 "recordRepoDid", *record.RepoDid,
223 )
224 return nil
225 }
226
227 desired := repoFromRecord(current, &record)
228
229 if current.Source != desired.Source {
230 l.Warn("source field changed but mutation is unsupported, ignoring",
231 "current", current.Source, "desired", desired.Source)
232 }
233
234 if !repoMetadataChanged(current, &desired) {
235 return nil
236 }
237
238 tx, err := i.Db.Begin()
239 if err != nil {
240 return fmt.Errorf("failed to begin tx: %w", err)
241 }
242 defer tx.Rollback()
243
244 if err := applyRepoMetadata(tx, current, desired); err != nil {
245 return fmt.Errorf("failed to apply repo metadata: %w", err)
246 }
247 return tx.Commit()
248}
249
250func (i *Ingester) ingestRepoDelete(ctx context.Context, e *jmodels.Event, l *slog.Logger) error {
251 l = l.With("handler", "ingestRepoDelete")
252
253 repo, err := db.GetRepo(i.Db,
254 orm.FilterEq("did", e.Did),
255 orm.FilterEq("rkey", e.Commit.RKey),
256 )
257 if err != nil {
258 if errors.Is(err, sql.ErrNoRows) {
259 l.Info("skipping repo delete for unknown row")
260 return nil
261 }
262 return fmt.Errorf("failed to fetch repo for delete: %w", err)
263 }
264
265 if i.Enforcer == nil {
266 return fmt.Errorf("ingester has no RBAC enforcer configured")
267 }
268
269 tx, err := i.Db.Begin()
270 if err != nil {
271 return fmt.Errorf("failed to start txn: %w", err)
272 }
273 committed := false
274 defer func() {
275 if committed {
276 return
277 }
278 tx.Rollback()
279 i.Enforcer.E.LoadPolicy()
280 }()
281
282 if err := db.RemoveRepo(tx, e.Did, e.Commit.RKey); err != nil {
283 return fmt.Errorf("failed to delete repo: %w", err)
284 }
285
286 if err := i.Enforcer.WipeRepoPolicies(repo.Knot, repo.RepoIdentifier()); err != nil {
287 return fmt.Errorf("failed to wipe repo permissions: %w", err)
288 }
289
290 if err := tx.Commit(); err != nil {
291 return fmt.Errorf("failed to commit txn: %w", err)
292 }
293
294 if err := i.Enforcer.E.SavePolicy(); err != nil {
295 return fmt.Errorf("failed to save ACLs: %w", err)
296 }
297 committed = true
298
299 i.Notifier.DeleteRepo(ctx, repo)
300 l.Info("deleted repo row")
301 return nil
302}
303
304func applyRepoMetadata(tx *sql.Tx, current *models.Repo, desired models.Repo) error {
305 if err := db.PutRepo(tx, desired); err != nil {
306 return err
307 }
308
309 if current.Spindle != desired.Spindle {
310 var spindlePtr *string
311 if desired.Spindle != "" {
312 spindlePtr = &desired.Spindle
313 }
314 if err := db.UpdateSpindle(tx, desired.RepoDid, spindlePtr); err != nil {
315 return err
316 }
317 }
318
319 if !labelsEqual(current.Labels, desired.Labels) {
320 if err := reconcileLabels(tx, current, desired); err != nil {
321 return err
322 }
323 }
324
325 return nil
326}
327
328func reconcileLabels(tx *sql.Tx, current *models.Repo, desired models.Repo) error {
329 added := filterOut(desired.Labels, current.Labels)
330 removed := filterOut(current.Labels, desired.Labels)
331
332 if err := applyEach(added, func(l string) error {
333 return db.SubscribeLabel(tx, &models.RepoLabel{
334 RepoDid: syntax.DID(desired.RepoDid),
335 LabelAt: syntax.ATURI(l),
336 })
337 }); err != nil {
338 return err
339 }
340
341 return applyEach(removed, func(l string) error {
342 return db.UnsubscribeLabel(tx,
343 orm.FilterEq("repo_did", desired.RepoDid),
344 orm.FilterEq("label_at", l),
345 )
346 })
347}
348
// filterOut returns the elements of items that do not occur in exclude,
// keeping their original order. It works on a copy, so items itself is
// never modified.
func filterOut(items, exclude []string) []string {
	kept := slices.Clone(items)

	// Compact the survivors to the front of the copy.
	n := 0
	for _, s := range kept {
		if slices.Contains(exclude, s) {
			continue
		}
		kept[n] = s
		n++
	}

	// Drop references held by the unused tail before trimming it off.
	clear(kept[n:])
	return kept[:n]
}
354
// applyEach calls fn on every element of items in order. It stops at the
// first call that returns a non-nil error and returns that error; if every
// call succeeds (or items is empty) it returns nil.
func applyEach(items []string, fn func(string) error) error {
	for idx := 0; idx < len(items); idx++ {
		err := fn(items[idx])
		if err != nil {
			return err
		}
	}
	return nil
}
363
// labelsEqual reports whether a and b hold the same strings with the same
// multiplicities, regardless of order. Neither argument is modified; the
// comparison is done on sorted copies.
func labelsEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}

	left, right := slices.Clone(a), slices.Clone(b)
	slices.Sort(left)
	slices.Sort(right)

	// Lengths already match, so a position-wise walk is sufficient.
	for idx := range left {
		if left[idx] != right[idx] {
			return false
		}
	}
	return true
}
374
375func repoFromRecord(current *models.Repo, record *tangled.Repo) models.Repo {
376 out := *current
377 out.Name = derefString(record.Name)
378 if out.Name == "" {
379 out.Name = current.Rkey
380 }
381 out.Knot = record.Knot
382 out.Description = derefString(record.Description)
383 out.Website = derefString(record.Website)
384 out.Topics = append([]string(nil), record.Topics...)
385 out.Spindle = derefString(record.Spindle)
386 out.Source = derefString(record.Source)
387 out.Labels = append([]string(nil), record.Labels...)
388 if record.RepoDid != nil {
389 out.RepoDid = *record.RepoDid
390 }
391 return out
392}
393
394func repoMetadataChanged(current *models.Repo, desired *models.Repo) bool {
395 return current.Name != desired.Name ||
396 current.Knot != desired.Knot ||
397 current.Description != desired.Description ||
398 current.Website != desired.Website ||
399 current.TopicStr() != desired.TopicStr() ||
400 current.Spindle != desired.Spindle ||
401 !labelsEqual(current.Labels, desired.Labels)
402}
403
// derefString returns the string s points to, or the empty string when s
// is nil.
func derefString(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}
410
411func (i *Ingester) verifyOwnership(ctx context.Context, l *slog.Logger, repoDid, eventDid, recordKnot string) (bool, error) {
412 if i.Verifier == nil {
413 return false, fmt.Errorf("ingester has no repo ownership verifier configured")
414 }
415 rd, err := repoverify.NewRepoDid(repoDid)
416 if err != nil {
417 l.Warn("rejecting repo event: invalid repoDid on record", "repoDid", repoDid, "err", err)
418 return false, nil
419 }
420 result, err := i.Verifier(ctx, rd)
421 if err != nil {
422 return false, fmt.Errorf("verify repo ownership: %w", err)
423 }
424 if result.OwnerDid == "" {
425 l.Warn("knot lacks RepoDescribeRepo, skipping owner check; upgrade knot to 1.14+",
426 "repoDid", repoDid, "knot", result.KnotURL.String())
427 } else if result.OwnerDid.String() != eventDid {
428 l.Warn("rejecting repo event: owner mismatch",
429 "repoDid", repoDid,
430 "claimedOwner", eventDid,
431 "knotOwner", result.OwnerDid.String(),
432 "knot", result.KnotURL.String(),
433 )
434 return false, nil
435 }
436 if !strings.EqualFold(recordKnot, result.KnotURL.Host) {
437 l.Warn("rejecting repo event: record knot does not match DID-doc endpoint",
438 "repoDid", repoDid,
439 "recordKnot", recordKnot,
440 "canonicalKnot", result.KnotURL.Host,
441 )
442 return false, nil
443 }
444 return true, nil
445}