Monorepo for Tangled
0

Configure Feed

Select the types of activity you want to include in your feed.

at master 11 kB View raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "slices" 11 "strings" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/models" 18 "tangled.org/core/appview/repoverify" 19 "tangled.org/core/orm" 20) 21 22func (i *Ingester) ingestRepo(ctx context.Context, e *jmodels.Event) error { 23 l := i.Logger.With("handler", "ingestRepo", "did", e.Did, "rkey", e.Commit.RKey) 24 25 switch e.Commit.Operation { 26 case jmodels.CommitOperationCreate: 27 return i.ingestRepoCreate(ctx, e) 28 case jmodels.CommitOperationUpdate: 29 return i.ingestRepoUpdate(ctx, e) 30 case jmodels.CommitOperationDelete: 31 return i.ingestRepoDelete(ctx, e) 32 default: 33 l.Info("unknown repo operation", "op", e.Commit.Operation) 34 return nil 35 } 36} 37 38func (i *Ingester) ingestRepoCreate(ctx context.Context, e *jmodels.Event) error { 39 l := i.Logger.With("handler", "ingestRepoCreate", "did", e.Did, "rkey", e.Commit.RKey) 40 41 record := tangled.Repo{} 42 if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil { 43 l.Error("invalid record", "err", err) 44 return err 45 } 46 47 if record.RepoDid == nil || *record.RepoDid == "" { 48 l.Info("skipping repo create from non-DID-migrated knot") 49 return nil 50 } 51 repoDid := *record.RepoDid 52 53 proceed, err := i.verifyOwnership(ctx, l, repoDid, e.Did, record.Knot) 54 if err != nil { 55 return err 56 } 57 if !proceed { 58 return nil 59 } 60 61 _, err = db.GetRepo(i.Db, 62 orm.FilterEq("did", e.Did), 63 orm.FilterEq("rkey", e.Commit.RKey), 64 ) 65 if err == nil { 66 l.Info("repo row already exists, skipping create", "did", e.Did, "rkey", e.Commit.RKey) 67 return nil 68 } 69 if !errors.Is(err, sql.ErrNoRows) { 70 return fmt.Errorf("failed to check existing repo: %w", err) 71 } 72 73 prev, err := db.GetRepoByDid(i.Db, repoDid) 74 if err != nil && !errors.Is(err, sql.ErrNoRows) { 75 return fmt.Errorf("failed to check existing repoDid: %w", err) 76 } 77 78 if prev != nil { 79 l.Info("repoDid exists under different rkey, renaming", 80 "oldRkey", prev.Rkey, "newRkey", e.Commit.RKey) 81 82 oldRepo := *prev 83 84 tx, txErr := i.Db.Begin() 85 if txErr != nil { 86 return fmt.Errorf("failed to begin rename tx: %w", txErr) 87 } 88 defer tx.Rollback() 89 90 newName := derefString(record.Name) 91 if newName == "" { 92 newName = e.Commit.RKey 93 } 94 95 if err := db.RenameRepo(tx, e.Did, prev.Rkey, e.Commit.RKey, newName); err != nil { 96 return fmt.Errorf("failed to rename repo: %w", err) 97 } 98 if err := db.RecordRepoRename(tx, e.Did, prev.Rkey, repoDid); err != nil { 99 return fmt.Errorf("failed to record rename history: %w", err) 100 } 101 102 renamed := *prev 103 renamed.Rkey = e.Commit.RKey 104 renamed.Name = newName 105 desired := repoFromRecord(&renamed, &record) 106 if repoMetadataChanged(&renamed, &desired) { 107 if err := applyRepoMetadata(tx, &renamed, desired); err != nil { 108 return fmt.Errorf("failed to apply metadata after rename: %w", err) 109 } 110 } 111 112 if err := tx.Commit(); err != nil { 113 return fmt.Errorf("failed to commit rename tx: %w", err) 114 } 115 116 newRepo, err := db.GetRepo(i.Db, 117 orm.FilterEq("did", e.Did), 118 orm.FilterEq("rkey", e.Commit.RKey), 119 ) 120 if err != nil { 121 l.Warn("failed to fetch repo after rename for notification", "err", err) 122 return nil 123 } 124 i.Notifier.RenameRepo(ctx, syntax.DID(e.Did), &oldRepo, newRepo) 125 return nil 126 } 127 128 rkey := e.Commit.RKey 129 name := derefString(record.Name) 130 if name == "" { 131 name = rkey 132 } 133 134 repo := &models.Repo{ 135 Did: e.Did, 136 Name: name, 137 Knot: record.Knot, 138 Rkey: rkey, 139 Description: derefString(record.Description), 140 Website: derefString(record.Website), 141 Topics: append([]string(nil), record.Topics...), 142 Source: derefString(record.Source), 143 Spindle: derefString(record.Spindle), 144 Labels: append([]string(nil), record.Labels...), 145 RepoDid: repoDid, 146 } 147 148 tx, err := i.Db.Begin() 149 if err != nil { 150 return fmt.Errorf("failed to begin insert tx: %w", err) 151 } 152 defer tx.Rollback() 153 154 if err := db.AddRepo(tx, repo); err != nil { 155 return fmt.Errorf("failed to insert repo: %w", err) 156 } 157 if err := tx.Commit(); err != nil { 158 return fmt.Errorf("failed to commit insert tx: %w", err) 159 } 160 161 i.Notifier.NewRepo(ctx, repo) 162 return nil 163} 164 165func (i *Ingester) ingestRepoUpdate(ctx context.Context, e *jmodels.Event) error { 166 l := i.Logger.With("handler", "ingestRepoUpdate", "did", e.Did, "rkey", e.Commit.RKey) 167 168 record := tangled.Repo{} 169 if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil { 170 l.Error("invalid record", "err", err) 171 return err 172 } 173 174 if record.RepoDid == nil || *record.RepoDid == "" { 175 l.Info("skipping repo update from non-DID-migrated knot") 176 return nil 177 } 178 179 proceed, err := i.verifyOwnership(ctx, l, *record.RepoDid, e.Did, record.Knot) 180 if err != nil { 181 return err 182 } 183 if !proceed { 184 return nil 185 } 186 187 current, err := db.GetRepo(i.Db, 188 orm.FilterEq("did", e.Did), 189 orm.FilterEq("rkey", e.Commit.RKey), 190 ) 191 if err != nil { 192 if errors.Is(err, sql.ErrNoRows) { 193 l.Info("skipping repo update for unknown row") 194 return nil 195 } 196 return fmt.Errorf("failed to fetch repo for ingest: %w", err) 197 } 198 199 if current.RepoDid != "" && current.RepoDid != *record.RepoDid { 200 l.Warn("rejecting repo update: repoDid is immutable", 201 "currentRepoDid", current.RepoDid, 202 "recordRepoDid", *record.RepoDid, 203 ) 204 return nil 205 } 206 207 desired := repoFromRecord(current, &record) 208 209 if current.Source != desired.Source { 210 l.Warn("source field changed but mutation is unsupported, ignoring", 211 "current", current.Source, "desired", desired.Source) 212 } 213 214 if !repoMetadataChanged(current, &desired) { 215 return nil 216 } 217 218 tx, err := i.Db.Begin() 219 if err != nil { 220 return fmt.Errorf("failed to begin tx: %w", err) 221 } 222 defer tx.Rollback() 223 224 if err := applyRepoMetadata(tx, current, desired); err != nil { 225 return fmt.Errorf("failed to apply repo metadata: %w", err) 226 } 227 return tx.Commit() 228} 229 230func (i *Ingester) ingestRepoDelete(ctx context.Context, e *jmodels.Event) error { 231 l := i.Logger.With("handler", "ingestRepoDelete", "did", e.Did, "rkey", e.Commit.RKey) 232 233 repo, err := db.GetRepo(i.Db, 234 orm.FilterEq("did", e.Did), 235 orm.FilterEq("rkey", e.Commit.RKey), 236 ) 237 if err != nil { 238 if errors.Is(err, sql.ErrNoRows) { 239 l.Info("skipping repo delete for unknown row") 240 return nil 241 } 242 return fmt.Errorf("failed to fetch repo for delete: %w", err) 243 } 244 245 if err := db.RemoveRepo(i.Db, e.Did, e.Commit.RKey); err != nil { 246 return fmt.Errorf("failed to delete repo: %w", err) 247 } 248 249 i.Notifier.DeleteRepo(ctx, repo) 250 l.Info("deleted repo row") 251 return nil 252} 253 254func applyRepoMetadata(tx *sql.Tx, current *models.Repo, desired models.Repo) error { 255 if err := db.PutRepo(tx, desired); err != nil { 256 return err 257 } 258 259 if current.Spindle != desired.Spindle { 260 var spindlePtr *string 261 if desired.Spindle != "" { 262 spindlePtr = &desired.Spindle 263 } 264 if err := db.UpdateSpindle(tx, desired.RepoDid, spindlePtr); err != nil { 265 return err 266 } 267 } 268 269 if !labelsEqual(current.Labels, desired.Labels) { 270 if err := reconcileLabels(tx, current, desired); err != nil { 271 return err 272 } 273 } 274 275 return nil 276} 277 278func reconcileLabels(tx *sql.Tx, current *models.Repo, desired models.Repo) error { 279 added := filterOut(desired.Labels, current.Labels) 280 removed := filterOut(current.Labels, desired.Labels) 281 282 if err := applyEach(added, func(l string) error { 283 return db.SubscribeLabel(tx, &models.RepoLabel{ 284 RepoDid: syntax.DID(desired.RepoDid), 285 LabelAt: syntax.ATURI(l), 286 }) 287 }); err != nil { 288 return err 289 } 290 291 return applyEach(removed, func(l string) error { 292 return db.UnsubscribeLabel(tx, 293 orm.FilterEq("repo_did", desired.RepoDid), 294 orm.FilterEq("label_at", l), 295 ) 296 }) 297} 298 299func filterOut(items, exclude []string) []string { 300 return slices.DeleteFunc(slices.Clone(items), func(s string) bool { 301 return slices.Contains(exclude, s) 302 }) 303} 304 305func applyEach(items []string, fn func(string) error) error { 306 for _, item := range items { 307 if err := fn(item); err != nil { 308 return err 309 } 310 } 311 return nil 312} 313 314func labelsEqual(a, b []string) bool { 315 if len(a) != len(b) { 316 return false 317 } 318 aSorted := append([]string(nil), a...) 319 bSorted := append([]string(nil), b...) 320 slices.Sort(aSorted) 321 slices.Sort(bSorted) 322 return slices.Equal(aSorted, bSorted) 323} 324 325func repoFromRecord(current *models.Repo, record *tangled.Repo) models.Repo { 326 out := *current 327 out.Name = derefString(record.Name) 328 if out.Name == "" { 329 out.Name = current.Rkey 330 } 331 out.Knot = record.Knot 332 out.Description = derefString(record.Description) 333 out.Website = derefString(record.Website) 334 out.Topics = append([]string(nil), record.Topics...) 335 out.Spindle = derefString(record.Spindle) 336 out.Source = derefString(record.Source) 337 out.Labels = append([]string(nil), record.Labels...) 338 if record.RepoDid != nil { 339 out.RepoDid = *record.RepoDid 340 } 341 return out 342} 343 344func repoMetadataChanged(current *models.Repo, desired *models.Repo) bool { 345 return current.Name != desired.Name || 346 current.Knot != desired.Knot || 347 current.Description != desired.Description || 348 current.Website != desired.Website || 349 current.TopicStr() != desired.TopicStr() || 350 current.Spindle != desired.Spindle || 351 !labelsEqual(current.Labels, desired.Labels) 352} 353 354func derefString(s *string) string { 355 if s == nil { 356 return "" 357 } 358 return *s 359} 360 361func (i *Ingester) verifyOwnership(ctx context.Context, l *slog.Logger, repoDid, eventDid, recordKnot string) (bool, error) { 362 if i.Verifier == nil { 363 return false, fmt.Errorf("ingester has no repo ownership verifier configured") 364 } 365 rd, err := repoverify.NewRepoDid(repoDid) 366 if err != nil { 367 l.Warn("rejecting repo event: invalid repoDid on record", "repoDid", repoDid, "err", err) 368 return false, nil 369 } 370 result, err := i.Verifier(ctx, rd) 371 if err != nil { 372 return false, fmt.Errorf("verify repo ownership: %w", err) 373 } 374 if result.OwnerDid == "" { 375 l.Warn("knot lacks RepoDescribeRepo, skipping owner check; upgrade knot to 1.14+", 376 "repoDid", repoDid, "knot", result.KnotURL.String()) 377 } else if result.OwnerDid.String() != eventDid { 378 l.Warn("rejecting repo event: owner mismatch", 379 "repoDid", repoDid, 380 "claimedOwner", eventDid, 381 "knotOwner", result.OwnerDid.String(), 382 "knot", result.KnotURL.String(), 383 ) 384 return false, nil 385 } 386 if !strings.EqualFold(recordKnot, result.KnotURL.Host) { 387 l.Warn("rejecting repo event: record knot does not match DID-doc endpoint", 388 "repoDid", repoDid, 389 "recordKnot", recordKnot, 390 "canonicalKnot", result.KnotURL.Host, 391 ) 392 return false, nil 393 } 394 return true, nil 395}