Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

at icy/yovxsu 13 kB View raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "slices" 10 "strings" 11 "time" 12 13 "tangled.org/core/appview/cloudflare" 14 "tangled.org/core/appview/notify" 15 16 "tangled.org/core/api/tangled" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/knotcompat" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/sites" 22 "tangled.org/core/consts" 23 ec "tangled.org/core/eventconsumer" 24 "tangled.org/core/eventstream" 25 knotdb "tangled.org/core/knotserver/db" 26 "tangled.org/core/log" 27 "tangled.org/core/orm" 28 "tangled.org/core/rbac" 29 "tangled.org/core/workflow" 30 31 "github.com/bluesky-social/indigo/atproto/syntax" 32 "github.com/go-git/go-git/v5/plumbing" 33 "github.com/posthog/posthog-go" 34) 35 36func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 37 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null")) 38 if err != nil { 39 return nil, err 40 } 41 42 hosts := make([]string, len(knots)) 43 for i, k := range knots { 44 hosts[i] = k.Domain 45 } 46 47 return bootstrapStream( 48 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr, 49 c.Knotstream, c.Core.Dev, 50 knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 51 ), nil 52} 53 54func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) { 55 if repoDid != nil && *repoDid != "" { 56 return db.GetRepoByDid(d, *repoDid) 57 } 58 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("rkey", strings.ToLower(repoName))) 59 if err != nil { 60 return nil, err 61 } 62 if len(repos) == 0 { 63 return nil, sql.ErrNoRows 64 } 65 return &repos[0], nil 66} 67 68func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 69 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 70 switch msg.Nsid { 71 case tangled.GitRefUpdateNSID: 72 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 73 case tangled.PipelineNSID: 74 return ingestPipeline(d, source, msg) 75 case knotdb.RepoDIDAssignNSID: 76 return ingestDIDAssign(d, enforcer, source, msg, ctx) 77 } 78 79 return nil 80 } 81} 82 83// TODO(boltless): remove this. knotmirror should do all sort of indexing 84func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error { 85 logger := log.FromContext(ctx) 86 87 var record tangled.GitRefUpdate 88 err := json.Unmarshal(msg.EventJson, &record) 89 if err != nil { 90 return err 91 } 92 93 if !knotcompat.KnotHasCapability(ctx, source.Host, dev, consts.CapKnotACL) { 94 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 95 switch { 96 case err != nil: 97 logger.Warn("gitRefUpdate membership lookup failed, ingesting without the sanity check", "committer", record.CommitterDid, "knot", source.Host, "err", err) 98 case !slices.Contains(knownKnots, source.Host): 99 logger.Warn("gitRefUpdate committer is not a known member of the knot, ingesting anyway", "committer", record.CommitterDid, "knot", source.Host) 100 } 101 } 102 103 if record.Repo == "" { 104 return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host) 105 } 106 107 repo, lookupErr := db.GetRepoByDid(d, record.Repo) 108 if lookupErr != nil { 109 return fmt.Errorf("failed to look up repo: %w", lookupErr) 110 } 111 112 logger.Info("processing gitRefUpdate event", 113 "repo", repo.RepoIdentifier(), 114 "ref", record.Ref, 115 "old_sha", record.OldSha, 116 "new_sha", record.NewSha) 117 118 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 119 120 errPunchcard := populatePunchcard(d, record) 121 errLanguages := updateRepoLanguages(d, record) 122 123 var errPosthog error 124 if !dev && record.CommitterDid != "" { 125 errPosthog = pc.Enqueue(posthog.Capture{ 126 DistinctId: record.CommitterDid, 127 Event: "git_ref_update", 128 }) 129 } 130 131 // Trigger a sites redeploy if this push is to the configured sites branch. 132 if cfClient.Enabled() { 133 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source) 134 } 135 136 return errors.Join(errPunchcard, errLanguages, errPosthog) 137} 138 139// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites 140// branch configured for this repo and, if so, syncs the site to R2 141func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, cfg *config.Config, record tangled.GitRefUpdate, source ec.Source) { 142 logger := log.FromContext(ctx) 143 144 ref := plumbing.ReferenceName(record.Ref) 145 if !ref.IsBranch() { 146 return 147 } 148 pushedBranch := ref.Short() 149 150 repo, err := db.GetRepoByDid(d, record.Repo) 151 if err != nil { 152 return 153 } 154 155 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoDid) 156 if err != nil || siteConfig == nil { 157 return 158 } 159 if siteConfig.Branch != pushedBranch { 160 return 161 } 162 163 deploy := &models.SiteDeploy{ 164 RepoDid: syntax.DID(repo.RepoDid), 165 Branch: siteConfig.Branch, 166 Dir: siteConfig.Dir, 167 CommitSHA: record.NewSha, 168 Trigger: models.SiteDeployTriggerPush, 169 } 170 171 deployErr := sites.Deploy(ctx, cfClient, cfg, repo, siteConfig.Branch, siteConfig.Dir) 172 if deployErr != nil { 173 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr) 174 deploy.Status = models.SiteDeployStatusFailure 175 deploy.Error = deployErr.Error() 176 } else { 177 deploy.Status = models.SiteDeployStatusSuccess 178 } 179 180 if err := db.AddSiteDeploy(d, deploy); err != nil { 181 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err) 182 } 183 184 if deployErr == nil { 185 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier()) 186 } 187} 188 189func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 190 if record.CommitterDid == "" { 191 return nil 192 } 193 194 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 195 if err != nil { 196 return err 197 } 198 199 count := 0 200 for _, ke := range knownEmails { 201 if record.Meta == nil { 202 continue 203 } 204 if record.Meta.CommitCount == nil { 205 continue 206 } 207 for _, ce := range record.Meta.CommitCount.ByEmail { 208 if ce == nil { 209 continue 210 } 211 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 212 count += int(ce.Count) 213 } 214 } 215 } 216 217 punch := models.Punch{ 218 Did: record.CommitterDid, 219 Date: time.Now(), 220 Count: count, 221 } 222 return db.AddPunch(d, punch) 223} 224 225func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 226 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 227 return fmt.Errorf("empty language data for repo: %s", record.Repo) 228 } 229 230 r, lookupErr := db.GetRepoByDid(d, record.Repo) 231 if lookupErr != nil { 232 return fmt.Errorf("failed to look up repo: %w", lookupErr) 233 } 234 repo := *r 235 236 ref := plumbing.ReferenceName(record.Ref) 237 if !ref.IsBranch() { 238 return fmt.Errorf("%s is not a valid reference name", ref) 239 } 240 241 var langs []models.RepoLanguage 242 for _, l := range record.Meta.LangBreakdown.Inputs { 243 if l == nil { 244 continue 245 } 246 247 langs = append(langs, models.RepoLanguage{ 248 RepoDid: syntax.DID(repo.RepoDid), 249 Ref: ref.Short(), 250 IsDefaultRef: record.Meta.IsDefaultRef, 251 Language: l.Lang, 252 Bytes: l.Size, 253 }) 254 } 255 256 tx, err := d.Begin() 257 if err != nil { 258 return err 259 } 260 defer tx.Rollback() 261 262 // update appview's cache 263 err = db.UpdateRepoLanguages(tx, syntax.DID(repo.RepoDid), ref.Short(), langs) 264 if err != nil { 265 fmt.Printf("failed; %s\n", err) 266 // non-fatal 267 } 268 269 return tx.Commit() 270} 271 272func ingestPipeline(d *db.DB, source ec.Source, msg eventstream.Event) error { 273 var record tangled.Pipeline 274 err := json.Unmarshal(msg.EventJson, &record) 275 if err != nil { 276 return err 277 } 278 279 if record.TriggerMetadata == nil { 280 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 281 } 282 283 if record.TriggerMetadata.Repo == nil { 284 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 285 } 286 287 repoName := "" 288 if record.TriggerMetadata.Repo.Repo != nil { 289 repoName = *record.TriggerMetadata.Repo.Repo 290 } 291 292 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 293 if lookupErr != nil { 294 return fmt.Errorf("failed to look up repo: %w", lookupErr) 295 } 296 if repo.Spindle == "" { 297 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 298 } 299 300 // trigger info 301 var trigger models.Trigger 302 var sha string 303 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 304 switch trigger.Kind { 305 case workflow.TriggerKindPush: 306 trigger.PushRef = &record.TriggerMetadata.Push.Ref 307 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 308 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 309 sha = *trigger.PushNewSha 310 case workflow.TriggerKindPullRequest: 311 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 312 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 313 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 314 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 315 sha = *trigger.PRSourceSha 316 } 317 318 tx, err := d.Begin() 319 if err != nil { 320 return fmt.Errorf("failed to start txn: %w", err) 321 } 322 323 triggerId, err := db.AddTrigger(tx, trigger) 324 if err != nil { 325 return fmt.Errorf("failed to add trigger entry: %w", err) 326 } 327 328 pipeline := models.Pipeline{ 329 Rkey: msg.Rkey, 330 Knot: source.Host, 331 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 332 RepoName: repoName, 333 RepoDid: repo.RepoDid, 334 TriggerId: int(triggerId), 335 Sha: sha, 336 } 337 338 err = db.AddPipeline(tx, pipeline) 339 if err != nil { 340 return fmt.Errorf("failed to add pipeline: %w", err) 341 } 342 343 err = tx.Commit() 344 if err != nil { 345 return fmt.Errorf("failed to commit txn: %w", err) 346 } 347 348 return nil 349} 350 351func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error { 352 logger := log.FromContext(ctx) 353 354 var record knotdb.RepoDIDAssign 355 if err := json.Unmarshal(msg.EventJson, &record); err != nil { 356 return fmt.Errorf("unmarshal didAssign: %w", err) 357 } 358 359 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" { 360 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q", 361 record.RepoDid, record.OwnerDid, record.RepoName) 362 } 363 364 logger.Info("processing didAssign event", 365 "repo_did", record.RepoDid, 366 "owner_did", record.OwnerDid, 367 "repo_name", record.RepoName) 368 369 repos, err := db.GetRepos(d, 370 orm.FilterEq("did", record.OwnerDid), 371 orm.FilterEq("name", record.RepoName), 372 ) 373 if err != nil || len(repos) == 0 { 374 logger.Warn("didAssign for unknown repo, skipping", 375 "owner_did", record.OwnerDid, 376 "repo_name", record.RepoName) 377 return nil 378 } 379 repo := repos[0] 380 knot := source.Host 381 382 if repo.Knot != knot { 383 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot) 384 } 385 386 repoAtUri := repo.RepoAt().String() 387 legacyResource := record.OwnerDid + "/" + record.RepoName 388 389 if repo.RepoDid != record.RepoDid { 390 tx, err := d.Begin() 391 if err != nil { 392 return fmt.Errorf("begin didAssign txn: %w", err) 393 } 394 defer tx.Rollback() 395 396 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil { 397 return fmt.Errorf("cascade repo_did: %w", err) 398 } 399 400 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil { 401 return fmt.Errorf("enqueue pds rewrites: %w", err) 402 } 403 404 if err := tx.Commit(); err != nil { 405 return fmt.Errorf("commit didAssign txn: %w", err) 406 } 407 } 408 409 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil { 410 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err) 411 } 412 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil { 413 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err) 414 } 415 416 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_did", record.RepoDid)) 417 if collabErr != nil { 418 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr) 419 } 420 for _, c := range collabs { 421 collabDid := c.SubjectDid.String() 422 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil { 423 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err) 424 } 425 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil { 426 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err) 427 } 428 } 429 430 if err := enforcer.E.SavePolicy(); err != nil { 431 return fmt.Errorf("save RBAC policies after didAssign: %w", err) 432 } 433 434 logger.Info("didAssign processed successfully", 435 "repo_did", record.RepoDid, 436 "owner_did", record.OwnerDid, 437 "repo_name", record.RepoName) 438 439 return nil 440}