Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "slices"
10 "strings"
11 "time"
12
13 "tangled.org/core/appview/cloudflare"
14 "tangled.org/core/appview/notify"
15
16 "tangled.org/core/api/tangled"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/knotacl"
20 "tangled.org/core/appview/knotcompat"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/sites"
23 "tangled.org/core/consts"
24 ec "tangled.org/core/eventconsumer"
25 "tangled.org/core/eventstream"
26 knotdb "tangled.org/core/knotserver/db"
27 "tangled.org/core/log"
28 "tangled.org/core/orm"
29 "tangled.org/core/rbac"
30
31 "github.com/bluesky-social/indigo/atproto/syntax"
32 "github.com/go-git/go-git/v5/plumbing"
33 "github.com/posthog/posthog-go"
34)
35
36type aclRoster interface {
37 AddKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error
38 RemoveKnotMember(host string, subject syntax.DID, cursor knotacl.Cursor) error
39 AddCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error
40 RemoveCollaborator(repoDid, subject syntax.DID, cursor knotacl.Cursor) error
41 InvalidateMembers(host string)
42 InvalidateCollaborators(host, repoDid string)
43}
44
45func Knotstream(ctx context.Context, c *config.Config, d *db.DB, acl *knotacl.Service, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) {
46 knots, err := db.GetRegistrations(d, orm.FilterIsNot("registered", "null"))
47 if err != nil {
48 return nil, err
49 }
50
51 hosts := make([]string, len(knots))
52 for i, k := range knots {
53 hosts[i] = k.Domain
54 }
55
56 return bootstrapStream(
57 ctx, "knotstream", ec.KindKnot, hosts, c.Redis.Addr,
58 c.Knotstream,
59 knotIngester(d, acl, enforcer, posthog, notifier, c.Core.Dev, c, cfClient),
60 ), nil
61}
62
63func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) {
64 if repoDid != nil && *repoDid != "" {
65 return db.GetRepoByDid(d, *repoDid)
66 }
67 repos, err := db.GetRepos(d, orm.FilterEq("did", ownerDid), orm.FilterEq("rkey", strings.ToLower(repoName)))
68 if err != nil {
69 return nil, err
70 }
71 if len(repos) == 0 {
72 return nil, sql.ErrNoRows
73 }
74 return &repos[0], nil
75}
76
77func knotIngester(d *db.DB, acl aclRoster, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc {
78 return func(ctx context.Context, source ec.Source, msg eventstream.Event) error {
79 switch msg.Nsid {
80 case tangled.GitRefUpdateNSID:
81 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
82 case knotdb.RepoDIDAssignNSID:
83 return ingestDIDAssign(d, enforcer, source, msg, ctx)
84 case knotdb.KnotMemberUpdateNSID:
85 return ingestKnotMemberUpdate(acl, source, msg)
86 case knotdb.RepoCollaboratorUpdateNSID:
87 return ingestCollaboratorUpdate(ctx, d, acl, source, msg)
88 }
89
90 return nil
91 }
92}
93
// Retry policy for applying ACL deltas from the knot event stream: at most
// aclIngestAttempts tries in total, sleeping aclIngestBackoff between tries.
const (
	aclIngestAttempts = 3
	aclIngestBackoff  = time.Millisecond * 50
)
98
// withAclRetry runs op until it succeeds or it has been called `attempts`
// times, sleeping `backoff` between consecutive calls. op always runs at least
// once, even when attempts <= 1. The error from the last call (nil on
// success) is returned.
func withAclRetry(attempts int, backoff time.Duration, op func() error) error {
	for remaining := attempts; ; remaining-- {
		err := op()
		if err == nil || remaining <= 1 {
			return err
		}
		time.Sleep(backoff)
	}
}
106
107func ingestKnotMemberUpdate(acl aclRoster, source ec.Source, msg eventstream.Event) error {
108 var rec knotdb.KnotMemberUpdate
109 if err := json.Unmarshal(msg.EventJson, &rec); err != nil {
110 return fmt.Errorf("unmarshal memberUpdate: %w", err)
111 }
112
113 subject, err := syntax.ParseDID(rec.Subject)
114 if err != nil {
115 return fmt.Errorf("memberUpdate bad subject %q: %w", rec.Subject, err)
116 }
117
118 cursor := knotacl.Cursor(msg.Created)
119 switch rec.Op {
120 case knotdb.AclOpAdd:
121 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error {
122 return acl.AddKnotMember(source.Host, subject, cursor)
123 })
124 case knotdb.AclOpRemove:
125 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error {
126 return acl.RemoveKnotMember(source.Host, subject, cursor)
127 })
128 default:
129 return fmt.Errorf("memberUpdate unknown op %q", rec.Op)
130 }
131
132 if err != nil {
133 acl.InvalidateMembers(source.Host)
134 }
135 return err
136}
137
138func ingestCollaboratorUpdate(ctx context.Context, d *db.DB, acl aclRoster, source ec.Source, msg eventstream.Event) error {
139 var rec knotdb.RepoCollaboratorUpdate
140 if err := json.Unmarshal(msg.EventJson, &rec); err != nil {
141 return fmt.Errorf("unmarshal collaboratorUpdate: %w", err)
142 }
143
144 subject, err := syntax.ParseDID(rec.Subject)
145 if err != nil {
146 return fmt.Errorf("collaboratorUpdate bad subject %q: %w", rec.Subject, err)
147 }
148 repoDid, err := syntax.ParseDID(rec.Repo)
149 if err != nil {
150 return fmt.Errorf("collaboratorUpdate bad repo %q: %w", rec.Repo, err)
151 }
152
153 cursor := knotacl.Cursor(msg.Created)
154 switch rec.Op {
155 case knotdb.AclOpAdd:
156 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error {
157 owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject)
158 if err != nil || !owned {
159 return err
160 }
161 return acl.AddCollaborator(repoDid, subject, cursor)
162 })
163 case knotdb.AclOpRemove:
164 err = withAclRetry(aclIngestAttempts, aclIngestBackoff, func() error {
165 owned, err := repoOwnedBySource(ctx, d, source, repoDid, subject)
166 if err != nil || !owned {
167 return err
168 }
169 return acl.RemoveCollaborator(repoDid, subject, cursor)
170 })
171 default:
172 return fmt.Errorf("collaboratorUpdate unknown op %q", rec.Op)
173 }
174
175 if err != nil {
176 acl.InvalidateCollaborators(source.Host, repoDid.String())
177 }
178 return err
179}
180
181func repoOwnedBySource(ctx context.Context, d *db.DB, source ec.Source, repoDid, subject syntax.DID) (bool, error) {
182 repo, err := db.GetRepoByDid(d, repoDid.String())
183 if errors.Is(err, sql.ErrNoRows) {
184 log.FromContext(ctx).Warn("collaboratorUpdate for unindexed repo, skipping until reconcile",
185 "repo_did", repoDid, "subject", subject)
186 return false, nil
187 }
188 if err != nil {
189 return false, err
190 }
191 if repo.Knot != source.Host {
192 log.FromContext(ctx).Warn("collaboratorUpdate for a repo this knot does not host, dropping",
193 "repo_did", repoDid, "subject", subject, "claimed_by", source.Host, "owner", repo.Knot)
194 return false, nil
195 }
196 return true, nil
197}
198
199// TODO(boltless): remove this. knotmirror should do all sort of indexing
200func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg eventstream.Event) error {
201 logger := log.FromContext(ctx)
202
203 var record tangled.GitRefUpdate
204 err := json.Unmarshal(msg.EventJson, &record)
205 if err != nil {
206 return err
207 }
208
209 if !knotcompat.KnotHasCapability(ctx, source.Host, dev, consts.CapKnotACL) {
210 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
211 switch {
212 case err != nil:
213 logger.Warn("gitRefUpdate membership lookup failed, ingesting without the sanity check", "committer", record.CommitterDid, "knot", source.Host, "err", err)
214 case !slices.Contains(knownKnots, source.Host):
215 logger.Warn("gitRefUpdate committer is not a known member of the knot, ingesting anyway", "committer", record.CommitterDid, "knot", source.Host)
216 }
217 }
218
219 if record.Repo == "" {
220 return fmt.Errorf("gitRefUpdate from %s missing repo", source.Host)
221 }
222
223 repo, lookupErr := db.GetRepoByDid(d, record.Repo)
224 if lookupErr != nil {
225 return fmt.Errorf("failed to look up repo: %w", lookupErr)
226 }
227
228 logger.Info("processing gitRefUpdate event",
229 "repo", repo.RepoIdentifier(),
230 "ref", record.Ref,
231 "old_sha", record.OldSha,
232 "new_sha", record.NewSha)
233
234 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
235
236 errPunchcard := populatePunchcard(d, record)
237 errLanguages := updateRepoLanguages(d, record)
238
239 var errPosthog error
240 if !dev && record.CommitterDid != "" {
241 errPosthog = pc.Enqueue(posthog.Capture{
242 DistinctId: record.CommitterDid,
243 Event: "git_ref_update",
244 })
245 }
246
247 // Trigger a sites redeploy if this push is to the configured sites branch.
248 if cfClient.Enabled() {
249 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source)
250 }
251
252 return errors.Join(errPunchcard, errLanguages, errPosthog)
253}
254
255// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites
256// branch configured for this repo and, if so, syncs the site to R2
257func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, cfg *config.Config, record tangled.GitRefUpdate, source ec.Source) {
258 logger := log.FromContext(ctx)
259
260 ref := plumbing.ReferenceName(record.Ref)
261 if !ref.IsBranch() {
262 return
263 }
264 pushedBranch := ref.Short()
265
266 repo, err := db.GetRepoByDid(d, record.Repo)
267 if err != nil {
268 return
269 }
270
271 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoDid)
272 if err != nil || siteConfig == nil {
273 return
274 }
275 if siteConfig.Branch != pushedBranch {
276 return
277 }
278
279 deploy := &models.SiteDeploy{
280 RepoDid: syntax.DID(repo.RepoDid),
281 Branch: siteConfig.Branch,
282 Dir: siteConfig.Dir,
283 CommitSHA: record.NewSha,
284 Trigger: models.SiteDeployTriggerPush,
285 }
286
287 deployErr := sites.Deploy(ctx, cfClient, cfg, repo, siteConfig.Branch, siteConfig.Dir)
288 if deployErr != nil {
289 logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr)
290 deploy.Status = models.SiteDeployStatusFailure
291 deploy.Error = deployErr.Error()
292 } else {
293 deploy.Status = models.SiteDeployStatusSuccess
294 }
295
296 if err := db.AddSiteDeploy(d, deploy); err != nil {
297 logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err)
298 }
299
300 if deployErr == nil {
301 logger.Info("site deployed to r2", "repo", repo.RepoIdentifier())
302 }
303}
304
305func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
306 if record.CommitterDid == "" {
307 return nil
308 }
309
310 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
311 if err != nil {
312 return err
313 }
314
315 count := 0
316 for _, ke := range knownEmails {
317 if record.Meta == nil {
318 continue
319 }
320 if record.Meta.CommitCount == nil {
321 continue
322 }
323 for _, ce := range record.Meta.CommitCount.ByEmail {
324 if ce == nil {
325 continue
326 }
327 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
328 count += int(ce.Count)
329 }
330 }
331 }
332
333 punch := models.Punch{
334 Did: record.CommitterDid,
335 Date: time.Now(),
336 Count: count,
337 }
338 return db.AddPunch(d, punch)
339}
340
341func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
342 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
343 return fmt.Errorf("empty language data for repo: %s", record.Repo)
344 }
345
346 r, lookupErr := db.GetRepoByDid(d, record.Repo)
347 if lookupErr != nil {
348 return fmt.Errorf("failed to look up repo: %w", lookupErr)
349 }
350 repo := *r
351
352 ref := plumbing.ReferenceName(record.Ref)
353 if !ref.IsBranch() {
354 return fmt.Errorf("%s is not a valid reference name", ref)
355 }
356
357 var langs []models.RepoLanguage
358 for _, l := range record.Meta.LangBreakdown.Inputs {
359 if l == nil {
360 continue
361 }
362
363 langs = append(langs, models.RepoLanguage{
364 RepoDid: syntax.DID(repo.RepoDid),
365 Ref: ref.Short(),
366 IsDefaultRef: record.Meta.IsDefaultRef,
367 Language: l.Lang,
368 Bytes: l.Size,
369 })
370 }
371
372 tx, err := d.Begin()
373 if err != nil {
374 return err
375 }
376 defer tx.Rollback()
377
378 // update appview's cache
379 err = db.UpdateRepoLanguages(tx, syntax.DID(repo.RepoDid), ref.Short(), langs)
380 if err != nil {
381 fmt.Printf("failed; %s\n", err)
382 // non-fatal
383 }
384
385 return tx.Commit()
386}
387
388func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg eventstream.Event, ctx context.Context) error {
389 logger := log.FromContext(ctx)
390
391 var record knotdb.RepoDIDAssign
392 if err := json.Unmarshal(msg.EventJson, &record); err != nil {
393 return fmt.Errorf("unmarshal didAssign: %w", err)
394 }
395
396 if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" {
397 return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q",
398 record.RepoDid, record.OwnerDid, record.RepoName)
399 }
400
401 logger.Info("processing didAssign event",
402 "repo_did", record.RepoDid,
403 "owner_did", record.OwnerDid,
404 "repo_name", record.RepoName)
405
406 repos, err := db.GetRepos(d,
407 orm.FilterEq("did", record.OwnerDid),
408 orm.FilterEq("name", record.RepoName),
409 )
410 if err != nil || len(repos) == 0 {
411 logger.Warn("didAssign for unknown repo, skipping",
412 "owner_did", record.OwnerDid,
413 "repo_name", record.RepoName)
414 return nil
415 }
416 repo := repos[0]
417 knot := source.Host
418
419 if repo.Knot != knot {
420 return fmt.Errorf("didAssign from %s for repo hosted on %s, rejecting", knot, repo.Knot)
421 }
422
423 repoAtUri := repo.RepoAt().String()
424 legacyResource := record.OwnerDid + "/" + record.RepoName
425
426 if repo.RepoDid != record.RepoDid {
427 tx, err := d.Begin()
428 if err != nil {
429 return fmt.Errorf("begin didAssign txn: %w", err)
430 }
431 defer tx.Rollback()
432
433 if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil {
434 return fmt.Errorf("cascade repo_did: %w", err)
435 }
436
437 if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil {
438 return fmt.Errorf("enqueue pds rewrites: %w", err)
439 }
440
441 if err := tx.Commit(); err != nil {
442 return fmt.Errorf("commit didAssign txn: %w", err)
443 }
444 }
445
446 if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil {
447 return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err)
448 }
449 if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil {
450 return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err)
451 }
452
453 collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_did", record.RepoDid))
454 if collabErr != nil {
455 return fmt.Errorf("get collaborators for RBAC update: %w", collabErr)
456 }
457 for _, c := range collabs {
458 collabDid := c.SubjectDid.String()
459 if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil {
460 return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err)
461 }
462 if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil {
463 return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err)
464 }
465 }
466
467 if err := enforcer.E.SavePolicy(); err != nil {
468 return fmt.Errorf("save RBAC policies after didAssign: %w", err)
469 }
470
471 logger.Info("didAssign processed successfully",
472 "repo_did", record.RepoDid,
473 "owner_did", record.OwnerDid,
474 "repo_name", record.RepoName)
475
476 return nil
477}