Monorepo for Tangled tangled.org
4

Configure Feed

Select the types of activity you want to include in your feed.

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "slices" 10 "strings" 11 "time" 12 13 "tangled.org/core/appview/cloudflare" 14 "tangled.org/core/appview/notify" 15 16 "tangled.org/core/api/tangled" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/sites" 21 ec "tangled.org/core/eventconsumer" 22 "tangled.org/core/eventstream" 23 knotdb "tangled.org/core/knotserver/db" 24 "tangled.org/core/log" 25 "tangled.org/core/orm" 26 "tangled.org/core/rbac" 27 "tangled.org/core/workflow" 28 29 "github.com/bluesky-social/indigo/atproto/syntax" 30 "github.com/go-git/go-git/v5/plumbing" 31 "github.com/posthog/posthog-go" 32) 33 34func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 35 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null")) 36 if err != nil { 37 return nil, err 38 } 39 40 hosts := make([]string, len(knots)) 41 for i, k := range knots { 42 hosts[i] = k.Domain 43 } 44 45 return bootstrapStream( 46 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr, 47 c.Knotstream, c.Core.Dev, 48 knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 49 ), nil 50} 51 52func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) { 53 if repoDid != nil && *repoDid != "" { 54 return db.GetRepoByDid(d, *repoDid) 55 } 56 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("rkey", strings.ToLower(repoName))) 57 if err != nil { 58 return nil, err 59 } 60 if len(repos) == 0 { 61 return nil, sql.ErrNoRows 62 } 63 return &repos[0], nil 64} 65 66func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 67 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error { 68 switch msg.Nsid { 69 case tangled.GitRefUpdateNSID: 70 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 71 case tangled.PipelineNSID: 72 return ingestPipeline(d, source, msg) 73 case knotdb.RepoDIDAssignNSID: 74 return ingestDIDAssign(d, enforcer, source, msg, ctx) 75 } 76 77 return nil 78 } 79} 80 81// TODO(boltless): remove this. knotmirror should do all sort of indexing 82func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error { 83 logger := log.FromContext(ctx) 84 85 var record tangled.GitRefUpdate 86 err := json.Unmarshal(msg.EventJson, &record) 87 if err != nil { 88 return err 89 } 90 91 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 92 if err != nil { 93 return err 94 } 95 if !slices.Contains(knownKnots, source.Host) { 96 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Host) 97 } 98 99 if record.Repo == "" { 100 return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host) 101 } 102 103 repo, lookupErr := db.GetRepoByDid(d, record.Repo) 104 if lookupErr != nil { 105 return fmt.Errorf("failed to look up repo: %w", lookupErr) 106 } 107 108 logger.Info("processing gitRefUpdate event", 109 "repo", repo.RepoIdentifier(), 110 "ref", record.Ref, 111 "old_sha", record.OldSha, 112 "new_sha", record.NewSha) 113 114 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 115 116 errPunchcard := populatePunchcard(d, record) 117 errLanguages := updateRepoLanguages(d, record) 118 119 var errPosthog error 120 if !dev && record.CommitterDid != "" { 121 errPosthog = pc.Enqueue(posthog.Capture{ 122 DistinctId: record.CommitterDid, 123 Event: "git_ref_update", 124 }) 125 } 126 127 // Trigger a sites redeploy if this push is to the configured sites branch. 128 if cfClient.Enabled() { 129 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source) 130 } 131 132 return errors.Join(errPunchcard, errLanguages, errPosthog) 133} 134 135// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites 136// branch configured for this repo and, if so, syncs the site to R2 137func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, cfg *config.Config, record tangled.GitRefUpdate, source ec.Source) { 138 logger := log.FromContext(ctx) 139 140 ref := plumbing.ReferenceName(record.Ref) 141 if !ref.IsBranch() { 142 return 143 } 144 pushedBranch := ref.Short() 145 146 repo, err := db.GetRepoByDid(d, record.Repo) 147 if err != nil { 148 return 149 } 150 151 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoDid) 152 if err != nil || siteConfig == nil { 153 return 154 } 155 if siteConfig.Branch != pushedBranch { 156 return 157 } 158 159 deploy := &models.SiteDeploy{ 160 RepoDid: syntax.DID(repo.RepoDid), 161 Branch: siteConfig.Branch, 162 Dir: siteConfig.Dir, 163 CommitSHA: record.NewSha, 164 Trigger: models.SiteDeployTriggerPush, 165 } 166 167 deployErr := sites.Deploy(ctx, cfClient, cfg, repo, siteConfig.Branch, siteConfig.Dir) 168 if deployErr != nil { 169 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr) 170 deploy.Status = models.SiteDeployStatusFailure 171 deploy.Error = deployErr.Error() 172 } else { 173 deploy.Status = models.SiteDeployStatusSuccess 174 } 175 176 if err := db.AddSiteDeploy(d, deploy); err != nil { 177 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err) 178 } 179 180 if deployErr == nil { 181 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier()) 182 } 183} 184 185func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 186 if record.CommitterDid == "" { 187 return nil 188 } 189 190 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 191 if err != nil { 192 return err 193 } 194 195 count := 0 196 for _, ke := range knownEmails { 197 if record.Meta == nil { 198 continue 199 } 200 if record.Meta.CommitCount == nil { 201 continue 202 } 203 for _, ce := range record.Meta.CommitCount.ByEmail { 204 if ce == nil { 205 continue 206 } 207 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 208 count += int(ce.Count) 209 } 210 } 211 } 212 213 punch := models.Punch{ 214 Did: record.CommitterDid, 215 Date: time.Now(), 216 Count: count, 217 } 218 return db.AddPunch(d, punch) 219} 220 221func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 222 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 223 return fmt.Errorf("empty language data for repo: %s", record.Repo) 224 } 225 226 r, lookupErr := db.GetRepoByDid(d, record.Repo) 227 if lookupErr != nil { 228 return fmt.Errorf("failed to look up repo: %w", lookupErr) 229 } 230 repo := *r 231 232 ref := plumbing.ReferenceName(record.Ref) 233 if !ref.IsBranch() { 234 return fmt.Errorf("%s is not a valid reference name", ref) 235 } 236 237 var langs []models.RepoLanguage 238 for _, l := range record.Meta.LangBreakdown.Inputs { 239 if l == nil { 240 continue 241 } 242 243 langs = append(langs, models.RepoLanguage{ 244 RepoDid: syntax.DID(repo.RepoDid), 245 Ref: ref.Short(), 246 IsDefaultRef: record.Meta.IsDefaultRef, 247 Language: l.Lang, 248 Bytes: l.Size, 249 }) 250 } 251 252 tx, err := d.Begin() 253 if err != nil { 254 return err 255 } 256 defer tx.Rollback() 257 258 // update appview's cache 259 err = db.UpdateRepoLanguages(tx, syntax.DID(repo.RepoDid), ref.Short(), langs) 260 if err != nil { 261 fmt.Printf("failed; %s\n", err) 262 // non-fatal 263 } 264 265 return tx.Commit() 266} 267 268func ingestPipeline(d *db.DB, source ec.Source, msg eventstream.Event) error { 269 var record tangled.Pipeline 270 err := json.Unmarshal(msg.EventJson, &record) 271 if err != nil { 272 return err 273 } 274 275 if record.TriggerMetadata == nil { 276 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 277 } 278 279 if record.TriggerMetadata.Repo == nil { 280 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 281 } 282 283 repoName := "" 284 if record.TriggerMetadata.Repo.Repo != nil { 285 repoName = *record.TriggerMetadata.Repo.Repo 286 } 287 288 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 289 if lookupErr != nil { 290 return fmt.Errorf("failed to look up repo: %w", lookupErr) 291 } 292 if repo.Spindle == "" { 293 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 294 } 295 296 // trigger info 297 var trigger models.Trigger 298 var sha string 299 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 300 switch trigger.Kind { 301 case workflow.TriggerKindPush: 302 trigger.PushRef = &record.TriggerMetadata.Push.Ref 303 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 304 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 305 sha = *trigger.PushNewSha 306 case workflow.TriggerKindPullRequest: 307 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 308 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 309 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 310 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 311 sha = *trigger.PRSourceSha 312 } 313 314 tx, err := d.Begin() 315 if err != nil { 316 return fmt.Errorf("failed to start txn: %w", err) 317 } 318 319 triggerId, err := db.AddTrigger(tx, trigger) 320 if err != nil { 321 return fmt.Errorf("failed to add trigger entry: %w", err) 322 } 323 324 pipeline := models.Pipeline{ 325 Rkey: msg.Rkey, 326 Knot: source.Host, 327 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 328 RepoName: repoName, 329 RepoDid: repo.RepoDid, 330 TriggerId: int(triggerId), 331 Sha: sha, 332 } 333 334 err = db.AddPipeline(tx, pipeline) 335 if err != nil { 336 return fmt.Errorf("failed to add pipeline: %w", err) 337 } 338 339 err = tx.Commit() 340 if err != nil { 341 return fmt.Errorf("failed to commit txn: %w", err) 342 } 343 344 return nil 345} 346 347func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error { 348 logger := log.FromContext(ctx) 349 350 var record knotdb.RepoDIDAssign 351 if err := json.Unmarshal(msg.EventJson, &record); err != nil { 352 return fmt.Errorf("unmarshal didAssign: %w", err) 353 } 354 355 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" { 356 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q", 357 record.RepoDid, record.OwnerDid, record.RepoName) 358 } 359 360 logger.Info("processing didAssign event", 361 "repo_did", record.RepoDid, 362 "owner_did", record.OwnerDid, 363 "repo_name", record.RepoName) 364 365 repos, err := db.GetRepos(d, 366 orm.FilterEq("did", record.OwnerDid), 367 orm.FilterEq("rkey", strings.ToLower(record.RepoName)), 368 ) 369 if err != nil || len(repos) == 0 { 370 logger.Warn("didAssign for unknown repo, skipping", 371 "owner_did", record.OwnerDid, 372 "repo_name", record.RepoName) 373 return nil 374 } 375 repo := repos[0] 376 knot := source.Host 377 378 if repo.Knot != knot { 379 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot) 380 } 381 382 repoAtUri := repo.RepoAt().String() 383 legacyResource := record.OwnerDid + "/" + record.RepoName 384 385 if repo.RepoDid != record.RepoDid { 386 tx, err := d.Begin() 387 if err != nil { 388 return fmt.Errorf("begin didAssign txn: %w", err) 389 } 390 defer tx.Rollback() 391 392 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil { 393 return fmt.Errorf("cascade repo_did: %w", err) 394 } 395 396 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil { 397 return fmt.Errorf("enqueue pds rewrites: %w", err) 398 } 399 400 if err := tx.Commit(); err != nil { 401 return fmt.Errorf("commit didAssign txn: %w", err) 402 } 403 } 404 405 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil { 406 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err) 407 } 408 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil { 409 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err) 410 } 411 412 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_did", record.RepoDid)) 413 if collabErr != nil { 414 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr) 415 } 416 for _, c := range collabs { 417 collabDid := c.SubjectDid.String() 418 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil { 419 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err) 420 } 421 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil { 422 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err) 423 } 424 } 425 426 if err := enforcer.E.SavePolicy(); err != nil { 427 return fmt.Errorf("save RBAC policies after didAssign: %w", err) 428 } 429 430 logger.Info("didAssign processed successfully", 431 "repo_did", record.RepoDid, 432 "owner_did", record.OwnerDid, 433 "repo_name", record.RepoName) 434 435 return nil 436}