Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "slices"
10 "strings"
11 "time"
12
13 "tangled.org/core/appview/cloudflare"
14 "tangled.org/core/appview/notify"
15
16 "tangled.org/core/api/tangled"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/knotacl"
20 "tangled.org/core/appview/knotcompat"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/sites"
23 "tangled.org/core/consts"
24 ec "tangled.org/core/eventconsumer"
25 "tangled.org/core/eventstream"
26 knotdb "tangled.org/core/knotserver/db"
27 "tangled.org/core/log"
28 "tangled.org/core/orm"
29 "tangled.org/core/rbac"
30 "tangled.org/core/workflow"
31
32 "github.com/bluesky-social/indigo/atproto/syntax"
33 "github.com/go-git/go-git/v5/plumbing"
34 "github.com/posthog/posthog-go"
35)
36
37type aclRoster interface {
38 AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error
39 RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error
40 AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error
41 RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error
42 InvalidateMembers(host string)
43 InvalidateCollaborators(host, repoDid string)
44}
45
46func Knotstream(ctx context.Context, c *config.Config, d *db.DB, acl *knotacl.Service, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) {
47 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null"))
48 if err != nil {
49 return nil, err
50 }
51
52 hosts := make([]string, len(knots))
53 for i, k := range knots {
54 hosts[i] = k.Domain
55 }
56
57 return bootstrapStream(
58 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr,
59 c.Knotstream, c.Core.Dev,
60 knotIngester(d, acl, enforcer, posthog, notifier, c.Core.Dev, c, cfClient),
61 ), nil
62}
63
64func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) {
65 if repoDid != nil && *repoDid != "" {
66 return db.GetRepoByDid(d, *repoDid)
67 }
68 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("rkey", strings.ToLower(repoName)))
69 if err != nil {
70 return nil, err
71 }
72 if len(repos) == 0 {
73 return nil, sql.ErrNoRows
74 }
75 return &repos[0], nil
76}
77
78func knotIngester(d *db.DB, acl aclRoster, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc {
79 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error {
80 switch msg.Nsid {
81 case tangled.GitRefUpdateNSID:
82 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
83 case tangled.PipelineNSID:
84 return ingestPipeline(d, source, msg)
85 case knotdb.RepoDIDAssignNSID:
86 return ingestDIDAssign(d, enforcer, source, msg, ctx)
87 case knotdb.KnotMemberUpdateNSID:
88 return ingestKnotMemberUpdate(acl, source, msg)
89 case knotdb.RepoCollaboratorUpdateNSID:
90 return ingestCollaboratorUpdate(ctx, d, acl, source, msg)
91 }
92
93 return nil
94 }
95}
96
// aclIngestAttempts is the total number of times an ACL roster write is tried
// (first call included) before the failure is reported.
const aclIngestAttempts = 3

// aclIngestBackoff is the fixed pause between two consecutive attempts.
const aclIngestBackoff = 50 * time.Millisecond
101
// withAclRetry runs op until it succeeds or the attempt budget is used up,
// sleeping for backoff between tries. op always runs at least once, even when
// attempts is zero or negative. The result is nil on success, otherwise the
// error from the last try.
func withAclRetry(attempts int, backoff time.Duration, op func() error) error {
	for remaining := attempts; ; remaining-- {
		err := op()
		if err == nil || remaining <= 1 {
			return err
		}
		time.Sleep(backoff)
	}
}
109
110func ingestKnotMemberUpdate(acl aclRoster, source ec.Source, msg eventstream.Event) error {
111 var rec knotdb.KnotMemberUpdate
112 if err := json.Unmarshal(msg.EventJson, &rec); err != nil {
113 return fmt.Errorf("unmarshal memberUpdate: %w", err)
114 }
115
116 subject, err := syntax.ParseDID(rec.Subject)
117 if err != nil {
118 return fmt.Errorf("memberUpdate bad subject %q: %w", rec.Subject, err)
119 }
120
121 cursor := knotacl.Cursor(msg.Created)
122 switch rec.Op {
123 case knotdb.AclOpAdd:
124 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error {
125 return acl.AddKnotMember(source.Host, subject, cursor)
126 })
127 case knotdb.AclOpRemove:
128 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error {
129 return acl.RemoveKnotMember(source.Host, subject, cursor)
130 })
131 default:
132 return fmt.Errorf("memberUpdate unknown op %q", rec.Op)
133 }
134
135 if err != nil {
136 acl.InvalidateMembers(source.Host)
137 }
138 return err
139}
140
141func ingestCollaboratorUpdate(ctx context.Context, d *db.DB, acl aclRoster, source ec.Source, msg eventstream.Event) error {
142 var rec knotdb.RepoCollaboratorUpdate
143 if err := json.Unmarshal(msg.EventJson, &rec); err != nil {
144 return fmt.Errorf("unmarshal collaboratorUpdate: %w", err)
145 }
146
147 subject, err := syntax.ParseDID(rec.Subject)
148 if err != nil {
149 return fmt.Errorf("collaboratorUpdate bad subject %q: %w", rec.Subject, err)
150 }
151 repoDid, err := syntax.ParseDID(rec.Repo)
152 if err != nil {
153 return fmt.Errorf("collaboratorUpdate bad repo %q: %w", rec.Repo, err)
154 }
155
156 cursor := knotacl.Cursor(msg.Created)
157 switch rec.Op {
158 case knotdb.AclOpAdd:
159 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error {
160 owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject)
161 if err != nil || !owned {
162 return err
163 }
164 return acl.AddCollaborator(repoDid, subject, cursor)
165 })
166 case knotdb.AclOpRemove:
167 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error {
168 owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject)
169 if err != nil || !owned {
170 return err
171 }
172 return acl.RemoveCollaborator(repoDid, subject, cursor)
173 })
174 default:
175 return fmt.Errorf("collaboratorUpdate unknown op %q", rec.Op)
176 }
177
178 if err != nil {
179 acl.InvalidateCollaborators(source.Host, repoDid.String())
180 }
181 return err
182}
183
184func repoOwnedBySource(ctx context.Context, d *db.DB, source ec.Source, repoDid, subject syntax.DID) (bool, error) {
185 repo, err := db.GetRepoByDid(d, repoDid.String())
186 if errors.Is(err, sql.ErrNoRows) {
187 log.FromContext(ctx).Warn("collaboratorUpdate for unindexed repo, skipping until reconcile",
188 "repo_did", repoDid, "subject", subject)
189 return false, nil
190 }
191 if err != nil {
192 return false, err
193 }
194 if repo.Knot != source.Host {
195 log.FromContext(ctx).Warn("collaboratorUpdate for a repo this knot does not host, dropping",
196 "repo_did", repoDid, "subject", subject, "claimed_by", source.Host, "owner", repo.Knot)
197 return false, nil
198 }
199 return true, nil
200}
201
202// TODO(boltless): remove this. knotmirror should do all sort of indexing
203func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error {
204 logger := log.FromContext(ctx)
205
206 var record tangled.GitRefUpdate
207 err := json.Unmarshal(msg.EventJson, &record)
208 if err != nil {
209 return err
210 }
211
212 if !knotcompat.KnotHasCapability(ctx, source.Host, dev, consts.CapKnotACL) {
213 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
214 switch {
215 case err != nil:
216 logger.Warn("gitRefUpdate membership lookup failed, ingesting without the sanity check", "committer", record.CommitterDid, "knot", source.Host, "err", err)
217 case !slices.Contains(knownKnots, source.Host):
218 logger.Warn("gitRefUpdate committer is not a known member of the knot, ingesting anyway", "committer", record.CommitterDid, "knot", source.Host)
219 }
220 }
221
222 if record.Repo == "" {
223 return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host)
224 }
225
226 repo, lookupErr := db.GetRepoByDid(d, record.Repo)
227 if lookupErr != nil {
228 return fmt.Errorf("failed to look up repo: %w", lookupErr)
229 }
230
231 logger.Info("processing gitRefUpdate event",
232 "repo", repo.RepoIdentifier(),
233 "ref", record.Ref,
234 "old_sha", record.OldSha,
235 "new_sha", record.NewSha)
236
237 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
238
239 errPunchcard := populatePunchcard(d, record)
240 errLanguages := updateRepoLanguages(d, record)
241
242 var errPosthog error
243 if !dev && record.CommitterDid != "" {
244 errPosthog = pc.Enqueue(posthog.Capture{
245 DistinctId: record.CommitterDid,
246 Event: "git_ref_update",
247 })
248 }
249
250 // Trigger a sites redeploy if this push is to the configured sites branch.
251 if cfClient.Enabled() {
252 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source)
253 }
254
255 return errors.Join(errPunchcard, errLanguages, errPosthog)
256}
257
258// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites
259// branch configured for this repo and, if so, syncs the site to R2
260func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, cfg *config.Config, record tangled.GitRefUpdate, source ec.Source) {
261 logger := log.FromContext(ctx)
262
263 ref := plumbing.ReferenceName(record.Ref)
264 if !ref.IsBranch() {
265 return
266 }
267 pushedBranch := ref.Short()
268
269 repo, err := db.GetRepoByDid(d, record.Repo)
270 if err != nil {
271 return
272 }
273
274 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoDid)
275 if err != nil || siteConfig == nil {
276 return
277 }
278 if siteConfig.Branch != pushedBranch {
279 return
280 }
281
282 deploy := &models.SiteDeploy{
283 RepoDid: syntax.DID(repo.RepoDid),
284 Branch: siteConfig.Branch,
285 Dir: siteConfig.Dir,
286 CommitSHA: record.NewSha,
287 Trigger: models.SiteDeployTriggerPush,
288 }
289
290 deployErr := sites.Deploy(ctx, cfClient, cfg, repo, siteConfig.Branch, siteConfig.Dir)
291 if deployErr != nil {
292 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr)
293 deploy.Status = models.SiteDeployStatusFailure
294 deploy.Error = deployErr.Error()
295 } else {
296 deploy.Status = models.SiteDeployStatusSuccess
297 }
298
299 if err := db.AddSiteDeploy(d, deploy); err != nil {
300 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err)
301 }
302
303 if deployErr == nil {
304 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier())
305 }
306}
307
308func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
309 if record.CommitterDid == "" {
310 return nil
311 }
312
313 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
314 if err != nil {
315 return err
316 }
317
318 count := 0
319 for _, ke := range knownEmails {
320 if record.Meta == nil {
321 continue
322 }
323 if record.Meta.CommitCount == nil {
324 continue
325 }
326 for _, ce := range record.Meta.CommitCount.ByEmail {
327 if ce == nil {
328 continue
329 }
330 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
331 count += int(ce.Count)
332 }
333 }
334 }
335
336 punch := models.Punch{
337 Did: record.CommitterDid,
338 Date: time.Now(),
339 Count: count,
340 }
341 return db.AddPunch(d, punch)
342}
343
344func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
345 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
346 return fmt.Errorf("empty language data for repo: %s", record.Repo)
347 }
348
349 r, lookupErr := db.GetRepoByDid(d, record.Repo)
350 if lookupErr != nil {
351 return fmt.Errorf("failed to look up repo: %w", lookupErr)
352 }
353 repo := *r
354
355 ref := plumbing.ReferenceName(record.Ref)
356 if !ref.IsBranch() {
357 return fmt.Errorf("%s is not a valid reference name", ref)
358 }
359
360 var langs []models.RepoLanguage
361 for _, l := range record.Meta.LangBreakdown.Inputs {
362 if l == nil {
363 continue
364 }
365
366 langs = append(langs, models.RepoLanguage{
367 RepoDid: syntax.DID(repo.RepoDid),
368 Ref: ref.Short(),
369 IsDefaultRef: record.Meta.IsDefaultRef,
370 Language: l.Lang,
371 Bytes: l.Size,
372 })
373 }
374
375 tx, err := d.Begin()
376 if err != nil {
377 return err
378 }
379 defer tx.Rollback()
380
381 // update appview's cache
382 err = db.UpdateRepoLanguages(tx, syntax.DID(repo.RepoDid), ref.Short(), langs)
383 if err != nil {
384 fmt.Printf("failed; %s\n", err)
385 // non-fatal
386 }
387
388 return tx.Commit()
389}
390
391func ingestPipeline(d *db.DB, source ec.Source, msg eventstream.Event) error {
392 var record tangled.Pipeline
393 err := json.Unmarshal(msg.EventJson, &record)
394 if err != nil {
395 return err
396 }
397
398 if record.TriggerMetadata == nil {
399 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
400 }
401
402 if record.TriggerMetadata.Repo == nil {
403 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
404 }
405
406 repoName := ""
407 if record.TriggerMetadata.Repo.Repo != nil {
408 repoName = *record.TriggerMetadata.Repo.Repo
409 }
410
411 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName)
412 if lookupErr != nil {
413 return fmt.Errorf("failed to look up repo: %w", lookupErr)
414 }
415 if repo.Spindle == "" {
416 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
417 }
418
419 // trigger info
420 var trigger models.Trigger
421 var sha string
422 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
423 switch trigger.Kind {
424 case workflow.TriggerKindPush:
425 trigger.PushRef = &record.TriggerMetadata.Push.Ref
426 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
427 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
428 sha = *trigger.PushNewSha
429 case workflow.TriggerKindPullRequest:
430 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
431 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
432 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
433 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
434 sha = *trigger.PRSourceSha
435 }
436
437 tx, err := d.Begin()
438 if err != nil {
439 return fmt.Errorf("failed to start txn: %w", err)
440 }
441
442 triggerId, err := db.AddTrigger(tx, trigger)
443 if err != nil {
444 return fmt.Errorf("failed to add trigger entry: %w", err)
445 }
446
447 pipeline := models.Pipeline{
448 Rkey: msg.Rkey,
449 Knot: source.Host,
450 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
451 RepoName: repoName,
452 RepoDid: repo.RepoDid,
453 TriggerId: int(triggerId),
454 Sha: sha,
455 }
456
457 err = db.AddPipeline(tx, pipeline)
458 if err != nil {
459 return fmt.Errorf("failed to add pipeline: %w", err)
460 }
461
462 err = tx.Commit()
463 if err != nil {
464 return fmt.Errorf("failed to commit txn: %w", err)
465 }
466
467 return nil
468}
469
470func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error {
471 logger := log.FromContext(ctx)
472
473 var record knotdb.RepoDIDAssign
474 if err := json.Unmarshal(msg.EventJson, &record); err != nil {
475 return fmt.Errorf("unmarshal didAssign: %w", err)
476 }
477
478 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" {
479 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q",
480 record.RepoDid, record.OwnerDid, record.RepoName)
481 }
482
483 logger.Info("processing didAssign event",
484 "repo_did", record.RepoDid,
485 "owner_did", record.OwnerDid,
486 "repo_name", record.RepoName)
487
488 repos, err := db.GetRepos(d,
489 orm.FilterEq("did", record.OwnerDid),
490 orm.FilterEq("name", record.RepoName),
491 )
492 if err != nil || len(repos) == 0 {
493 logger.Warn("didAssign for unknown repo, skipping",
494 "owner_did", record.OwnerDid,
495 "repo_name", record.RepoName)
496 return nil
497 }
498 repo := repos[0]
499 knot := source.Host
500
501 if repo.Knot != knot {
502 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot)
503 }
504
505 repoAtUri := repo.RepoAt().String()
506 legacyResource := record.OwnerDid + "/" + record.RepoName
507
508 if repo.RepoDid != record.RepoDid {
509 tx, err := d.Begin()
510 if err != nil {
511 return fmt.Errorf("begin didAssign txn: %w", err)
512 }
513 defer tx.Rollback()
514
515 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil {
516 return fmt.Errorf("cascade repo_did: %w", err)
517 }
518
519 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil {
520 return fmt.Errorf("enqueue pds rewrites: %w", err)
521 }
522
523 if err := tx.Commit(); err != nil {
524 return fmt.Errorf("commit didAssign txn: %w", err)
525 }
526 }
527
528 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil {
529 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err)
530 }
531 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil {
532 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err)
533 }
534
535 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_did", record.RepoDid))
536 if collabErr != nil {
537 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr)
538 }
539 for _, c := range collabs {
540 collabDid := c.SubjectDid.String()
541 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil {
542 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err)
543 }
544 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil {
545 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err)
546 }
547 }
548
549 if err := enforcer.E.SavePolicy(); err != nil {
550 return fmt.Errorf("save RBAC policies after didAssign: %w", err)
551 }
552
553 logger.Info("didAssign processed successfully",
554 "repo_did", record.RepoDid,
555 "owner_did", record.OwnerDid,
556 "repo_name", record.RepoName)
557
558 return nil
559}