Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "slices" 11 "strings" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/models" 18 "tangled.org/core/appview/repoverify" 19 "tangled.org/core/orm" 20) 21 22func (i *Ingester) ingestRepo(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 23 l = l.With("handler", "ingestRepo") 24 25 switch e.Commit.Operation { 26 case jmodels.CommitOperationCreate: 27 return i.ingestRepoCreate(ctx, e, l) 28 case jmodels.CommitOperationUpdate: 29 return i.ingestRepoUpdate(ctx, e, l) 30 case jmodels.CommitOperationDelete: 31 return i.ingestRepoDelete(ctx, e, l) 32 default: 33 l.Info("unknown repo operation") 34 return nil 35 } 36} 37 38func (i *Ingester) ingestRepoCreate(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 39 l = l.With("handler", "ingestRepoCreate") 40 41 record := tangled.Repo{} 42 if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil { 43 l.Error("invalid record", "err", err) 44 return err 45 } 46 47 if record.RepoDid == nil || *record.RepoDid == "" { 48 l.Info("skipping repo create from non-DID-migrated knot") 49 return nil 50 } 51 repoDid := *record.RepoDid 52 53 proceed, err := i.verifyOwnership(ctx, l, repoDid, e.Did, record.Knot) 54 if err != nil { 55 return err 56 } 57 if !proceed { 58 return nil 59 } 60 61 existing, err := db.GetRepo(i.Db, 62 orm.FilterEq("did", e.Did), 63 orm.FilterEq("rkey", e.Commit.RKey), 64 ) 65 if err == nil { 66 l.Info("repo row already exists, skipping create", "did", e.Did, "rkey", e.Commit.RKey) 67 if err := i.ensureRepoOwnerPermissions(e.Did, existing.Knot, existing.RepoIdentifier()); err != nil { 68 return fmt.Errorf("failed to ensure repo owner permissions: %w", err) 69 } 70 return nil 71 } 72 if !errors.Is(err, sql.ErrNoRows) { 73 return fmt.Errorf("failed to check existing repo: %w", err) 74 } 75 76 prev, err := db.GetRepoByDid(i.Db, repoDid) 77 if err != nil && !errors.Is(err, sql.ErrNoRows) { 78 return fmt.Errorf("failed to check existing repoDid: %w", err) 79 } 80 81 if prev != nil { 82 l.Info("repoDid exists under different rkey, renaming", 83 "oldRkey", prev.Rkey, "newRkey", e.Commit.RKey) 84 85 oldRepo := *prev 86 87 tx, txErr := i.Db.Begin() 88 if txErr != nil { 89 return fmt.Errorf("failed to begin rename tx: %w", txErr) 90 } 91 defer tx.Rollback() 92 93 newName := derefString(record.Name) 94 if newName == "" { 95 newName = e.Commit.RKey 96 } 97 98 if err := db.RenameRepo(tx, e.Did, prev.Rkey, e.Commit.RKey, newName); err != nil { 99 return fmt.Errorf("failed to rename repo: %w", err) 100 } 101 if err := db.RecordRepoRename(tx, e.Did, prev.Rkey, repoDid); err != nil { 102 return fmt.Errorf("failed to record rename history: %w", err) 103 } 104 105 renamed := *prev 106 renamed.Rkey = e.Commit.RKey 107 renamed.Name = newName 108 desired := repoFromRecord(&renamed, &record) 109 if repoMetadataChanged(&renamed, &desired) { 110 if err := applyRepoMetadata(tx, &renamed, desired); err != nil { 111 return fmt.Errorf("failed to apply metadata after rename: %w", err) 112 } 113 } 114 115 if err := tx.Commit(); err != nil { 116 return fmt.Errorf("failed to commit rename tx: %w", err) 117 } 118 119 newRepo, err := db.GetRepo(i.Db, 120 orm.FilterEq("did", e.Did), 121 orm.FilterEq("rkey", e.Commit.RKey), 122 ) 123 if err != nil { 124 l.Warn("failed to fetch repo after rename for notification", "err", err) 125 return nil 126 } 127 if err := i.ensureRepoOwnerPermissions(e.Did, newRepo.Knot, newRepo.RepoIdentifier()); err != nil { 128 return fmt.Errorf("failed to ensure repo owner permissions: %w", err) 129 } 130 i.Notifier.RenameRepo(ctx, syntax.DID(e.Did), &oldRepo, newRepo) 131 return nil 132 } 133 134 rkey := e.Commit.RKey 135 name := derefString(record.Name) 136 if name == "" { 137 name = rkey 138 } 139 140 repo := &models.Repo{ 141 Did: e.Did, 142 Name: name, 143 Knot: record.Knot, 144 Rkey: rkey, 145 Description: derefString(record.Description), 146 Website: derefString(record.Website), 147 Topics: append([]string(nil), record.Topics...), 148 Source: derefString(record.Source), 149 Spindle: derefString(record.Spindle), 150 Labels: append([]string(nil), record.Labels...), 151 RepoDid: repoDid, 152 } 153 154 tx, err := i.Db.Begin() 155 if err != nil { 156 return fmt.Errorf("failed to begin insert tx: %w", err) 157 } 158 defer tx.Rollback() 159 160 if err := db.AddRepo(tx, repo); err != nil { 161 return fmt.Errorf("failed to insert repo: %w", err) 162 } 163 if err := tx.Commit(); err != nil { 164 return fmt.Errorf("failed to commit insert tx: %w", err) 165 } 166 167 if err := i.ensureRepoOwnerPermissions(e.Did, repo.Knot, repo.RepoIdentifier()); err != nil { 168 return fmt.Errorf("failed to ensure repo owner permissions: %w", err) 169 } 170 171 i.Notifier.NewRepo(ctx, repo) 172 return nil 173} 174 175func (i *Ingester) ensureRepoOwnerPermissions(ownerDid, knot, repo string) error { 176 if i.Enforcer == nil { 177 return fmt.Errorf("ingester has no RBAC enforcer configured") 178 } 179 if err := i.Enforcer.AddRepo(ownerDid, knot, repo); err != nil { 180 return err 181 } 182 return i.Enforcer.E.SavePolicy() 183} 184 185func (i *Ingester) ingestRepoUpdate(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 186 l = l.With("handler", "ingestRepoUpdate") 187 188 record := tangled.Repo{} 189 if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil { 190 l.Error("invalid record", "err", err) 191 return err 192 } 193 194 if record.RepoDid == nil || *record.RepoDid == "" { 195 l.Info("skipping repo update from non-DID-migrated knot") 196 return nil 197 } 198 199 proceed, err := i.verifyOwnership(ctx, l, *record.RepoDid, e.Did, record.Knot) 200 if err != nil { 201 return err 202 } 203 if !proceed { 204 return nil 205 } 206 207 current, err := db.GetRepo(i.Db, 208 orm.FilterEq("did", e.Did), 209 orm.FilterEq("rkey", e.Commit.RKey), 210 ) 211 if err != nil { 212 if errors.Is(err, sql.ErrNoRows) { 213 l.Info("skipping repo update for unknown row") 214 return nil 215 } 216 return fmt.Errorf("failed to fetch repo for ingest: %w", err) 217 } 218 219 if current.RepoDid != "" && current.RepoDid != *record.RepoDid { 220 l.Warn("rejecting repo update: repoDid is immutable", 221 "currentRepoDid", current.RepoDid, 222 "recordRepoDid", *record.RepoDid, 223 ) 224 return nil 225 } 226 227 desired := repoFromRecord(current, &record) 228 229 if current.Source != desired.Source { 230 l.Warn("source field changed but mutation is unsupported, ignoring", 231 "current", current.Source, "desired", desired.Source) 232 } 233 234 if !repoMetadataChanged(current, &desired) { 235 return nil 236 } 237 238 tx, err := i.Db.Begin() 239 if err != nil { 240 return fmt.Errorf("failed to begin tx: %w", err) 241 } 242 defer tx.Rollback() 243 244 if err := applyRepoMetadata(tx, current, desired); err != nil { 245 return fmt.Errorf("failed to apply repo metadata: %w", err) 246 } 247 return tx.Commit() 248} 249 250func (i *Ingester) ingestRepoDelete(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 251 l = l.With("handler", "ingestRepoDelete") 252 253 repo, err := db.GetRepo(i.Db, 254 orm.FilterEq("did", e.Did), 255 orm.FilterEq("rkey", e.Commit.RKey), 256 ) 257 if err != nil { 258 if errors.Is(err, sql.ErrNoRows) { 259 l.Info("skipping repo delete for unknown row") 260 return nil 261 } 262 return fmt.Errorf("failed to fetch repo for delete: %w", err) 263 } 264 265 if i.Enforcer == nil { 266 return fmt.Errorf("ingester has no RBAC enforcer configured") 267 } 268 269 tx, err := i.Db.Begin() 270 if err != nil { 271 return fmt.Errorf("failed to start txn: %w", err) 272 } 273 committed := false 274 defer func() { 275 if committed { 276 return 277 } 278 tx.Rollback() 279 i.Enforcer.E.LoadPolicy() 280 }() 281 282 if err := db.RemoveRepo(tx, e.Did, e.Commit.RKey); err != nil { 283 return fmt.Errorf("failed to delete repo: %w", err) 284 } 285 286 if err := i.Enforcer.WipeRepoPolicies(repo.Knot, repo.RepoIdentifier()); err != nil { 287 return fmt.Errorf("failed to wipe repo permissions: %w", err) 288 } 289 290 if err := tx.Commit(); err != nil { 291 return fmt.Errorf("failed to commit txn: %w", err) 292 } 293 294 if err := i.Enforcer.E.SavePolicy(); err != nil { 295 return fmt.Errorf("failed to save ACLs: %w", err) 296 } 297 committed = true 298 299 i.Notifier.DeleteRepo(ctx, repo) 300 l.Info("deleted repo row") 301 return nil 302} 303 304func applyRepoMetadata(tx *sql.Tx, current *models.Repo, desired models.Repo) error { 305 if err := db.PutRepo(tx, desired); err != nil { 306 return err 307 } 308 309 if current.Spindle != desired.Spindle { 310 var spindlePtr *string 311 if desired.Spindle != "" { 312 spindlePtr = &desired.Spindle 313 } 314 if err := db.UpdateSpindle(tx, desired.RepoDid, spindlePtr); err != nil { 315 return err 316 } 317 } 318 319 if !labelsEqual(current.Labels, desired.Labels) { 320 if err := reconcileLabels(tx, current, desired); err != nil { 321 return err 322 } 323 } 324 325 return nil 326} 327 328func reconcileLabels(tx *sql.Tx, current *models.Repo, desired models.Repo) error { 329 added := filterOut(desired.Labels, current.Labels) 330 removed := filterOut(current.Labels, desired.Labels) 331 332 if err := applyEach(added, func(l string) error { 333 return db.SubscribeLabel(tx, &models.RepoLabel{ 334 RepoDid: syntax.DID(desired.RepoDid), 335 LabelAt: syntax.ATURI(l), 336 }) 337 }); err != nil { 338 return err 339 } 340 341 return applyEach(removed, func(l string) error { 342 return db.UnsubscribeLabel(tx, 343 orm.FilterEq("repo_did", desired.RepoDid), 344 orm.FilterEq("label_at", l), 345 ) 346 }) 347} 348 349func filterOut(items, exclude []string) []string { 350 return slices.DeleteFunc(slices.Clone(items), func(s string) bool { 351 return slices.Contains(exclude, s) 352 }) 353} 354 355func applyEach(items []string, fn func(string) error) error { 356 for _, item := range items { 357 if err := fn(item); err != nil { 358 return err 359 } 360 } 361 return nil 362} 363 364func labelsEqual(a, b []string) bool { 365 if len(a) != len(b) { 366 return false 367 } 368 aSorted := append([]string(nil), a...) 369 bSorted := append([]string(nil), b...) 370 slices.Sort(aSorted) 371 slices.Sort(bSorted) 372 return slices.Equal(aSorted, bSorted) 373} 374 375func repoFromRecord(current *models.Repo, record *tangled.Repo) models.Repo { 376 out := *current 377 out.Name = derefString(record.Name) 378 if out.Name == "" { 379 out.Name = current.Rkey 380 } 381 out.Knot = record.Knot 382 out.Description = derefString(record.Description) 383 out.Website = derefString(record.Website) 384 out.Topics = append([]string(nil), record.Topics...) 385 out.Spindle = derefString(record.Spindle) 386 out.Source = derefString(record.Source) 387 out.Labels = append([]string(nil), record.Labels...) 388 if record.RepoDid != nil { 389 out.RepoDid = *record.RepoDid 390 } 391 return out 392} 393 394func repoMetadataChanged(current *models.Repo, desired *models.Repo) bool { 395 return current.Name != desired.Name || 396 current.Knot != desired.Knot || 397 current.Description != desired.Description || 398 current.Website != desired.Website || 399 current.TopicStr() != desired.TopicStr() || 400 current.Spindle != desired.Spindle || 401 !labelsEqual(current.Labels, desired.Labels) 402} 403 404func derefString(s *string) string { 405 if s == nil { 406 return "" 407 } 408 return *s 409} 410 411func (i *Ingester) verifyOwnership(ctx context.Context, l *slog.Logger, repoDid, eventDid, recordKnot string) (bool, error) { 412 if i.Verifier == nil { 413 return false, fmt.Errorf("ingester has no repo ownership verifier configured") 414 } 415 rd, err := repoverify.NewRepoDid(repoDid) 416 if err != nil { 417 l.Warn("rejecting repo event: invalid repoDid on record", "repoDid", repoDid, "err", err) 418 return false, nil 419 } 420 result, err := i.Verifier(ctx, rd) 421 if err != nil { 422 return false, fmt.Errorf("verify repo ownership: %w", err) 423 } 424 if result.OwnerDid == "" { 425 l.Warn("knot lacks RepoDescribeRepo, skipping owner check; upgrade knot to 1.14+", 426 "repoDid", repoDid, "knot", result.KnotURL.String()) 427 } else if result.OwnerDid.String() != eventDid { 428 l.Warn("rejecting repo event: owner mismatch", 429 "repoDid", repoDid, 430 "claimedOwner", eventDid, 431 "knotOwner", result.OwnerDid.String(), 432 "knot", result.KnotURL.String(), 433 ) 434 return false, nil 435 } 436 if !strings.EqualFold(recordKnot, result.KnotURL.Host) { 437 l.Warn("rejecting repo event: record knot does not match DID-doc endpoint", 438 "repoDid", repoDid, 439 "recordKnot", recordKnot, 440 "canonicalKnot", result.KnotURL.Host, 441 ) 442 return false, nil 443 } 444 return true, nil 445}