Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

at icy/lqyotq 17 kB View raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "slices" 10 "strings" 11 "time" 12 13 "tangled.org/core/appview/cloudflare" 14 "tangled.org/core/appview/notify" 15 16 "tangled.org/core/api/tangled" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/knotacl" 20 "tangled.org/core/appview/knotcompat" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/sites" 23 "tangled.org/core/consts" 24 ec "tangled.org/core/eventconsumer" 25 "tangled.org/core/eventstream" 26 knotdb "tangled.org/core/knotserver/db" 27 "tangled.org/core/log" 28 "tangled.org/core/orm" 29 "tangled.org/core/rbac" 30 "tangled.org/core/workflow" 31 32 "github.com/bluesky-social/indigo/atproto/syntax" 33 "github.com/go-git/go-git/v5/plumbing" 34 "github.com/posthog/posthog-go" 35) 36 37type aclRoster interface { 38 AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error 39 RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error 40 AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error 41 RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error 42 InvalidateMembers(host string) 43 InvalidateCollaborators(host, repoDid string) 44} 45 46func Knotstream(ctx context.Context, c *config.Config, d *db.DB, acl *knotacl.Service, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 47 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null")) 48 if err != nil { 49 return nil, err 50 } 51 52 hosts := make([]string, len(knots)) 53 for i, k := range knots { 54 hosts[i] = k.Domain 55 } 56 57 return bootstrapStream( 58 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr, 59 c.Knotstream, c.Core.Dev, 60 knotIngester(d, acl, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 61 ), nil 62} 63 64func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) { 65 if repoDid != nil && *repoDid != "" { 66 return db.GetRepoByDid(d, *repoDid) 67 } 68 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("rkey", strings.ToLower(repoName))) 69 if err != nil { 70 return nil, err 71 } 72 if len(repos) == 0 { 73 return nil, sql.ErrNoRows 74 } 75 return &repos[0], nil 76} 77 78func knotIngester(d *db.DB, acl aclRoster, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 79 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 80 switch msg.Nsid { 81 case tangled.GitRefUpdateNSID: 82 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 83 case tangled.PipelineNSID: 84 return ingestPipeline(d, source, msg) 85 case knotdb.RepoDIDAssignNSID: 86 return ingestDIDAssign(d, enforcer, source, msg, ctx) 87 case knotdb.KnotMemberUpdateNSID: 88 return ingestKnotMemberUpdate(acl, source, msg) 89 case knotdb.RepoCollaboratorUpdateNSID: 90 return ingestCollaboratorUpdate(ctx, d, acl, source, msg) 91 } 92 93 return nil 94 } 95} 96 97const ( 98 aclIngestAttempts = 3 99 aclIngestBackoff = 50 * time.Millisecond 100) 101 102func withAclRetry(attempts int, backoff time.Duration, op func() error) error { 103 if err := op(); err == nil || attempts <= 1 { 104 return err 105 } 106 time.Sleep(backoff) 107 return withAclRetry(attempts-1, backoff, op) 108} 109 110func ingestKnotMemberUpdate(acl aclRoster, source ec.Source, msg eventstream.Event) error { 111 var rec knotdb.KnotMemberUpdate 112 if err := json.Unmarshal(msg.EventJson, &rec); err != nil { 113 return fmt.Errorf("unmarshal memberUpdate: %w", err) 114 } 115 116 subject, err := syntax.ParseDID(rec.Subject) 117 if err != nil { 118 return fmt.Errorf("memberUpdate bad subject %q: %w", rec.Subject, err) 119 } 120 121 cursor := knotacl.Cursor(msg.Created) 122 switch rec.Op { 123 case knotdb.AclOpAdd: 124 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 125 return acl.AddKnotMember(source.Host, subject, cursor) 126 }) 127 case knotdb.AclOpRemove: 128 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 129 return acl.RemoveKnotMember(source.Host, subject, cursor) 130 }) 131 default: 132 return fmt.Errorf("memberUpdate unknown op %q", rec.Op) 133 } 134 135 if err != nil { 136 acl.InvalidateMembers(source.Host) 137 } 138 return err 139} 140 141func ingestCollaboratorUpdate(ctx context.Context, d *db.DB, acl aclRoster, source ec.Source, msg eventstream.Event) error { 142 var rec knotdb.RepoCollaboratorUpdate 143 if err := json.Unmarshal(msg.EventJson, &rec); err != nil { 144 return fmt.Errorf("unmarshal collaboratorUpdate: %w", err) 145 } 146 147 subject, err := syntax.ParseDID(rec.Subject) 148 if err != nil { 149 return fmt.Errorf("collaboratorUpdate bad subject %q: %w", rec.Subject, err) 150 } 151 repoDid, err := syntax.ParseDID(rec.Repo) 152 if err != nil { 153 return fmt.Errorf("collaboratorUpdate bad repo %q: %w", rec.Repo, err) 154 } 155 156 cursor := knotacl.Cursor(msg.Created) 157 switch rec.Op { 158 case knotdb.AclOpAdd: 159 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 160 owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject) 161 if err != nil || !owned { 162 return err 163 } 164 return acl.AddCollaborator(repoDid, subject, cursor) 165 }) 166 case knotdb.AclOpRemove: 167 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 168 owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject) 169 if err != nil || !owned { 170 return err 171 } 172 return acl.RemoveCollaborator(repoDid, subject, cursor) 173 }) 174 default: 175 return fmt.Errorf("collaboratorUpdate unknown op %q", rec.Op) 176 } 177 178 if err != nil { 179 acl.InvalidateCollaborators(source.Host, repoDid.String()) 180 } 181 return err 182} 183 184func repoOwnedBySource(ctx context.Context, d *db.DB, source ec.Source, repoDid, subject syntax.DID) (bool, error) { 185 repo, err := db.GetRepoByDid(d, repoDid.String()) 186 if errors.Is(err, sql.ErrNoRows) { 187 log.FromContext(ctx).Warn("collaboratorUpdate for unindexed repo, skipping until reconcile", 188 "repo_did", repoDid, "subject", subject) 189 return false, nil 190 } 191 if err != nil { 192 return false, err 193 } 194 if repo.Knot != source.Host { 195 log.FromContext(ctx).Warn("collaboratorUpdate for a repo this knot does not host, dropping", 196 "repo_did", repoDid, "subject", subject, "claimed_by", source.Host, "owner", repo.Knot) 197 return false, nil 198 } 199 return true, nil 200} 201 202// TODO(boltless): remove this. knotmirror should do all sort of indexing 203func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error { 204 logger := log.FromContext(ctx) 205 206 var record tangled.GitRefUpdate 207 err := json.Unmarshal(msg.EventJson, &record) 208 if err != nil { 209 return err 210 } 211 212 if !knotcompat.KnotHasCapability(ctx, source.Host, dev, consts.CapKnotACL) { 213 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 214 switch { 215 case err != nil: 216 logger.Warn("gitRefUpdate membership lookup failed, ingesting without the sanity check", "committer", record.CommitterDid, "knot", source.Host, "err", err) 217 case !slices.Contains(knownKnots, source.Host): 218 logger.Warn("gitRefUpdate committer is not a known member of the knot, ingesting anyway", "committer", record.CommitterDid, "knot", source.Host) 219 } 220 } 221 222 if record.Repo == "" { 223 return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host) 224 } 225 226 repo, lookupErr := db.GetRepoByDid(d, record.Repo) 227 if lookupErr != nil { 228 return fmt.Errorf("failed to look up repo: %w", lookupErr) 229 } 230 231 logger.Info("processing gitRefUpdate event", 232 "repo", repo.RepoIdentifier(), 233 "ref", record.Ref, 234 "old_sha", record.OldSha, 235 "new_sha", record.NewSha) 236 237 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 238 239 errPunchcard := populatePunchcard(d, record) 240 errLanguages := updateRepoLanguages(d, record) 241 242 var errPosthog error 243 if !dev && record.CommitterDid != "" { 244 errPosthog = pc.Enqueue(posthog.Capture{ 245 DistinctId: record.CommitterDid, 246 Event: "git_ref_update", 247 }) 248 } 249 250 // Trigger a sites redeploy if this push is to the configured sites branch. 251 if cfClient.Enabled() { 252 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source) 253 } 254 255 return errors.Join(errPunchcard, errLanguages, errPosthog) 256} 257 258// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites 259// branch configured for this repo and, if so, syncs the site to R2 260func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, cfg *config.Config, record tangled.GitRefUpdate, source ec.Source) { 261 logger := log.FromContext(ctx) 262 263 ref := plumbing.ReferenceName(record.Ref) 264 if !ref.IsBranch() { 265 return 266 } 267 pushedBranch := ref.Short() 268 269 repo, err := db.GetRepoByDid(d, record.Repo) 270 if err != nil { 271 return 272 } 273 274 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoDid) 275 if err != nil || siteConfig == nil { 276 return 277 } 278 if siteConfig.Branch != pushedBranch { 279 return 280 } 281 282 deploy := &models.SiteDeploy{ 283 RepoDid: syntax.DID(repo.RepoDid), 284 Branch: siteConfig.Branch, 285 Dir: siteConfig.Dir, 286 CommitSHA: record.NewSha, 287 Trigger: models.SiteDeployTriggerPush, 288 } 289 290 deployErr := sites.Deploy(ctx, cfClient, cfg, repo, siteConfig.Branch, siteConfig.Dir) 291 if deployErr != nil { 292 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr) 293 deploy.Status = models.SiteDeployStatusFailure 294 deploy.Error = deployErr.Error() 295 } else { 296 deploy.Status = models.SiteDeployStatusSuccess 297 } 298 299 if err := db.AddSiteDeploy(d, deploy); err != nil { 300 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err) 301 } 302 303 if deployErr == nil { 304 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier()) 305 } 306} 307 308func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 309 if record.CommitterDid == "" { 310 return nil 311 } 312 313 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 314 if err != nil { 315 return err 316 } 317 318 count := 0 319 for _, ke := range knownEmails { 320 if record.Meta == nil { 321 continue 322 } 323 if record.Meta.CommitCount == nil { 324 continue 325 } 326 for _, ce := range record.Meta.CommitCount.ByEmail { 327 if ce == nil { 328 continue 329 } 330 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 331 count += int(ce.Count) 332 } 333 } 334 } 335 336 punch := models.Punch{ 337 Did: record.CommitterDid, 338 Date: time.Now(), 339 Count: count, 340 } 341 return db.AddPunch(d, punch) 342} 343 344func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 345 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 346 return fmt.Errorf("empty language data for repo: %s", record.Repo) 347 } 348 349 r, lookupErr := db.GetRepoByDid(d, record.Repo) 350 if lookupErr != nil { 351 return fmt.Errorf("failed to look up repo: %w", lookupErr) 352 } 353 repo := *r 354 355 ref := plumbing.ReferenceName(record.Ref) 356 if !ref.IsBranch() { 357 return fmt.Errorf("%s is not a valid reference name", ref) 358 } 359 360 var langs []models.RepoLanguage 361 for _, l := range record.Meta.LangBreakdown.Inputs { 362 if l == nil { 363 continue 364 } 365 366 langs = append(langs, models.RepoLanguage{ 367 RepoDid: syntax.DID(repo.RepoDid), 368 Ref: ref.Short(), 369 IsDefaultRef: record.Meta.IsDefaultRef, 370 Language: l.Lang, 371 Bytes: l.Size, 372 }) 373 } 374 375 tx, err := d.Begin() 376 if err != nil { 377 return err 378 } 379 defer tx.Rollback() 380 381 // update appview's cache 382 err = db.UpdateRepoLanguages(tx, syntax.DID(repo.RepoDid), ref.Short(), langs) 383 if err != nil { 384 fmt.Printf("failed; %s\n", err) 385 // non-fatal 386 } 387 388 return tx.Commit() 389} 390 391func ingestPipeline(d *db.DB, source ec.Source, msg eventstream.Event) error { 392 var record tangled.Pipeline 393 err := json.Unmarshal(msg.EventJson, &record) 394 if err != nil { 395 return err 396 } 397 398 if record.TriggerMetadata == nil { 399 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 400 } 401 402 if record.TriggerMetadata.Repo == nil { 403 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 404 } 405 406 repoName := "" 407 if record.TriggerMetadata.Repo.Repo != nil { 408 repoName = *record.TriggerMetadata.Repo.Repo 409 } 410 411 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 412 if lookupErr != nil { 413 return fmt.Errorf("failed to look up repo: %w", lookupErr) 414 } 415 if repo.Spindle == "" { 416 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 417 } 418 419 // trigger info 420 var trigger models.Trigger 421 var sha string 422 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 423 switch trigger.Kind { 424 case workflow.TriggerKindPush: 425 trigger.PushRef = &record.TriggerMetadata.Push.Ref 426 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 427 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 428 sha = *trigger.PushNewSha 429 case workflow.TriggerKindPullRequest: 430 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 431 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 432 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 433 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 434 sha = *trigger.PRSourceSha 435 } 436 437 tx, err := d.Begin() 438 if err != nil { 439 return fmt.Errorf("failed to start txn: %w", err) 440 } 441 442 triggerId, err := db.AddTrigger(tx, trigger) 443 if err != nil { 444 return fmt.Errorf("failed to add trigger entry: %w", err) 445 } 446 447 pipeline := models.Pipeline{ 448 Rkey: msg.Rkey, 449 Knot: source.Host, 450 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 451 RepoName: repoName, 452 RepoDid: repo.RepoDid, 453 TriggerId: int(triggerId), 454 Sha: sha, 455 } 456 457 err = db.AddPipeline(tx, pipeline) 458 if err != nil { 459 return fmt.Errorf("failed to add pipeline: %w", err) 460 } 461 462 err = tx.Commit() 463 if err != nil { 464 return fmt.Errorf("failed to commit txn: %w", err) 465 } 466 467 return nil 468} 469 470func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error { 471 logger := log.FromContext(ctx) 472 473 var record knotdb.RepoDIDAssign 474 if err := json.Unmarshal(msg.EventJson, &record); err != nil { 475 return fmt.Errorf("unmarshal didAssign: %w", err) 476 } 477 478 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" { 479 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q", 480 record.RepoDid, record.OwnerDid, record.RepoName) 481 } 482 483 logger.Info("processing didAssign event", 484 "repo_did", record.RepoDid, 485 "owner_did", record.OwnerDid, 486 "repo_name", record.RepoName) 487 488 repos, err := db.GetRepos(d, 489 orm.FilterEq("did", record.OwnerDid), 490 orm.FilterEq("name", record.RepoName), 491 ) 492 if err != nil || len(repos) == 0 { 493 logger.Warn("didAssign for unknown repo, skipping", 494 "owner_did", record.OwnerDid, 495 "repo_name", record.RepoName) 496 return nil 497 } 498 repo := repos[0] 499 knot := source.Host 500 501 if repo.Knot != knot { 502 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot) 503 } 504 505 repoAtUri := repo.RepoAt().String() 506 legacyResource := record.OwnerDid + "/" + record.RepoName 507 508 if repo.RepoDid != record.RepoDid { 509 tx, err := d.Begin() 510 if err != nil { 511 return fmt.Errorf("begin didAssign txn: %w", err) 512 } 513 defer tx.Rollback() 514 515 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil { 516 return fmt.Errorf("cascade repo_did: %w", err) 517 } 518 519 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil { 520 return fmt.Errorf("enqueue pds rewrites: %w", err) 521 } 522 523 if err := tx.Commit(); err != nil { 524 return fmt.Errorf("commit didAssign txn: %w", err) 525 } 526 } 527 528 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil { 529 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err) 530 } 531 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil { 532 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err) 533 } 534 535 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_did", record.RepoDid)) 536 if collabErr != nil { 537 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr) 538 } 539 for _, c := range collabs { 540 collabDid := c.SubjectDid.String() 541 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil { 542 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err) 543 } 544 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil { 545 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err) 546 } 547 } 548 549 if err := enforcer.E.SavePolicy(); err != nil { 550 return fmt.Errorf("save RBAC policies after didAssign: %w", err) 551 } 552 553 logger.Info("didAssign processed successfully", 554 "repo_did", record.RepoDid, 555 "owner_did", record.OwnerDid, 556 "repo_name", record.RepoName) 557 558 return nil 559}