Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/knotserver/db"
20 "tangled.org/core/knotserver/git"
21 "tangled.org/core/log"
22 "tangled.org/core/rbac"
23 "tangled.org/core/workflow"
24)
25
26func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
27 l := log.FromContext(ctx)
28 raw := json.RawMessage(event.Commit.Record)
29 did := event.Did
30
31 var record tangled.PublicKey
32 if err := json.Unmarshal(raw, &record); err != nil {
33 return fmt.Errorf("failed to unmarshal record: %w", err)
34 }
35
36 pk := db.PublicKey{
37 Did: did,
38 PublicKey: record,
39 }
40 if err := h.db.AddPublicKey(pk); err != nil {
41 l.Error("failed to add public key", "error", err)
42 return fmt.Errorf("failed to add public key: %w", err)
43 }
44 l.Info("added public key from firehose", "did", did)
45 return nil
46}
47
48func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error {
49 l := log.FromContext(ctx)
50 raw := json.RawMessage(event.Commit.Record)
51 did := event.Did
52
53 var record tangled.KnotMember
54 if err := json.Unmarshal(raw, &record); err != nil {
55 return fmt.Errorf("failed to unmarshal record: %w", err)
56 }
57
58 if record.Domain != h.c.Server.Hostname {
59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
61 }
62
63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
64 if err != nil || !ok {
65 l.Error("failed to add member", "did", did)
66 return fmt.Errorf("failed to enforce permissions: %w", err)
67 }
68
69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
70 l.Error("failed to add member", "error", err)
71 return fmt.Errorf("failed to add member: %w", err)
72 }
73 l.Info("added member from firehose", "member", record.Subject)
74
75 if err := h.db.AddDid(record.Subject); err != nil {
76 l.Error("failed to add did", "error", err)
77 return fmt.Errorf("failed to add did: %w", err)
78 }
79 h.jc.AddDid(record.Subject)
80
81 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
82 return fmt.Errorf("failed to fetch and add keys: %w", err)
83 }
84
85 return nil
86}
87
// targetRepo describes the repository a pull request targets, as resolved by
// validatePullRecord.
type targetRepo struct {
	// RepoPath is the repository location returned by ResolveRepoDIDOnDisk;
	// it is what gets passed to git.Open.
	RepoPath string
	// OwnerDid is the DID of the repository owner.
	OwnerDid string
	// RepoName is the repository name.
	RepoName string
	// RepoDid is the DID identifying the repository itself.
	RepoDid string
	// DefaultBranch is the result of FindMainBranch on the opened repository.
	// NOTE(review): validatePullRecord discards the lookup error, so this is
	// presumably empty when no main branch is found — confirm.
	DefaultBranch string
}
96
97func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
98 if record.Target == nil {
99 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
100 }
101
102 if record.Source == nil {
103 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
104 }
105
106 if record.Source.Repo != nil || record.Source.RepoDid != nil {
107 return nil, fmt.Errorf("ignoring pull record: fork based pull")
108 }
109
110 var repoPath, ownerDid, repoName, repoDid string
111 switch {
112 case record.Target.RepoDid != nil && *record.Target.RepoDid != "":
113 repoDid = *record.Target.RepoDid
114 var lookupErr error
115 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
116 if lookupErr != nil {
117 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
118 }
119
120 case record.Target.Repo != nil:
121 // TODO: get rid of this PDS fetch once all repos have DIDs
122 repoAt, parseErr := syntax.ParseATURI(*record.Target.Repo)
123 if parseErr != nil {
124 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
125 }
126
127 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
128 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
129 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
130 }
131
132 xrpcc := xrpc.Client{
133 Host: ident.PDSEndpoint(),
134 }
135
136 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
137 if getErr != nil {
138 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
139 }
140
141 repo := resp.Value.Val.(*tangled.Repo)
142
143 if repo.Knot != h.c.Server.Hostname {
144 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
145 }
146
147 ownerDid = ident.DID.String()
148 repoName = repo.Name
149
150 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
151 if didErr != nil {
152 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
153 }
154
155 var lookupErr error
156 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
157 if lookupErr != nil {
158 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
159 }
160
161 default:
162 return nil, fmt.Errorf("ignoring pull record: target has neither repo nor repoDid")
163 }
164
165 gr, err := git.Open(repoPath, record.Source.Branch)
166 if err != nil {
167 return nil, fmt.Errorf("failed to open git repository: %w", err)
168 }
169
170 defaultBranch, _ := gr.FindMainBranch()
171
172 return &targetRepo{
173 RepoPath: repoPath,
174 OwnerDid: ownerDid,
175 RepoName: repoName,
176 RepoDid: repoDid,
177 DefaultBranch: defaultBranch,
178 }, nil
179}
180
181func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
182 // resolve the PR owner's identity to fetch the blob from their PDS
183 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
184 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
185 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
186 }
187
188 if len(record.Rounds) == 0 {
189 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
190 }
191
192 roundNumber := len(record.Rounds) - 1
193 round := record.Rounds[roundNumber]
194
195 // fetch the blob from the PR owner's PDS
196 prOwnerPds := prOwnerIdent.PDSEndpoint()
197 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
198 if err != nil {
199 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
200 }
201 q := blobUrl.Query()
202 q.Set("cid", round.PatchBlob.Ref.String())
203 q.Set("did", did)
204 blobUrl.RawQuery = q.Encode()
205
206 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
207 if err != nil {
208 return nil, fmt.Errorf("failed to create blob request: %w", err)
209 }
210 req.Header.Set("Content-Type", "application/json")
211
212 blobResp, err := http.DefaultClient.Do(req)
213 if err != nil {
214 return nil, fmt.Errorf("failed to fetch blob: %w", err)
215 }
216 defer blobResp.Body.Close()
217
218 blob := io.ReadCloser(blobResp.Body)
219 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
220 if err != nil {
221 return nil, fmt.Errorf("failed to parse submission: %w", err)
222 }
223
224 return latestSubmission, nil
225}
226
227func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
228 gr, err := git.Open(repoPath, sha)
229 if err != nil {
230 return nil, fmt.Errorf("failed to open git repository: %w", err)
231 }
232
233 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
234 if err != nil {
235 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
236 }
237
238 var pipeline workflow.RawPipeline
239 for _, e := range workflowDir {
240 if !e.IsFile() {
241 continue
242 }
243
244 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
245 contents, err := gr.RawContent(fpath)
246 if err != nil {
247 continue
248 }
249
250 pipeline = append(pipeline, workflow.RawWorkflow{
251 Name: e.Name,
252 Contents: contents,
253 })
254 }
255
256 return pipeline, nil
257}
258
259func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
260 l := log.FromContext(ctx)
261
262 trigger := tangled.Pipeline_PullRequestTriggerData{
263 Action: "create",
264 SourceBranch: sourceBranch,
265 SourceSha: sourceSha,
266 TargetBranch: targetBranch,
267 }
268
269 compiler := workflow.Compiler{
270 Trigger: tangled.Pipeline_TriggerMetadata{
271 Kind: string(workflow.TriggerKindPullRequest),
272 PullRequest: &trigger,
273 Repo: &tangled.Pipeline_TriggerRepo{
274 Knot: h.c.Server.Hostname,
275 RepoDid: &targetRepo.RepoDid,
276 Did: targetRepo.OwnerDid,
277 Repo: &targetRepo.RepoName,
278 DefaultBranch: targetRepo.DefaultBranch,
279 },
280 },
281 }
282
283 l.Info("raw", "raw", rawPipeline)
284 parsed := compiler.Parse(rawPipeline)
285 l.Info("parsed", "parsed", parsed)
286 compiled := compiler.Compile(parsed)
287
288 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
289
290 return compiled
291}
292
293func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
294 raw := json.RawMessage(event.Commit.Record)
295 rkey := event.Commit.RKey
296 did := event.Did
297
298 var record tangled.RepoPull
299 if err := json.Unmarshal(raw, &record); err != nil {
300 return fmt.Errorf("failed to unmarshal record: %w", err)
301 }
302
303 l := log.FromContext(ctx)
304 l = l.With("handler", "processPull")
305 l = l.With("did", did)
306
307 l.Info("validating pull record")
308 targetRepo, err := h.validatePullRecord(ctx, &record)
309 if err != nil {
310 l.Warn("pull record did not validate, skipping...")
311 return err
312 }
313
314 l = l.With("target_repo", record.Target.Repo)
315 l = l.With("target_branch", record.Target.Branch)
316
317 l.Info("fetching latest submission")
318 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
319 if err != nil {
320 return err
321 }
322
323 sha := latestSubmission.SourceRev
324 if sha == "" {
325 return fmt.Errorf("failed to extract source SHA from pull submission")
326 }
327 l = l.With("sha", sha)
328
329 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
330 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
331 if err != nil {
332 return err
333 }
334
335 l.Info("compiling pipeline", "workflow_count", len(pipeline))
336 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
337
338 // do not run empty pipelines
339 if cp.Workflows == nil {
340 l.Info("skipping empty pipeline")
341 return nil
342 }
343
344 l.Info("marshaling pipeline event")
345 eventJson, err := json.Marshal(cp)
346 if err != nil {
347 return fmt.Errorf("failed to marshal pipeline event: %w", err)
348 }
349
350 ev := db.Event{
351 Rkey: TID(),
352 Nsid: tangled.PipelineNSID,
353 EventJson: string(eventJson),
354 }
355
356 l.Info("inserting pipeline event")
357 return h.db.InsertEvent(ev, h.n)
358}
359
360// duplicated from add collaborator
361func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error {
362 raw := json.RawMessage(event.Commit.Record)
363 did := event.Did
364
365 var record tangled.RepoCollaborator
366 if err := json.Unmarshal(raw, &record); err != nil {
367 return fmt.Errorf("failed to unmarshal record: %w", err)
368 }
369
370 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
371 if err != nil || subjectId.Handle.IsInvalidHandle() {
372 return err
373 }
374
375 var rbacResource string
376 switch {
377 case record.RepoDid != nil && *record.RepoDid != "":
378 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(*record.RepoDid)
379 if lookupErr != nil {
380 return fmt.Errorf("unknown repo DID %s: %w", *record.RepoDid, lookupErr)
381 }
382 if ownerDid != did {
383 return fmt.Errorf("collaborator record author %s does not own repo %s", did, *record.RepoDid)
384 }
385 rbacResource = *record.RepoDid
386
387 case record.Repo != nil:
388 // TODO: get rid of this PDS fetch once all repos have DIDs
389 repoAt, parseErr := syntax.ParseATURI(*record.Repo)
390 if parseErr != nil {
391 return parseErr
392 }
393
394 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
395 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
396 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
397 }
398
399 xrpcc := xrpc.Client{
400 Host: owner.PDSEndpoint(),
401 }
402
403 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
404 if getErr != nil {
405 return getErr
406 }
407
408 repo := resp.Value.Val.(*tangled.Repo)
409 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), repo.Name)
410 if didErr != nil {
411 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), repo.Name, didErr)
412 }
413 rbacResource = repoDid
414
415 default:
416 return fmt.Errorf("collaborator record has neither repo nor repoDid")
417 }
418
419 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource)
420 if err != nil {
421 return fmt.Errorf("failed to check permissions: %w", err)
422 }
423 if !ok {
424 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource)
425 }
426
427 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
428 return err
429 }
430 h.jc.AddDid(subjectId.DID.String())
431
432 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil {
433 return err
434 }
435
436 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
437}
438
439func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
440 l := log.FromContext(ctx)
441
442 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
443 if err != nil {
444 l.Error("error building endpoint url", "did", did, "error", err.Error())
445 return fmt.Errorf("error building endpoint url: %w", err)
446 }
447
448 resp, err := http.Get(keysEndpoint)
449 if err != nil {
450 l.Error("error getting keys", "did", did, "error", err)
451 return fmt.Errorf("error getting keys: %w", err)
452 }
453 defer resp.Body.Close()
454
455 if resp.StatusCode == http.StatusNotFound {
456 l.Info("no keys found for did", "did", did)
457 return nil
458 }
459
460 plaintext, err := io.ReadAll(resp.Body)
461 if err != nil {
462 l.Error("error reading response body", "error", err)
463 return fmt.Errorf("error reading response body: %w", err)
464 }
465
466 for key := range strings.SplitSeq(string(plaintext), "\n") {
467 if key == "" {
468 continue
469 }
470 pk := db.PublicKey{
471 Did: did,
472 }
473 pk.Key = key
474 if err := h.db.AddPublicKey(pk); err != nil {
475 l.Error("failed to add public key", "error", err)
476 return fmt.Errorf("failed to add public key: %w", err)
477 }
478 }
479 return nil
480}
481
482func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
483 var err error
484 switch event.Kind {
485 case jmodels.EventKindIdentity:
486 err = h.resolver.InvalidateIdent(ctx, event.Did)
487 case jmodels.EventKindCommit:
488 switch event.Commit.Collection {
489 case tangled.PublicKeyNSID:
490 err = h.processPublicKey(ctx, event)
491 case tangled.KnotMemberNSID:
492 err = h.processKnotMember(ctx, event)
493 case tangled.RepoPullNSID:
494 err = h.processPull(ctx, event)
495 case tangled.RepoCollaboratorNSID:
496 err = h.processCollaborator(ctx, event)
497 }
498 default:
499 return nil
500 }
501
502 if err != nil {
503 args := []any{"kind", event.Kind, "err", err}
504 if event.Kind == jmodels.EventKindCommit {
505 args = append(args, "nsid", event.Commit.Collection)
506 }
507 h.l.Warn("failed to process event, skipping", args...)
508 }
509
510 lastTimeUs := event.TimeUS + 1
511 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
512 h.l.Error("failed to save cursor", "err", saveErr)
513 }
514
515 return nil
516}