Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "net/http"
12 "net/url"
13 "sync"
14 "time"
15
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
18 "tangled.org/core/api/tangled"
19 avmodels "tangled.org/core/appview/models"
20 "tangled.org/core/eventconsumer"
21 "tangled.org/core/log"
22 "tangled.org/core/rbac"
23 "tangled.org/core/spindle/db"
24 "tangled.org/core/spindle/git"
25 "tangled.org/core/spindle/models"
26 "tangled.org/core/tapc"
27 "tangled.org/core/tid"
28 "tangled.org/core/workflow"
29)
30
// Limits for collaborator records that arrive before the repo record they
// reference and therefore have to be held back.
const (
	// pendingCollabTTL is how long a held-back collaborator event remains
	// eligible for replay before it is skipped or purged.
	pendingCollabTTL = time.Minute * 10
	// maxPendingPerRepo caps the number of held-back collaborator events per
	// repo DID; when exceeded, the oldest entries are dropped.
	maxPendingPerRepo = 64
)
35
36type pendingCollabEvent struct {
37 evt *tapc.RecordEventData
38 at time.Time
39}
40
41type Tap struct {
42 logger *slog.Logger
43 spindle *Spindle
44 tap tapc.Client
45 pendingMu sync.Mutex
46 pendingCollabs map[syntax.DID][]pendingCollabEvent
47}
48
49func NewTapClient(s *Spindle) *Tap {
50 return &Tap{
51 logger: log.SubLogger(s.l, "tapclient"),
52 spindle: s,
53 tap: tapc.NewClient(s.cfg.Server.Tap.Url, s.cfg.Server.Tap.AdminPassword),
54 pendingCollabs: make(map[syntax.DID][]pendingCollabEvent),
55 }
56}
57
58func (t *Tap) AddOwnerDIDs(ctx context.Context, dids []syntax.DID) error {
59 if len(dids) == 0 {
60 return nil
61 }
62 return t.tap.AddRepos(ctx, dids)
63}
64
65func (t *Tap) Start(connCtx context.Context) {
66 go t.tap.Connect(connCtx, &tapc.SimpleIndexer{
67 EventHandler: t.processEvent,
68 ConnectHandler: t.onConnect,
69 })
70 go t.purgePendingCollabsLoop(t.spindle.rootCtx)
71}
72
73func (t *Tap) onConnect(ctx context.Context) {
74 t.spindle.declareTapInterest(ctx)
75}
76
77func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
78 if evt.Type != tapc.EvtRecord || evt.Record == nil {
79 return nil
80 }
81 switch evt.Record.Collection.String() {
82 case tangled.RepoNSID:
83 return t.processRepo(ctx, evt.Record)
84 case tangled.RepoCollaboratorNSID:
85 return t.processCollaborator(ctx, evt.Record)
86 case tangled.RepoPullNSID:
87 return t.processPull(ctx, evt.Record)
88 }
89 return nil
90}
91
92func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
93 l := t.logger.With("collection", tangled.RepoNSID, "did", evt.Did, "rkey", evt.Rkey)
94
95 ownerDid := evt.Did
96 rkey := evt.Rkey
97
98 switch evt.Action {
99 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
100 record := tangled.Repo{}
101 if err := json.Unmarshal(evt.Record, &record); err != nil {
102 l.Warn("skipping invalid repo record", "err", err)
103 return nil
104 }
105
106 hostname := t.spindle.cfg.Server.Hostname
107 prior, priorErr := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
108 knownRepo := priorErr == nil
109
110 if record.Spindle == nil || *record.Spindle != hostname {
111 if knownRepo {
112 l.Info("tearing down repo reassigned from this spindle", "newSpindle", record.Spindle)
113 return t.teardownRepo(l, prior, ownerDid, rkey)
114 }
115 return nil
116 }
117
118 if record.RepoDid == nil || *record.RepoDid == "" {
119 l.Warn("skipping repo record without repoDid")
120 return nil
121 }
122 repoDid, err := syntax.ParseDID(*record.RepoDid)
123 if err != nil {
124 l.Warn("skipping repo record with malformed repoDid", "value", *record.RepoDid, "err", err)
125 return nil
126 }
127
128 if err := t.spindle.e.AddRepo(ownerDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
129 l.Error("failed to add repo policy", "err", err)
130 return fmt.Errorf("add repo policy: %w", err)
131 }
132
133 src := eventconsumer.NewKnotSource(record.Knot)
134 t.spindle.ks.AddSource(t.spindle.rootCtx, src)
135
136 repo := db.Repo{
137 Knot: record.Knot,
138 Owner: ownerDid,
139 Rkey: rkey,
140 RepoDid: repoDid,
141 CreatedAt: record.CreatedAt,
142 }
143
144 if err := t.spindle.db.AddRepo(repo); err != nil {
145 l.Error("failed to add repo row", "err", err)
146 return fmt.Errorf("add repo: %w", err)
147 }
148
149 // setup sparse sync
150 repoCloneUri := t.spindle.newRepoCloneUrl(repo.Knot, repo.RepoDid)
151 repoPath := t.spindle.newRepoPath(repo.RepoDid)
152 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil {
153 return fmt.Errorf("setting up sparse-clone git repo: %w", err)
154 }
155
156 legacyName := ""
157 if record.Name != nil {
158 legacyName = *record.Name
159 }
160 migrateLegacyRepoSecrets(ctx, t.spindle.db, t.spindle.vault, l, ownerDid, legacyName, rkey, repoDid)
161 migrateLegacyRepoCasbin(ctx, t.spindle.db, t.spindle.e, l, ownerDid, legacyName, rkey, repoDid)
162
163 if removed, err := t.spindle.db.CollapseRepoSiblings(ownerDid, repoDid); err != nil {
164 l.Warn("collapse rename siblings failed", "err", err)
165 } else if removed > 0 {
166 l.Info("collapsed rename leftovers", "owner", ownerDid, "repo_did", repoDid, "removed", removed)
167 }
168
169 if e := t.spindle.embedTap; e == nil || !e.closed.Load() {
170 if err := t.tap.AddRepos(ctx, []syntax.DID{ownerDid}); err != nil {
171 l.Warn("tap AddRepos rejected", "did", ownerDid, "err", err)
172 }
173 }
174 t.spindle.jc.AddDid(ownerDid.String())
175
176 t.drainPendingCollabs(ctx, repoDid)
177
178 case tapc.RecordDeleteAction:
179 repo, err := t.spindle.db.GetRepoByOwnerRkey(ownerDid, rkey)
180 if err != nil {
181 l.Info("skipping delete for unknown repo")
182 return nil
183 }
184 return t.teardownRepo(l, repo, ownerDid, rkey)
185 }
186 return nil
187}
188
189func (t *Tap) teardownRepo(l *slog.Logger, repo *db.Repo, ownerDid syntax.DID, rkey syntax.RecordKey) error {
190 if repo.RepoDid != "" {
191 collabs, err := t.spindle.db.ListCollaboratorsByRepoDid(repo.RepoDid)
192 if err != nil {
193 l.Error("failed to list collaborators for cleanup", "err", err)
194 return fmt.Errorf("list collaborators: %w", err)
195 }
196 for _, c := range collabs {
197 if err := t.spindle.e.RemoveCollaborator(c.Subject.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
198 l.Error("failed to remove collaborator policy", "subject", c.Subject, "err", err)
199 return fmt.Errorf("remove collaborator policy: %w", err)
200 }
201 }
202 if err := t.spindle.db.DeleteRepoCollaboratorsByRepoDid(repo.RepoDid); err != nil {
203 l.Error("failed to clear collaborator rows", "err", err)
204 return err
205 }
206 if err := t.spindle.e.RemoveRepo(ownerDid.String(), rbac.ThisServer, repo.RepoDid.String()); err != nil {
207 l.Error("failed to remove repo policy", "err", err)
208 return fmt.Errorf("remove repo policy: %w", err)
209 }
210 }
211 if err := t.spindle.db.DeleteRepoByOwnerRkey(ownerDid, rkey); err != nil {
212 l.Error("failed to delete repo row", "err", err)
213 return fmt.Errorf("delete repo row: %w", err)
214 }
215 // TODO: clear sparse-synced git repo
216 return nil
217}
218
219func (t *Tap) processCollaborator(ctx context.Context, evt *tapc.RecordEventData) error {
220 l := t.logger.With("collection", tangled.RepoCollaboratorNSID, "did", evt.Did, "rkey", evt.Rkey)
221
222 switch evt.Action {
223 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
224 record := tangled.RepoCollaborator{}
225 if err := json.Unmarshal(evt.Record, &record); err != nil {
226 l.Warn("skipping invalid collaborator record", "err", err)
227 return nil
228 }
229
230 actor := evt.Did
231 rkey := evt.Rkey
232
233 subjectDid, err := syntax.ParseDID(record.Subject)
234 if err != nil {
235 l.Info("skipping collaborator with malformed subject DID", "subject", record.Subject, "err", err)
236 return nil
237 }
238 if _, err := t.spindle.res.ResolveIdent(ctx, subjectDid.String()); err != nil {
239 l.Info("skipping unresolvable collaborator subject", "subject", subjectDid, "err", err)
240 return nil
241 }
242
243 repoRefDid, err := syntax.ParseDID(record.Repo)
244 if err != nil {
245 l.Info("skipping collaborator with non-DID repo ref", "repo", record.Repo, "err", err)
246 return nil
247 }
248 repo, lookupErr := t.spindle.db.GetRepoByDid(repoRefDid)
249 if errors.Is(lookupErr, sql.ErrNoRows) {
250 t.bufferCollab(repoRefDid, evt)
251 l.Info("buffering collaborator until repo arrives", "repo", repoRefDid)
252 return nil
253 }
254 if lookupErr != nil {
255 return fmt.Errorf("lookup repo %s: %w", repoRefDid, lookupErr)
256 }
257 repoDid := repo.RepoDid
258 ownerDid := repo.Owner
259
260 if actor != ownerDid {
261 l.Info("rejecting collaborator with non-owner actor", "actor", actor, "owner", ownerDid)
262 return nil
263 }
264
265 ok, err := t.spindle.e.IsCollaboratorInviteAllowed(ownerDid.String(), rbac.ThisServer, repoDid.String())
266 if err != nil {
267 l.Error("invite permission check failed", "err", err)
268 return fmt.Errorf("invite check: %w", err)
269 }
270 if !ok {
271 l.Info("rejecting collaborator invite", "owner", ownerDid, "repo", repoDid)
272 return nil
273 }
274
275 prior, priorErr := t.spindle.db.GetRepoCollaborator(actor, rkey)
276 staleSubject := priorErr == nil && (prior.Subject != subjectDid || prior.RepoDid != repoDid)
277
278 if err := t.spindle.e.AddCollaborator(subjectDid.String(), rbac.ThisServer, repoDid.String()); err != nil {
279 l.Error("failed to add collaborator policy", "err", err)
280 return fmt.Errorf("add collaborator policy: %w", err)
281 }
282 if staleSubject {
283 if err := t.spindle.e.RemoveCollaborator(prior.Subject.String(), rbac.ThisServer, prior.RepoDid.String()); err != nil {
284 l.Error("failed to remove stale collaborator policy", "err", err)
285 return fmt.Errorf("remove stale collaborator: %w", err)
286 }
287 }
288 if err := t.spindle.db.AddRepoCollaborator(db.RepoCollaborator{
289 OwnerDid: actor,
290 Rkey: rkey,
291 Subject: subjectDid,
292 RepoDid: repoDid,
293 }); err != nil {
294 l.Error("failed to persist collaborator row", "err", err)
295 return fmt.Errorf("track collaborator: %w", err)
296 }
297
298 case tapc.RecordDeleteAction:
299 actor := evt.Did
300 rkey := evt.Rkey
301
302 tracked, err := t.spindle.db.GetRepoCollaborator(actor, rkey)
303 if err != nil {
304 l.Info("skipping delete for unknown collaborator record")
305 return nil
306 }
307 if err := t.spindle.e.RemoveCollaborator(tracked.Subject.String(), rbac.ThisServer, tracked.RepoDid.String()); err != nil {
308 l.Error("failed to remove collaborator policy", "err", err)
309 return fmt.Errorf("remove collaborator policy: %w", err)
310 }
311 if err := t.spindle.db.DeleteRepoCollaborator(actor, rkey); err != nil {
312 l.Error("failed to delete collaborator row", "err", err)
313 return fmt.Errorf("delete collaborator row: %w", err)
314 }
315 }
316 return nil
317}
318
319func (t *Tap) processPull(ctx context.Context, evt *tapc.RecordEventData) error {
320 l := t.logger.With("collection", evt.Collection, "did", evt.Did, "rkey", evt.Rkey)
321
322 // only listen to live events
323 if !evt.Live {
324 l.Info("skipping backfill event", "event", evt.AtUri())
325 return nil
326 }
327
328 switch evt.Action {
329 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
330 record := tangled.RepoPull{}
331 if err := json.Unmarshal(evt.Record, &record); err != nil {
332 l.Error("invalid record", "err", err)
333 return fmt.Errorf("parsing record: %w", err)
334 }
335
336 // ignore legacy records
337 if record.Target == nil {
338 l.Info("ignoring pull record: target repo is nil")
339 return nil
340 }
341
342 // ignore patch-based and fork-based PRs
343 if record.Source == nil || record.Source.Repo != nil {
344 l.Info("ignoring pull record: not a branch-based pull request")
345 return nil
346 }
347
348 // skip if target repo is unknown
349 repo, err := t.spindle.db.GetRepoByDid(syntax.DID(record.Target.Repo))
350 if err != nil {
351 l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err)
352 return fmt.Errorf("target repo is unknown")
353 }
354
355 // only accept branch-based PR (excluding patch-based and fork-based)
356 if record.Source == nil || record.Source.Repo != nil {
357 l.Warn("skipping non-branch-based PR")
358 return nil
359 }
360
361 latestSubmission, err := t.fetchLatestSubmission(ctx, evt.Did.String(), evt.Rkey.String(), &record)
362 if err != nil {
363 return err
364 }
365 sourceSha := latestSubmission.SourceRev
366
367 scheme := "https"
368 if t.spindle.cfg.Server.Dev {
369 scheme = "http"
370 }
371 client := &indigoxrpc.Client{Host: fmt.Sprintf("%s://%s", scheme, repo.Knot)}
372
373 // fetch current default branch
374 defaultBranch, _ := func(repo syntax.DID) (string, error) {
375 defaultBranchOut, err := tangled.RepoGetDefaultBranch(ctx, client, repo.String())
376 if err != nil {
377 return "", err
378 }
379 return defaultBranchOut.Name, nil
380 }(repo.RepoDid)
381
382 compiler := workflow.Compiler{
383 Trigger: tangled.Pipeline_TriggerMetadata{
384 Kind: string(workflow.TriggerKindPullRequest),
385 PullRequest: &tangled.Pipeline_PullRequestTriggerData{
386 Action: "create",
387 SourceBranch: record.Source.Branch,
388 SourceSha: sourceSha,
389 TargetBranch: record.Target.Branch,
390 },
391 Repo: &tangled.Pipeline_TriggerRepo{
392 Did: repo.Owner.String(),
393 Knot: repo.Knot,
394 Repo: (*string)(&repo.Rkey),
395 RepoDid: (*string)(&repo.RepoDid),
396 DefaultBranch: defaultBranch,
397 },
398 },
399 }
400
401 repoUri := t.spindle.newRepoCloneUrl(repo.Knot, repo.RepoDid)
402 repoPath := t.spindle.newRepoPath(repo.RepoDid)
403
404 // load workflow definitions from rev (without spindle context)
405 rawPipeline, err := t.spindle.loadPipeline(ctx, repoUri, repoPath, sourceSha)
406 if err != nil {
407 // don't retry
408 l.Error("failed loading pipeline", "err", err)
409 return nil
410 }
411 if len(rawPipeline) == 0 {
412 l.Info("no workflow definition find for the repo. skipping the event")
413 return nil
414 }
415 tpl := compiler.Compile(compiler.Parse(rawPipeline))
416 // TODO: pass compile error to workflow log
417 for _, w := range compiler.Diagnostics.Errors {
418 l.Error(w.String())
419 }
420 for _, w := range compiler.Diagnostics.Warnings {
421 l.Warn(w.String())
422 }
423 if len(tpl.Workflows) == 0 {
424 l.Info("no workflow matching trigger 'pull_request'. skipping the event")
425 return nil
426 }
427
428 pipelineId := models.PipelineId{
429 Knot: tpl.TriggerMetadata.Repo.Knot,
430 Rkey: tid.TID(),
431 }
432 if err := t.spindle.db.CreatePipelineEvent(pipelineId.Rkey, tpl, t.spindle.n); err != nil {
433 l.Error("failed to create pipeline event", "err", err)
434 return nil
435 }
436 err = t.spindle.processPipeline(ctx, repo.RepoDid, tpl, pipelineId)
437 if err != nil {
438 // don't retry
439 l.Error("failed processing pipeline", "err", err)
440 return nil
441 }
442 case tapc.RecordDeleteAction:
443 // no-op
444 }
445 return nil
446}
447
448func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) {
449 t.pendingMu.Lock()
450 defer t.pendingMu.Unlock()
451 list := t.pendingCollabs[repoDid]
452 list = append(list, pendingCollabEvent{evt: evt, at: time.Now()})
453 if len(list) > maxPendingPerRepo {
454 list = list[len(list)-maxPendingPerRepo:]
455 }
456 t.pendingCollabs[repoDid] = list
457}
458
459func (t *Tap) drainPendingCollabs(ctx context.Context, repoDid syntax.DID) {
460 t.pendingMu.Lock()
461 list := t.pendingCollabs[repoDid]
462 delete(t.pendingCollabs, repoDid)
463 t.pendingMu.Unlock()
464 if len(list) == 0 {
465 return
466 }
467 cutoff := time.Now().Add(-pendingCollabTTL)
468 for _, p := range list {
469 if p.at.Before(cutoff) {
470 continue
471 }
472 if err := t.processCollaborator(ctx, p.evt); err != nil {
473 t.logger.Warn("replaying buffered collaborator failed", "repo", repoDid, "rkey", p.evt.Rkey, "err", err)
474 }
475 }
476}
477
478func (t *Tap) purgePendingCollabsLoop(ctx context.Context) {
479 ticker := time.NewTicker(pendingCollabTTL / 2)
480 defer ticker.Stop()
481 for {
482 select {
483 case <-ctx.Done():
484 return
485 case <-ticker.C:
486 t.purgeStalePendingCollabs()
487 }
488 }
489}
490
491func (t *Tap) purgeStalePendingCollabs() {
492 cutoff := time.Now().Add(-pendingCollabTTL)
493 t.pendingMu.Lock()
494 defer t.pendingMu.Unlock()
495 expired := 0
496 for did, list := range t.pendingCollabs {
497 kept := list[:0]
498 for _, p := range list {
499 if !p.at.Before(cutoff) {
500 kept = append(kept, p)
501 } else {
502 expired++
503 }
504 }
505 if len(kept) == 0 {
506 delete(t.pendingCollabs, did)
507 } else {
508 t.pendingCollabs[did] = kept
509 }
510 }
511 if expired > 0 {
512 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL)
513 }
514}
515
516func (t *Tap) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*avmodels.PullSubmission, error) {
517 // resolve the PR owner's identity to fetch the blob from their PDS
518 prOwnerIdent, err := t.spindle.res.ResolveIdent(ctx, did)
519 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
520 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
521 }
522
523 if len(record.Rounds) == 0 {
524 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
525 }
526
527 roundNumber := len(record.Rounds) - 1
528 round := record.Rounds[roundNumber]
529
530 // fetch the blob from the PR owner's PDS
531 prOwnerPds := prOwnerIdent.PDSEndpoint()
532 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
533 if err != nil {
534 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
535 }
536 q := blobUrl.Query()
537 q.Set("cid", round.PatchBlob.Ref.String())
538 q.Set("did", did)
539 blobUrl.RawQuery = q.Encode()
540
541 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
542 if err != nil {
543 return nil, fmt.Errorf("failed to create blob request: %w", err)
544 }
545 req.Header.Set("Content-Type", "application/json")
546
547 blobResp, err := http.DefaultClient.Do(req)
548 if err != nil {
549 return nil, fmt.Errorf("failed to fetch blob: %w", err)
550 }
551 defer blobResp.Body.Close()
552
553 blob := io.ReadCloser(blobResp.Body)
554 latestSubmission, err := avmodels.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
555 if err != nil {
556 return nil, fmt.Errorf("failed to parse submission: %w", err)
557 }
558
559 return latestSubmission, nil
560}