// Forked from tangled.org/core, the monorepo for Tangled.
1package appview
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "slices"
11 "strings"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/models"
18 "tangled.org/core/appview/repoverify"
19 "tangled.org/core/orm"
20)
21
22func (i *Ingester) ingestRepo(ctx context.Context, e *jmodels.Event) error {
23 l := i.Logger.With("handler", "ingestRepo", "did", e.Did, "rkey", e.Commit.RKey)
24
25 switch e.Commit.Operation {
26 case jmodels.CommitOperationCreate:
27 return i.ingestRepoCreate(ctx, e)
28 case jmodels.CommitOperationUpdate:
29 return i.ingestRepoUpdate(ctx, e)
30 case jmodels.CommitOperationDelete:
31 return i.ingestRepoDelete(ctx, e)
32 default:
33 l.Info("unknown repo operation", "op", e.Commit.Operation)
34 return nil
35 }
36}
37
38func (i *Ingester) ingestRepoCreate(ctx context.Context, e *jmodels.Event) error {
39 l := i.Logger.With("handler", "ingestRepoCreate", "did", e.Did, "rkey", e.Commit.RKey)
40
41 record := tangled.Repo{}
42 if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
43 l.Error("invalid record", "err", err)
44 return err
45 }
46
47 if record.RepoDid == nil || *record.RepoDid == "" {
48 l.Info("skipping repo create from non-DID-migrated knot")
49 return nil
50 }
51 repoDid := *record.RepoDid
52
53 proceed, err := i.verifyOwnership(ctx, l, repoDid, e.Did, record.Knot)
54 if err != nil {
55 return err
56 }
57 if !proceed {
58 return nil
59 }
60
61 _, err = db.GetRepo(i.Db,
62 orm.FilterEq("did", e.Did),
63 orm.FilterEq("rkey", e.Commit.RKey),
64 )
65 if err == nil {
66 l.Info("repo row already exists, skipping create", "did", e.Did, "rkey", e.Commit.RKey)
67 return nil
68 }
69 if !errors.Is(err, sql.ErrNoRows) {
70 return fmt.Errorf("failed to check existing repo: %w", err)
71 }
72
73 prev, err := db.GetRepoByDid(i.Db, repoDid)
74 if err != nil && !errors.Is(err, sql.ErrNoRows) {
75 return fmt.Errorf("failed to check existing repoDid: %w", err)
76 }
77
78 if prev != nil {
79 l.Info("repoDid exists under different rkey, renaming",
80 "oldRkey", prev.Rkey, "newRkey", e.Commit.RKey)
81
82 oldRepo := *prev
83
84 tx, txErr := i.Db.Begin()
85 if txErr != nil {
86 return fmt.Errorf("failed to begin rename tx: %w", txErr)
87 }
88 defer tx.Rollback()
89
90 newName := derefString(record.Name)
91 if newName == "" {
92 newName = e.Commit.RKey
93 }
94
95 if err := db.RenameRepo(tx, e.Did, prev.Rkey, e.Commit.RKey, newName); err != nil {
96 return fmt.Errorf("failed to rename repo: %w", err)
97 }
98 if err := db.RecordRepoRename(tx, e.Did, prev.Rkey, repoDid); err != nil {
99 return fmt.Errorf("failed to record rename history: %w", err)
100 }
101
102 renamed := *prev
103 renamed.Rkey = e.Commit.RKey
104 renamed.Name = newName
105 desired := repoFromRecord(&renamed, &record)
106 if repoMetadataChanged(&renamed, &desired) {
107 if err := applyRepoMetadata(tx, &renamed, desired); err != nil {
108 return fmt.Errorf("failed to apply metadata after rename: %w", err)
109 }
110 }
111
112 if err := tx.Commit(); err != nil {
113 return fmt.Errorf("failed to commit rename tx: %w", err)
114 }
115
116 newRepo, err := db.GetRepo(i.Db,
117 orm.FilterEq("did", e.Did),
118 orm.FilterEq("rkey", e.Commit.RKey),
119 )
120 if err != nil {
121 l.Warn("failed to fetch repo after rename for notification", "err", err)
122 return nil
123 }
124 i.Notifier.RenameRepo(ctx, syntax.DID(e.Did), &oldRepo, newRepo)
125 return nil
126 }
127
128 rkey := e.Commit.RKey
129 name := derefString(record.Name)
130 if name == "" {
131 name = rkey
132 }
133
134 repo := &models.Repo{
135 Did: e.Did,
136 Name: name,
137 Knot: record.Knot,
138 Rkey: rkey,
139 Description: derefString(record.Description),
140 Website: derefString(record.Website),
141 Topics: append([]string(nil), record.Topics...),
142 Source: derefString(record.Source),
143 Spindle: derefString(record.Spindle),
144 Labels: append([]string(nil), record.Labels...),
145 RepoDid: repoDid,
146 }
147
148 tx, err := i.Db.Begin()
149 if err != nil {
150 return fmt.Errorf("failed to begin insert tx: %w", err)
151 }
152 defer tx.Rollback()
153
154 if err := db.AddRepo(tx, repo); err != nil {
155 return fmt.Errorf("failed to insert repo: %w", err)
156 }
157 if err := tx.Commit(); err != nil {
158 return fmt.Errorf("failed to commit insert tx: %w", err)
159 }
160
161 i.Notifier.NewRepo(ctx, repo)
162 return nil
163}
164
165func (i *Ingester) ingestRepoUpdate(ctx context.Context, e *jmodels.Event) error {
166 l := i.Logger.With("handler", "ingestRepoUpdate", "did", e.Did, "rkey", e.Commit.RKey)
167
168 record := tangled.Repo{}
169 if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
170 l.Error("invalid record", "err", err)
171 return err
172 }
173
174 if record.RepoDid == nil || *record.RepoDid == "" {
175 l.Info("skipping repo update from non-DID-migrated knot")
176 return nil
177 }
178
179 proceed, err := i.verifyOwnership(ctx, l, *record.RepoDid, e.Did, record.Knot)
180 if err != nil {
181 return err
182 }
183 if !proceed {
184 return nil
185 }
186
187 current, err := db.GetRepo(i.Db,
188 orm.FilterEq("did", e.Did),
189 orm.FilterEq("rkey", e.Commit.RKey),
190 )
191 if err != nil {
192 if errors.Is(err, sql.ErrNoRows) {
193 l.Info("skipping repo update for unknown row")
194 return nil
195 }
196 return fmt.Errorf("failed to fetch repo for ingest: %w", err)
197 }
198
199 if current.RepoDid != "" && current.RepoDid != *record.RepoDid {
200 l.Warn("rejecting repo update: repoDid is immutable",
201 "currentRepoDid", current.RepoDid,
202 "recordRepoDid", *record.RepoDid,
203 )
204 return nil
205 }
206
207 desired := repoFromRecord(current, &record)
208
209 if current.Source != desired.Source {
210 l.Warn("source field changed but mutation is unsupported, ignoring",
211 "current", current.Source, "desired", desired.Source)
212 }
213
214 if !repoMetadataChanged(current, &desired) {
215 return nil
216 }
217
218 tx, err := i.Db.Begin()
219 if err != nil {
220 return fmt.Errorf("failed to begin tx: %w", err)
221 }
222 defer tx.Rollback()
223
224 if err := applyRepoMetadata(tx, current, desired); err != nil {
225 return fmt.Errorf("failed to apply repo metadata: %w", err)
226 }
227 return tx.Commit()
228}
229
230func (i *Ingester) ingestRepoDelete(ctx context.Context, e *jmodels.Event) error {
231 l := i.Logger.With("handler", "ingestRepoDelete", "did", e.Did, "rkey", e.Commit.RKey)
232
233 repo, err := db.GetRepo(i.Db,
234 orm.FilterEq("did", e.Did),
235 orm.FilterEq("rkey", e.Commit.RKey),
236 )
237 if err != nil {
238 if errors.Is(err, sql.ErrNoRows) {
239 l.Info("skipping repo delete for unknown row")
240 return nil
241 }
242 return fmt.Errorf("failed to fetch repo for delete: %w", err)
243 }
244
245 if err := db.RemoveRepo(i.Db, e.Did, e.Commit.RKey); err != nil {
246 return fmt.Errorf("failed to delete repo: %w", err)
247 }
248
249 i.Notifier.DeleteRepo(ctx, repo)
250 l.Info("deleted repo row")
251 return nil
252}
253
254func applyRepoMetadata(tx *sql.Tx, current *models.Repo, desired models.Repo) error {
255 if err := db.PutRepo(tx, desired); err != nil {
256 return err
257 }
258
259 if current.Spindle != desired.Spindle {
260 var spindlePtr *string
261 if desired.Spindle != "" {
262 spindlePtr = &desired.Spindle
263 }
264 if err := db.UpdateSpindle(tx, desired.RepoDid, spindlePtr); err != nil {
265 return err
266 }
267 }
268
269 if !labelsEqual(current.Labels, desired.Labels) {
270 if err := reconcileLabels(tx, current, desired); err != nil {
271 return err
272 }
273 }
274
275 return nil
276}
277
278func reconcileLabels(tx *sql.Tx, current *models.Repo, desired models.Repo) error {
279 added := filterOut(desired.Labels, current.Labels)
280 removed := filterOut(current.Labels, desired.Labels)
281
282 if err := applyEach(added, func(l string) error {
283 return db.SubscribeLabel(tx, &models.RepoLabel{
284 RepoDid: syntax.DID(desired.RepoDid),
285 LabelAt: syntax.ATURI(l),
286 })
287 }); err != nil {
288 return err
289 }
290
291 return applyEach(removed, func(l string) error {
292 return db.UnsubscribeLabel(tx,
293 orm.FilterEq("repo_did", desired.RepoDid),
294 orm.FilterEq("label_at", l),
295 )
296 })
297}
298
// filterOut returns the elements of items that do not occur in exclude,
// keeping their original order. items itself is never modified; a nil items
// yields nil.
func filterOut(items, exclude []string) []string {
	if items == nil {
		return nil
	}
	kept := make([]string, 0, len(items))
	for _, candidate := range items {
		if slices.Contains(exclude, candidate) {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
304
// applyEach invokes fn on every element of items in order and returns the
// first non-nil error; remaining elements are not visited after a failure.
func applyEach(items []string, fn func(string) error) error {
	for idx := 0; idx < len(items); idx++ {
		err := fn(items[idx])
		if err != nil {
			return err
		}
	}
	return nil
}
313
// labelsEqual reports whether a and b hold the same strings with the same
// multiplicities, regardless of order. Neither argument is modified; the
// comparison works on sorted copies.
func labelsEqual(a, b []string) bool {
	sortedCopy := func(in []string) []string {
		out := slices.Clone(in)
		slices.Sort(out)
		return out
	}
	return len(a) == len(b) && slices.Equal(sortedCopy(a), sortedCopy(b))
}
324
325func repoFromRecord(current *models.Repo, record *tangled.Repo) models.Repo {
326 out := *current
327 out.Name = derefString(record.Name)
328 if out.Name == "" {
329 out.Name = current.Rkey
330 }
331 out.Knot = record.Knot
332 out.Description = derefString(record.Description)
333 out.Website = derefString(record.Website)
334 out.Topics = append([]string(nil), record.Topics...)
335 out.Spindle = derefString(record.Spindle)
336 out.Source = derefString(record.Source)
337 out.Labels = append([]string(nil), record.Labels...)
338 if record.RepoDid != nil {
339 out.RepoDid = *record.RepoDid
340 }
341 return out
342}
343
344func repoMetadataChanged(current *models.Repo, desired *models.Repo) bool {
345 return current.Name != desired.Name ||
346 current.Knot != desired.Knot ||
347 current.Description != desired.Description ||
348 current.Website != desired.Website ||
349 current.TopicStr() != desired.TopicStr() ||
350 current.Spindle != desired.Spindle ||
351 !labelsEqual(current.Labels, desired.Labels)
352}
353
// derefString returns the string s points to, or "" when s is nil.
func derefString(s *string) string {
	var value string
	if s != nil {
		value = *s
	}
	return value
}
360
361func (i *Ingester) verifyOwnership(ctx context.Context, l *slog.Logger, repoDid, eventDid, recordKnot string) (bool, error) {
362 if i.Verifier == nil {
363 return false, fmt.Errorf("ingester has no repo ownership verifier configured")
364 }
365 rd, err := repoverify.NewRepoDid(repoDid)
366 if err != nil {
367 l.Warn("rejecting repo event: invalid repoDid on record", "repoDid", repoDid, "err", err)
368 return false, nil
369 }
370 result, err := i.Verifier(ctx, rd)
371 if err != nil {
372 return false, fmt.Errorf("verify repo ownership: %w", err)
373 }
374 if result.OwnerDid == "" {
375 l.Warn("knot lacks RepoDescribeRepo, skipping owner check; upgrade knot to 1.14+",
376 "repoDid", repoDid, "knot", result.KnotURL.String())
377 } else if result.OwnerDid.String() != eventDid {
378 l.Warn("rejecting repo event: owner mismatch",
379 "repoDid", repoDid,
380 "claimedOwner", eventDid,
381 "knotOwner", result.OwnerDid.String(),
382 "knot", result.KnotURL.String(),
383 )
384 return false, nil
385 }
386 if !strings.EqualFold(recordKnot, result.KnotURL.Host) {
387 l.Warn("rejecting repo event: record knot does not match DID-doc endpoint",
388 "repoDid", repoDid,
389 "recordKnot", recordKnot,
390 "canonicalKnot", result.KnotURL.Host,
391 )
392 return false, nil
393 }
394 return true, nil
395}