Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "slices"
10 "strings"
11 "time"
12
13 "tangled.org/core/appview/cloudflare"
14 "tangled.org/core/appview/notify"
15
16 "tangled.org/core/api/tangled"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/knotcompat"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/sites"
22 "tangled.org/core/consts"
23 ec "tangled.org/core/eventconsumer"
24 "tangled.org/core/eventstream"
25 knotdb "tangled.org/core/knotserver/db"
26 "tangled.org/core/log"
27 "tangled.org/core/orm"
28 "tangled.org/core/rbac"
29 "tangled.org/core/workflow"
30
31 "github.com/bluesky-social/indigo/atproto/syntax"
32 "github.com/go-git/go-git/v5/plumbing"
33 "github.com/posthog/posthog-go"
34)
35
36func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) {
37 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null"))
38 if err != nil {
39 return nil, err
40 }
41
42 hosts := make([]string, len(knots))
43 for i, k := range knots {
44 hosts[i] = k.Domain
45 }
46
47 return bootstrapStream(
48 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr,
49 c.Knotstream, c.Core.Dev,
50 knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient),
51 ), nil
52}
53
54func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) {
55 if repoDid != nil && *repoDid != "" {
56 return db.GetRepoByDid(d, *repoDid)
57 }
58 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("rkey", strings.ToLower(repoName)))
59 if err != nil {
60 return nil, err
61 }
62 if len(repos) == 0 {
63 return nil, sql.ErrNoRows
64 }
65 return &repos[0], nil
66}
67
68func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc {
69 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error {
70 switch msg.Nsid {
71 case tangled.GitRefUpdateNSID:
72 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
73 case tangled.PipelineNSID:
74 return ingestPipeline(d, source, msg)
75 case knotdb.RepoDIDAssignNSID:
76 return ingestDIDAssign(d, enforcer, source, msg, ctx)
77 }
78
79 return nil
80 }
81}
82
83// TODO(boltless): remove this. knotmirror should do all sort of indexing
84func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error {
85 logger := log.FromContext(ctx)
86
87 var record tangled.GitRefUpdate
88 err := json.Unmarshal(msg.EventJson, &record)
89 if err != nil {
90 return err
91 }
92
93 if !knotcompat.KnotHasCapability(ctx, source.Host, dev, consts.CapKnotACL) {
94 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
95 switch {
96 case err != nil:
97 logger.Warn("gitRefUpdate membership lookup failed, ingesting without the sanity check", "committer", record.CommitterDid, "knot", source.Host, "err", err)
98 case !slices.Contains(knownKnots, source.Host):
99 logger.Warn("gitRefUpdate committer is not a known member of the knot, ingesting anyway", "committer", record.CommitterDid, "knot", source.Host)
100 }
101 }
102
103 if record.Repo == "" {
104 return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host)
105 }
106
107 repo, lookupErr := db.GetRepoByDid(d, record.Repo)
108 if lookupErr != nil {
109 return fmt.Errorf("failed to look up repo: %w", lookupErr)
110 }
111
112 logger.Info("processing gitRefUpdate event",
113 "repo", repo.RepoIdentifier(),
114 "ref", record.Ref,
115 "old_sha", record.OldSha,
116 "new_sha", record.NewSha)
117
118 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
119
120 errPunchcard := populatePunchcard(d, record)
121 errLanguages := updateRepoLanguages(d, record)
122
123 var errPosthog error
124 if !dev && record.CommitterDid != "" {
125 errPosthog = pc.Enqueue(posthog.Capture{
126 DistinctId: record.CommitterDid,
127 Event: "git_ref_update",
128 })
129 }
130
131 // Trigger a sites redeploy if this push is to the configured sites branch.
132 if cfClient.Enabled() {
133 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source)
134 }
135
136 return errors.Join(errPunchcard, errLanguages, errPosthog)
137}
138
139// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites
140// branch configured for this repo and, if so, syncs the site to R2
141func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, cfg *config.Config, record tangled.GitRefUpdate, source ec.Source) {
142 logger := log.FromContext(ctx)
143
144 ref := plumbing.ReferenceName(record.Ref)
145 if !ref.IsBranch() {
146 return
147 }
148 pushedBranch := ref.Short()
149
150 repo, err := db.GetRepoByDid(d, record.Repo)
151 if err != nil {
152 return
153 }
154
155 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoDid)
156 if err != nil || siteConfig == nil {
157 return
158 }
159 if siteConfig.Branch != pushedBranch {
160 return
161 }
162
163 deploy := &models.SiteDeploy{
164 RepoDid: syntax.DID(repo.RepoDid),
165 Branch: siteConfig.Branch,
166 Dir: siteConfig.Dir,
167 CommitSHA: record.NewSha,
168 Trigger: models.SiteDeployTriggerPush,
169 }
170
171 deployErr := sites.Deploy(ctx, cfClient, cfg, repo, siteConfig.Branch, siteConfig.Dir)
172 if deployErr != nil {
173 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr)
174 deploy.Status = models.SiteDeployStatusFailure
175 deploy.Error = deployErr.Error()
176 } else {
177 deploy.Status = models.SiteDeployStatusSuccess
178 }
179
180 if err := db.AddSiteDeploy(d, deploy); err != nil {
181 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err)
182 }
183
184 if deployErr == nil {
185 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier())
186 }
187}
188
189func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
190 if record.CommitterDid == "" {
191 return nil
192 }
193
194 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
195 if err != nil {
196 return err
197 }
198
199 count := 0
200 for _, ke := range knownEmails {
201 if record.Meta == nil {
202 continue
203 }
204 if record.Meta.CommitCount == nil {
205 continue
206 }
207 for _, ce := range record.Meta.CommitCount.ByEmail {
208 if ce == nil {
209 continue
210 }
211 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
212 count += int(ce.Count)
213 }
214 }
215 }
216
217 punch := models.Punch{
218 Did: record.CommitterDid,
219 Date: time.Now(),
220 Count: count,
221 }
222 return db.AddPunch(d, punch)
223}
224
225func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
226 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
227 return fmt.Errorf("empty language data for repo: %s", record.Repo)
228 }
229
230 r, lookupErr := db.GetRepoByDid(d, record.Repo)
231 if lookupErr != nil {
232 return fmt.Errorf("failed to look up repo: %w", lookupErr)
233 }
234 repo := *r
235
236 ref := plumbing.ReferenceName(record.Ref)
237 if !ref.IsBranch() {
238 return fmt.Errorf("%s is not a valid reference name", ref)
239 }
240
241 var langs []models.RepoLanguage
242 for _, l := range record.Meta.LangBreakdown.Inputs {
243 if l == nil {
244 continue
245 }
246
247 langs = append(langs, models.RepoLanguage{
248 RepoDid: syntax.DID(repo.RepoDid),
249 Ref: ref.Short(),
250 IsDefaultRef: record.Meta.IsDefaultRef,
251 Language: l.Lang,
252 Bytes: l.Size,
253 })
254 }
255
256 tx, err := d.Begin()
257 if err != nil {
258 return err
259 }
260 defer tx.Rollback()
261
262 // update appview's cache
263 err = db.UpdateRepoLanguages(tx, syntax.DID(repo.RepoDid), ref.Short(), langs)
264 if err != nil {
265 fmt.Printf("failed; %s\n", err)
266 // non-fatal
267 }
268
269 return tx.Commit()
270}
271
272func ingestPipeline(d *db.DB, source ec.Source, msg eventstream.Event) error {
273 var record tangled.Pipeline
274 err := json.Unmarshal(msg.EventJson, &record)
275 if err != nil {
276 return err
277 }
278
279 if record.TriggerMetadata == nil {
280 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
281 }
282
283 if record.TriggerMetadata.Repo == nil {
284 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
285 }
286
287 repoName := ""
288 if record.TriggerMetadata.Repo.Repo != nil {
289 repoName = *record.TriggerMetadata.Repo.Repo
290 }
291
292 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName)
293 if lookupErr != nil {
294 return fmt.Errorf("failed to look up repo: %w", lookupErr)
295 }
296 if repo.Spindle == "" {
297 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
298 }
299
300 // trigger info
301 var trigger models.Trigger
302 var sha string
303 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
304 switch trigger.Kind {
305 case workflow.TriggerKindPush:
306 trigger.PushRef = &record.TriggerMetadata.Push.Ref
307 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
308 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
309 sha = *trigger.PushNewSha
310 case workflow.TriggerKindPullRequest:
311 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
312 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
313 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
314 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
315 sha = *trigger.PRSourceSha
316 }
317
318 tx, err := d.Begin()
319 if err != nil {
320 return fmt.Errorf("failed to start txn: %w", err)
321 }
322
323 triggerId, err := db.AddTrigger(tx, trigger)
324 if err != nil {
325 return fmt.Errorf("failed to add trigger entry: %w", err)
326 }
327
328 pipeline := models.Pipeline{
329 Rkey: msg.Rkey,
330 Knot: source.Host,
331 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
332 RepoName: repoName,
333 RepoDid: repo.RepoDid,
334 TriggerId: int(triggerId),
335 Sha: sha,
336 }
337
338 err = db.AddPipeline(tx, pipeline)
339 if err != nil {
340 return fmt.Errorf("failed to add pipeline: %w", err)
341 }
342
343 err = tx.Commit()
344 if err != nil {
345 return fmt.Errorf("failed to commit txn: %w", err)
346 }
347
348 return nil
349}
350
351func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error {
352 logger := log.FromContext(ctx)
353
354 var record knotdb.RepoDIDAssign
355 if err := json.Unmarshal(msg.EventJson, &record); err != nil {
356 return fmt.Errorf("unmarshal didAssign: %w", err)
357 }
358
359 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" {
360 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q",
361 record.RepoDid, record.OwnerDid, record.RepoName)
362 }
363
364 logger.Info("processing didAssign event",
365 "repo_did", record.RepoDid,
366 "owner_did", record.OwnerDid,
367 "repo_name", record.RepoName)
368
369 repos, err := db.GetRepos(d,
370 orm.FilterEq("did", record.OwnerDid),
371 orm.FilterEq("name", record.RepoName),
372 )
373 if err != nil || len(repos) == 0 {
374 logger.Warn("didAssign for unknown repo, skipping",
375 "owner_did", record.OwnerDid,
376 "repo_name", record.RepoName)
377 return nil
378 }
379 repo := repos[0]
380 knot := source.Host
381
382 if repo.Knot != knot {
383 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot)
384 }
385
386 repoAtUri := repo.RepoAt().String()
387 legacyResource := record.OwnerDid + "/" + record.RepoName
388
389 if repo.RepoDid != record.RepoDid {
390 tx, err := d.Begin()
391 if err != nil {
392 return fmt.Errorf("begin didAssign txn: %w", err)
393 }
394 defer tx.Rollback()
395
396 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil {
397 return fmt.Errorf("cascade repo_did: %w", err)
398 }
399
400 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil {
401 return fmt.Errorf("enqueue pds rewrites: %w", err)
402 }
403
404 if err := tx.Commit(); err != nil {
405 return fmt.Errorf("commit didAssign txn: %w", err)
406 }
407 }
408
409 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil {
410 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err)
411 }
412 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil {
413 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err)
414 }
415
416 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_did", record.RepoDid))
417 if collabErr != nil {
418 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr)
419 }
420 for _, c := range collabs {
421 collabDid := c.SubjectDid.String()
422 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil {
423 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err)
424 }
425 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil {
426 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err)
427 }
428 }
429
430 if err := enforcer.E.SavePolicy(); err != nil {
431 return fmt.Errorf("save RBAC policies after didAssign: %w", err)
432 }
433
434 logger.Info("didAssign processed successfully",
435 "repo_did", record.RepoDid,
436 "owner_did", record.OwnerDid,
437 "repo_name", record.RepoName)
438
439 return nil
440}