Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "slices"
10 "strings"
11 "time"
12
13 "tangled.org/core/appview/cloudflare"
14 "tangled.org/core/appview/notify"
15
16 "tangled.org/core/api/tangled"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/sites"
21 ec "tangled.org/core/eventconsumer"
22 "tangled.org/core/eventstream"
23 knotdb "tangled.org/core/knotserver/db"
24 "tangled.org/core/log"
25 "tangled.org/core/orm"
26 "tangled.org/core/rbac"
27 "tangled.org/core/workflow"
28
29 "github.com/bluesky-social/indigo/atproto/syntax"
30 "github.com/go-git/go-git/v5/plumbing"
31 "github.com/posthog/posthog-go"
32)
33
34func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) {
35 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null"))
36 if err != nil {
37 return nil, err
38 }
39
40 hosts := make([]string, len(knots))
41 for i, k := range knots {
42 hosts[i] = k.Domain
43 }
44
45 return bootstrapStream(
46 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr,
47 c.Knotstream, c.Core.Dev,
48 knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient),
49 ), nil
50}
51
52func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) {
53 if repoDid != nil && *repoDid != "" {
54 return db.GetRepoByDid(d, *repoDid)
55 }
56 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("rkey", strings.ToLower(repoName)))
57 if err != nil {
58 return nil, err
59 }
60 if len(repos) == 0 {
61 return nil, sql.ErrNoRows
62 }
63 return &repos[0], nil
64}
65
66func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc {
67 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error {
68 switch msg.Nsid {
69 case tangled.GitRefUpdateNSID:
70 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
71 case tangled.PipelineNSID:
72 return ingestPipeline(d, source, msg)
73 case knotdb.RepoDIDAssignNSID:
74 return ingestDIDAssign(d, enforcer, source, msg, ctx)
75 }
76
77 return nil
78 }
79}
80
81// TODO(boltless): remove this. knotmirror should do all sort of indexing
82func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error {
83 logger := log.FromContext(ctx)
84
85 var record tangled.GitRefUpdate
86 err := json.Unmarshal(msg.EventJson, &record)
87 if err != nil {
88 return err
89 }
90
91 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
92 if err != nil {
93 return err
94 }
95 if !slices.Contains(knownKnots, source.Host) {
96 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Host)
97 }
98
99 if record.Repo == "" {
100 return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host)
101 }
102
103 repo, lookupErr := db.GetRepoByDid(d, record.Repo)
104 if lookupErr != nil {
105 return fmt.Errorf("failed to look up repo: %w", lookupErr)
106 }
107
108 logger.Info("processing gitRefUpdate event",
109 "repo", repo.RepoIdentifier(),
110 "ref", record.Ref,
111 "old_sha", record.OldSha,
112 "new_sha", record.NewSha)
113
114 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
115
116 errPunchcard := populatePunchcard(d, record)
117 errLanguages := updateRepoLanguages(d, record)
118
119 var errPosthog error
120 if !dev && record.CommitterDid != "" {
121 errPosthog = pc.Enqueue(posthog.Capture{
122 DistinctId: record.CommitterDid,
123 Event: "git_ref_update",
124 })
125 }
126
127 // Trigger a sites redeploy if this push is to the configured sites branch.
128 if cfClient.Enabled() {
129 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source)
130 }
131
132 return errors.Join(errPunchcard, errLanguages, errPosthog)
133}
134
135// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites
136// branch configured for this repo and, if so, syncs the site to R2
137func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, cfg *config.Config, record tangled.GitRefUpdate, source ec.Source) {
138 logger := log.FromContext(ctx)
139
140 ref := plumbing.ReferenceName(record.Ref)
141 if !ref.IsBranch() {
142 return
143 }
144 pushedBranch := ref.Short()
145
146 repo, err := db.GetRepoByDid(d, record.Repo)
147 if err != nil {
148 return
149 }
150
151 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoDid)
152 if err != nil || siteConfig == nil {
153 return
154 }
155 if siteConfig.Branch != pushedBranch {
156 return
157 }
158
159 deploy := &models.SiteDeploy{
160 RepoDid: syntax.DID(repo.RepoDid),
161 Branch: siteConfig.Branch,
162 Dir: siteConfig.Dir,
163 CommitSHA: record.NewSha,
164 Trigger: models.SiteDeployTriggerPush,
165 }
166
167 deployErr := sites.Deploy(ctx, cfClient, cfg, repo, siteConfig.Branch, siteConfig.Dir)
168 if deployErr != nil {
169 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr)
170 deploy.Status = models.SiteDeployStatusFailure
171 deploy.Error = deployErr.Error()
172 } else {
173 deploy.Status = models.SiteDeployStatusSuccess
174 }
175
176 if err := db.AddSiteDeploy(d, deploy); err != nil {
177 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err)
178 }
179
180 if deployErr == nil {
181 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier())
182 }
183}
184
185func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
186 if record.CommitterDid == "" {
187 return nil
188 }
189
190 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
191 if err != nil {
192 return err
193 }
194
195 count := 0
196 for _, ke := range knownEmails {
197 if record.Meta == nil {
198 continue
199 }
200 if record.Meta.CommitCount == nil {
201 continue
202 }
203 for _, ce := range record.Meta.CommitCount.ByEmail {
204 if ce == nil {
205 continue
206 }
207 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
208 count += int(ce.Count)
209 }
210 }
211 }
212
213 punch := models.Punch{
214 Did: record.CommitterDid,
215 Date: time.Now(),
216 Count: count,
217 }
218 return db.AddPunch(d, punch)
219}
220
221func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
222 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
223 return fmt.Errorf("empty language data for repo: %s", record.Repo)
224 }
225
226 r, lookupErr := db.GetRepoByDid(d, record.Repo)
227 if lookupErr != nil {
228 return fmt.Errorf("failed to look up repo: %w", lookupErr)
229 }
230 repo := *r
231
232 ref := plumbing.ReferenceName(record.Ref)
233 if !ref.IsBranch() {
234 return fmt.Errorf("%s is not a valid reference name", ref)
235 }
236
237 var langs []models.RepoLanguage
238 for _, l := range record.Meta.LangBreakdown.Inputs {
239 if l == nil {
240 continue
241 }
242
243 langs = append(langs, models.RepoLanguage{
244 RepoDid: syntax.DID(repo.RepoDid),
245 Ref: ref.Short(),
246 IsDefaultRef: record.Meta.IsDefaultRef,
247 Language: l.Lang,
248 Bytes: l.Size,
249 })
250 }
251
252 tx, err := d.Begin()
253 if err != nil {
254 return err
255 }
256 defer tx.Rollback()
257
258 // update appview's cache
259 err = db.UpdateRepoLanguages(tx, syntax.DID(repo.RepoDid), ref.Short(), langs)
260 if err != nil {
261 fmt.Printf("failed; %s\n", err)
262 // non-fatal
263 }
264
265 return tx.Commit()
266}
267
268func ingestPipeline(d *db.DB, source ec.Source, msg eventstream.Event) error {
269 var record tangled.Pipeline
270 err := json.Unmarshal(msg.EventJson, &record)
271 if err != nil {
272 return err
273 }
274
275 if record.TriggerMetadata == nil {
276 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
277 }
278
279 if record.TriggerMetadata.Repo == nil {
280 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
281 }
282
283 repoName := ""
284 if record.TriggerMetadata.Repo.Repo != nil {
285 repoName = *record.TriggerMetadata.Repo.Repo
286 }
287
288 repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName)
289 if lookupErr != nil {
290 return fmt.Errorf("failed to look up repo: %w", lookupErr)
291 }
292 if repo.Spindle == "" {
293 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
294 }
295
296 // trigger info
297 var trigger models.Trigger
298 var sha string
299 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
300 switch trigger.Kind {
301 case workflow.TriggerKindPush:
302 trigger.PushRef = &record.TriggerMetadata.Push.Ref
303 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
304 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
305 sha = *trigger.PushNewSha
306 case workflow.TriggerKindPullRequest:
307 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
308 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
309 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
310 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
311 sha = *trigger.PRSourceSha
312 }
313
314 tx, err := d.Begin()
315 if err != nil {
316 return fmt.Errorf("failed to start txn: %w", err)
317 }
318
319 triggerId, err := db.AddTrigger(tx, trigger)
320 if err != nil {
321 return fmt.Errorf("failed to add trigger entry: %w", err)
322 }
323
324 pipeline := models.Pipeline{
325 Rkey: msg.Rkey,
326 Knot: source.Host,
327 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
328 RepoName: repoName,
329 RepoDid: repo.RepoDid,
330 TriggerId: int(triggerId),
331 Sha: sha,
332 }
333
334 err = db.AddPipeline(tx, pipeline)
335 if err != nil {
336 return fmt.Errorf("failed to add pipeline: %w", err)
337 }
338
339 err = tx.Commit()
340 if err != nil {
341 return fmt.Errorf("failed to commit txn: %w", err)
342 }
343
344 return nil
345}
346
347func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error {
348 logger := log.FromContext(ctx)
349
350 var record knotdb.RepoDIDAssign
351 if err := json.Unmarshal(msg.EventJson, &record); err != nil {
352 return fmt.Errorf("unmarshal didAssign: %w", err)
353 }
354
355 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" {
356 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q",
357 record.RepoDid, record.OwnerDid, record.RepoName)
358 }
359
360 logger.Info("processing didAssign event",
361 "repo_did", record.RepoDid,
362 "owner_did", record.OwnerDid,
363 "repo_name", record.RepoName)
364
365 repos, err := db.GetRepos(d,
366 orm.FilterEq("did", record.OwnerDid),
367 orm.FilterEq("rkey", strings.ToLower(record.RepoName)),
368 )
369 if err != nil || len(repos) == 0 {
370 logger.Warn("didAssign for unknown repo, skipping",
371 "owner_did", record.OwnerDid,
372 "repo_name", record.RepoName)
373 return nil
374 }
375 repo := repos[0]
376 knot := source.Host
377
378 if repo.Knot != knot {
379 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot)
380 }
381
382 repoAtUri := repo.RepoAt().String()
383 legacyResource := record.OwnerDid + "/" + record.RepoName
384
385 if repo.RepoDid != record.RepoDid {
386 tx, err := d.Begin()
387 if err != nil {
388 return fmt.Errorf("begin didAssign txn: %w", err)
389 }
390 defer tx.Rollback()
391
392 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil {
393 return fmt.Errorf("cascade repo_did: %w", err)
394 }
395
396 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil {
397 return fmt.Errorf("enqueue pds rewrites: %w", err)
398 }
399
400 if err := tx.Commit(); err != nil {
401 return fmt.Errorf("commit didAssign txn: %w", err)
402 }
403 }
404
405 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil {
406 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err)
407 }
408 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil {
409 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err)
410 }
411
412 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_did", record.RepoDid))
413 if collabErr != nil {
414 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr)
415 }
416 for _, c := range collabs {
417 collabDid := c.SubjectDid.String()
418 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil {
419 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err)
420 }
421 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil {
422 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err)
423 }
424 }
425
426 if err := enforcer.E.SavePolicy(); err != nil {
427 return fmt.Errorf("save RBAC policies after didAssign: %w", err)
428 }
429
430 logger.Info("didAssign processed successfully",
431 "repo_did", record.RepoDid,
432 "owner_did", record.OwnerDid,
433 "repo_name", record.RepoName)
434
435 return nil
436}