Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "slices" 10 "strings" 11 "time" 12 13 "tangled.org/core/appview/cloudflare" 14 "tangled.org/core/appview/notify" 15 16 "tangled.org/core/api/tangled" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/knotacl" 20 "tangled.org/core/appview/knotcompat" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/sites" 23 "tangled.org/core/consts" 24 ec "tangled.org/core/eventconsumer" 25 "tangled.org/core/eventstream" 26 knotdb "tangled.org/core/knotserver/db" 27 "tangled.org/core/log" 28 "tangled.org/core/orm" 29 "tangled.org/core/rbac" 30 31 "github.com/bluesky-social/indigo/atproto/syntax" 32 "github.com/go-git/go-git/v5/plumbing" 33 "github.com/posthog/posthog-go" 34) 35 36type aclRoster interface { 37 AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error 38 RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error 39 AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error 40 RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error 41 InvalidateMembers(host string) 42 InvalidateCollaborators(host, repoDid string) 43} 44 45func Knotstream(ctx context.Context, c *config.Config, d *db.DB, acl *knotacl.Service, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 46 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null")) 47 if err != nil { 48 return nil, err 49 } 50 51 hosts := make([]string, len(knots)) 52 for i, k := range knots { 53 hosts[i] = k.Domain 54 } 55 56 return bootstrapStream( 57 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr, 58 c.Knotstream, 59 knotIngester(d, acl, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 60 ), nil 61} 62 63func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) { 64 if repoDid != nil && *repoDid != "" { 65 return db.GetRepoByDid(d, *repoDid) 66 } 67 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("rkey", strings.ToLower(repoName))) 68 if err != nil { 69 return nil, err 70 } 71 if len(repos) == 0 { 72 return nil, sql.ErrNoRows 73 } 74 return &repos[0], nil 75} 76 77func knotIngester(d *db.DB, acl aclRoster, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 78 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 79 switch msg.Nsid { 80 case tangled.GitRefUpdateNSID: 81 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 82 case knotdb.RepoDIDAssignNSID: 83 return ingestDIDAssign(d, enforcer, source, msg, ctx) 84 case knotdb.KnotMemberUpdateNSID: 85 return ingestKnotMemberUpdate(acl, source, msg) 86 case knotdb.RepoCollaboratorUpdateNSID: 87 return ingestCollaboratorUpdate(ctx, d, acl, source, msg) 88 } 89 90 return nil 91 } 92} 93 94const ( 95 aclIngestAttempts = 3 96 aclIngestBackoff = 50 * time.Millisecond 97) 98 99func withAclRetry(attempts int, backoff time.Duration, op func() error) error { 100 if err := op(); err == nil || attempts <= 1 { 101 return err 102 } 103 time.Sleep(backoff) 104 return withAclRetry(attempts-1, backoff, op) 105} 106 107func ingestKnotMemberUpdate(acl aclRoster, source ec.Source, msg eventstream.Event) error { 108 var rec knotdb.KnotMemberUpdate 109 if err := json.Unmarshal(msg.EventJson, &rec); err != nil { 110 return fmt.Errorf("unmarshal memberUpdate: %w", err) 111 } 112 113 subject, err := syntax.ParseDID(rec.Subject) 114 if err != nil { 115 return fmt.Errorf("memberUpdate bad subject %q: %w", rec.Subject, err) 116 } 117 118 cursor := knotacl.Cursor(msg.Created) 119 switch rec.Op { 120 case knotdb.AclOpAdd: 121 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 122 return acl.AddKnotMember(source.Host, subject, cursor) 123 }) 124 case knotdb.AclOpRemove: 125 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 126 return acl.RemoveKnotMember(source.Host, subject, cursor) 127 }) 128 default: 129 return fmt.Errorf("memberUpdate unknown op %q", rec.Op) 130 } 131 132 if err != nil { 133 acl.InvalidateMembers(source.Host) 134 } 135 return err 136} 137 138func ingestCollaboratorUpdate(ctx context.Context, d *db.DB, acl aclRoster, source ec.Source, msg eventstream.Event) error { 139 var rec knotdb.RepoCollaboratorUpdate 140 if err := json.Unmarshal(msg.EventJson, &rec); err != nil { 141 return fmt.Errorf("unmarshal collaboratorUpdate: %w", err) 142 } 143 144 subject, err := syntax.ParseDID(rec.Subject) 145 if err != nil { 146 return fmt.Errorf("collaboratorUpdate bad subject %q: %w", rec.Subject, err) 147 } 148 repoDid, err := syntax.ParseDID(rec.Repo) 149 if err != nil { 150 return fmt.Errorf("collaboratorUpdate bad repo %q: %w", rec.Repo, err) 151 } 152 153 cursor := knotacl.Cursor(msg.Created) 154 switch rec.Op { 155 case knotdb.AclOpAdd: 156 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 157 owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject) 158 if err != nil || !owned { 159 return err 160 } 161 return acl.AddCollaborator(repoDid, subject, cursor) 162 }) 163 case knotdb.AclOpRemove: 164 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error { 165 owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject) 166 if err != nil || !owned { 167 return err 168 } 169 return acl.RemoveCollaborator(repoDid, subject, cursor) 170 }) 171 default: 172 return fmt.Errorf("collaboratorUpdate unknown op %q", rec.Op) 173 } 174 175 if err != nil { 176 acl.InvalidateCollaborators(source.Host, repoDid.String()) 177 } 178 return err 179} 180 181func repoOwnedBySource(ctx context.Context, d *db.DB, source ec.Source, repoDid, subject syntax.DID) (bool, error) { 182 repo, err := db.GetRepoByDid(d, repoDid.String()) 183 if errors.Is(err, sql.ErrNoRows) { 184 log.FromContext(ctx).Warn("collaboratorUpdate for unindexed repo, skipping until reconcile", 185 "repo_did", repoDid, "subject", subject) 186 return false, nil 187 } 188 if err != nil { 189 return false, err 190 } 191 if repo.Knot != source.Host { 192 log.FromContext(ctx).Warn("collaboratorUpdate for a repo this knot does not host, dropping", 193 "repo_did", repoDid, "subject", subject, "claimed_by", source.Host, "owner", repo.Knot) 194 return false, nil 195 } 196 return true, nil 197} 198 199// TODO(boltless): remove this. knotmirror should do all sort of indexing 200func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error { 201 logger := log.FromContext(ctx) 202 203 var record tangled.GitRefUpdate 204 err := json.Unmarshal(msg.EventJson, &record) 205 if err != nil { 206 return err 207 } 208 209 if !knotcompat.KnotHasCapability(ctx, source.Host, dev, consts.CapKnotACL) { 210 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 211 switch { 212 case err != nil: 213 logger.Warn("gitRefUpdate membership lookup failed, ingesting without the sanity check", "committer", record.CommitterDid, "knot", source.Host, "err", err) 214 case !slices.Contains(knownKnots, source.Host): 215 logger.Warn("gitRefUpdate committer is not a known member of the knot, ingesting anyway", "committer", record.CommitterDid, "knot", source.Host) 216 } 217 } 218 219 if record.Repo == "" { 220 return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host) 221 } 222 223 repo, lookupErr := db.GetRepoByDid(d, record.Repo) 224 if lookupErr != nil { 225 return fmt.Errorf("failed to look up repo: %w", lookupErr) 226 } 227 228 logger.Info("processing gitRefUpdate event", 229 "repo", repo.RepoIdentifier(), 230 "ref", record.Ref, 231 "old_sha", record.OldSha, 232 "new_sha", record.NewSha) 233 234 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 235 236 errPunchcard := populatePunchcard(d, record) 237 errLanguages := updateRepoLanguages(d, record) 238 239 var errPosthog error 240 if !dev && record.CommitterDid != "" { 241 errPosthog = pc.Enqueue(posthog.Capture{ 242 DistinctId: record.CommitterDid, 243 Event: "git_ref_update", 244 }) 245 } 246 247 // Trigger a sites redeploy if this push is to the configured sites branch. 248 if cfClient.Enabled() { 249 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source) 250 } 251 252 return errors.Join(errPunchcard, errLanguages, errPosthog) 253} 254 255// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites 256// branch configured for this repo and, if so, syncs the site to R2 257func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, cfg *config.Config, record tangled.GitRefUpdate, source ec.Source) { 258 logger := log.FromContext(ctx) 259 260 ref := plumbing.ReferenceName(record.Ref) 261 if !ref.IsBranch() { 262 return 263 } 264 pushedBranch := ref.Short() 265 266 repo, err := db.GetRepoByDid(d, record.Repo) 267 if err != nil { 268 return 269 } 270 271 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoDid) 272 if err != nil || siteConfig == nil { 273 return 274 } 275 if siteConfig.Branch != pushedBranch { 276 return 277 } 278 279 deploy := &models.SiteDeploy{ 280 RepoDid: syntax.DID(repo.RepoDid), 281 Branch: siteConfig.Branch, 282 Dir: siteConfig.Dir, 283 CommitSHA: record.NewSha, 284 Trigger: models.SiteDeployTriggerPush, 285 } 286 287 deployErr := sites.Deploy(ctx, cfClient, cfg, repo, siteConfig.Branch, siteConfig.Dir) 288 if deployErr != nil { 289 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr) 290 deploy.Status = models.SiteDeployStatusFailure 291 deploy.Error = deployErr.Error() 292 } else { 293 deploy.Status = models.SiteDeployStatusSuccess 294 } 295 296 if err := db.AddSiteDeploy(d, deploy); err != nil { 297 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err) 298 } 299 300 if deployErr == nil { 301 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier()) 302 } 303} 304 305func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 306 if record.CommitterDid == "" { 307 return nil 308 } 309 310 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 311 if err != nil { 312 return err 313 } 314 315 count := 0 316 for _, ke := range knownEmails { 317 if record.Meta == nil { 318 continue 319 } 320 if record.Meta.CommitCount == nil { 321 continue 322 } 323 for _, ce := range record.Meta.CommitCount.ByEmail { 324 if ce == nil { 325 continue 326 } 327 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 328 count += int(ce.Count) 329 } 330 } 331 } 332 333 punch := models.Punch{ 334 Did: record.CommitterDid, 335 Date: time.Now(), 336 Count: count, 337 } 338 return db.AddPunch(d, punch) 339} 340 341func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 342 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 343 return fmt.Errorf("empty language data for repo: %s", record.Repo) 344 } 345 346 r, lookupErr := db.GetRepoByDid(d, record.Repo) 347 if lookupErr != nil { 348 return fmt.Errorf("failed to look up repo: %w", lookupErr) 349 } 350 repo := *r 351 352 ref := plumbing.ReferenceName(record.Ref) 353 if !ref.IsBranch() { 354 return fmt.Errorf("%s is not a valid reference name", ref) 355 } 356 357 var langs []models.RepoLanguage 358 for _, l := range record.Meta.LangBreakdown.Inputs { 359 if l == nil { 360 continue 361 } 362 363 langs = append(langs, models.RepoLanguage{ 364 RepoDid: syntax.DID(repo.RepoDid), 365 Ref: ref.Short(), 366 IsDefaultRef: record.Meta.IsDefaultRef, 367 Language: l.Lang, 368 Bytes: l.Size, 369 }) 370 } 371 372 tx, err := d.Begin() 373 if err != nil { 374 return err 375 } 376 defer tx.Rollback() 377 378 // update appview's cache 379 err = db.UpdateRepoLanguages(tx, syntax.DID(repo.RepoDid), ref.Short(), langs) 380 if err != nil { 381 fmt.Printf("failed; %s\n", err) 382 // non-fatal 383 } 384 385 return tx.Commit() 386} 387 388func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error { 389 logger := log.FromContext(ctx) 390 391 var record knotdb.RepoDIDAssign 392 if err := json.Unmarshal(msg.EventJson, &record); err != nil { 393 return fmt.Errorf("unmarshal didAssign: %w", err) 394 } 395 396 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" { 397 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q", 398 record.RepoDid, record.OwnerDid, record.RepoName) 399 } 400 401 logger.Info("processing didAssign event", 402 "repo_did", record.RepoDid, 403 "owner_did", record.OwnerDid, 404 "repo_name", record.RepoName) 405 406 repos, err := db.GetRepos(d, 407 orm.FilterEq("did", record.OwnerDid), 408 orm.FilterEq("name", record.RepoName), 409 ) 410 if err != nil || len(repos) == 0 { 411 logger.Warn("didAssign for unknown repo, skipping", 412 "owner_did", record.OwnerDid, 413 "repo_name", record.RepoName) 414 return nil 415 } 416 repo := repos[0] 417 knot := source.Host 418 419 if repo.Knot != knot { 420 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot) 421 } 422 423 repoAtUri := repo.RepoAt().String() 424 legacyResource := record.OwnerDid + "/" + record.RepoName 425 426 if repo.RepoDid != record.RepoDid { 427 tx, err := d.Begin() 428 if err != nil { 429 return fmt.Errorf("begin didAssign txn: %w", err) 430 } 431 defer tx.Rollback() 432 433 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil { 434 return fmt.Errorf("cascade repo_did: %w", err) 435 } 436 437 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil { 438 return fmt.Errorf("enqueue pds rewrites: %w", err) 439 } 440 441 if err := tx.Commit(); err != nil { 442 return fmt.Errorf("commit didAssign txn: %w", err) 443 } 444 } 445 446 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil { 447 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err) 448 } 449 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil { 450 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err) 451 } 452 453 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_did", record.RepoDid)) 454 if collabErr != nil { 455 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr) 456 } 457 for _, c := range collabs { 458 collabDid := c.SubjectDid.String() 459 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil { 460 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err) 461 } 462 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil { 463 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err) 464 } 465 } 466 467 if err := enforcer.E.SavePolicy(); err != nil { 468 return fmt.Errorf("save RBAC policies after didAssign: %w", err) 469 } 470 471 logger.Info("didAssign processed successfully", 472 "repo_did", record.RepoDid, 473 "owner_did", record.OwnerDid, 474 "repo_name", record.RepoName) 475 476 return nil 477}