Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/knotserver/db"
20 "tangled.org/core/knotserver/git"
21 knotxrpc "tangled.org/core/knotserver/xrpc"
22 "tangled.org/core/log"
23 "tangled.org/core/rbac"
24 "tangled.org/core/workflow"
25)
26
27func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
28 l := log.FromContext(ctx)
29 raw := json.RawMessage(event.Commit.Record)
30 did := event.Did
31
32 var record tangled.PublicKey
33 if err := json.Unmarshal(raw, &record); err != nil {
34 return fmt.Errorf("failed to unmarshal record: %w", err)
35 }
36
37 pk := db.PublicKey{
38 Did: did,
39 PublicKey: record,
40 }
41 if err := h.db.AddPublicKey(pk); err != nil {
42 l.Error("failed to add public key", "error", err)
43 return fmt.Errorf("failed to add public key: %w", err)
44 }
45 l.Info("added public key from firehose", "did", did)
46 return nil
47}
48
49func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error {
50 l := log.FromContext(ctx)
51 raw := json.RawMessage(event.Commit.Record)
52 did := event.Did
53
54 var record tangled.KnotMember
55 if err := json.Unmarshal(raw, &record); err != nil {
56 return fmt.Errorf("failed to unmarshal record: %w", err)
57 }
58
59 if record.Domain != h.c.Server.Hostname {
60 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
61 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
62 }
63
64 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
65 if err != nil || !ok {
66 l.Error("failed to add member", "did", did)
67 return fmt.Errorf("failed to enforce permissions: %w", err)
68 }
69
70 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
71 l.Error("failed to add member", "error", err)
72 return fmt.Errorf("failed to add member: %w", err)
73 }
74 l.Info("added member from firehose", "member", record.Subject)
75
76 if err := h.db.AddDid(record.Subject); err != nil {
77 l.Error("failed to add did", "error", err)
78 return fmt.Errorf("failed to add did: %w", err)
79 }
80 h.jc.AddDid(record.Subject)
81
82 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
83 return fmt.Errorf("failed to fetch and add keys: %w", err)
84 }
85
86 return nil
87}
88
// targetRepo describes the repository a pull request targets, as resolved by
// validatePullRecord.
type targetRepo struct {
	// In order: the repository's path on disk, the DID of its owner, its
	// name (record key) and the repository's own DID.
	RepoPath, OwnerDid, RepoName, RepoDid string

	// DefaultBranch is the result of the best-effort main-branch lookup on
	// the opened repository.
	DefaultBranch string
}
97
98func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
99 if record.Target == nil {
100 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
101 }
102
103 l := log.FromContext(ctx).With("handler", "validatePullRecord")
104 l = l.With("target_repo", record.Target.Repo)
105 l = l.With("target_branch", record.Target.Branch)
106
107 if record.Source == nil {
108 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
109 }
110
111 if record.Source.Repo != nil {
112 return nil, fmt.Errorf("ignoring pull record: fork based pull")
113 }
114
115 var repoPath, ownerDid, repoName, repoDid string
116 switch {
117 case strings.HasPrefix(record.Target.Repo, "did:"):
118 repoDid = record.Target.Repo
119 var lookupErr error
120 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
121 if lookupErr != nil {
122 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
123 }
124
125 case strings.Contains(record.Target.Repo, "/"):
126 // TODO: get rid of this PDS fetch once all repos have DIDs
127 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo)
128 if parseErr != nil {
129 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
130 }
131
132 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
133 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
134 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
135 }
136
137 xrpcc := xrpc.Client{
138 Host: ident.PDSEndpoint(),
139 }
140
141 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
142 if getErr != nil {
143 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
144 }
145
146 repo, ok := resp.Value.Val.(*tangled.Repo)
147 if !ok {
148 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
149 }
150
151 if repo.Knot != h.c.Server.Hostname {
152 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
153 }
154
155 ownerDid = ident.DID.String()
156 repoName = repoAt.RecordKey().String()
157
158 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
159 if didErr != nil {
160 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
161 }
162
163 var lookupErr error
164 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
165 if lookupErr != nil {
166 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
167 }
168
169 default:
170 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo)
171 }
172
173 gr, err := git.Open(repoPath, record.Source.Branch)
174 if err != nil {
175 return nil, fmt.Errorf("failed to open git repository: %w", err)
176 }
177
178 defaultBranch, _ := gr.FindMainBranch()
179
180 return &targetRepo{
181 RepoPath: repoPath,
182 OwnerDid: ownerDid,
183 RepoName: repoName,
184 RepoDid: repoDid,
185 DefaultBranch: defaultBranch,
186 }, nil
187}
188
189func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
190 // resolve the PR owner's identity to fetch the blob from their PDS
191 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
192 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
193 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
194 }
195
196 if len(record.Rounds) == 0 {
197 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
198 }
199
200 roundNumber := len(record.Rounds) - 1
201 round := record.Rounds[roundNumber]
202
203 // fetch the blob from the PR owner's PDS
204 prOwnerPds := prOwnerIdent.PDSEndpoint()
205 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
206 if err != nil {
207 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
208 }
209 q := blobUrl.Query()
210 q.Set("cid", round.PatchBlob.Ref.String())
211 q.Set("did", did)
212 blobUrl.RawQuery = q.Encode()
213
214 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
215 if err != nil {
216 return nil, fmt.Errorf("failed to create blob request: %w", err)
217 }
218 req.Header.Set("Content-Type", "application/json")
219
220 blobResp, err := http.DefaultClient.Do(req)
221 if err != nil {
222 return nil, fmt.Errorf("failed to fetch blob: %w", err)
223 }
224 defer blobResp.Body.Close()
225
226 blob := io.ReadCloser(blobResp.Body)
227 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
228 if err != nil {
229 return nil, fmt.Errorf("failed to parse submission: %w", err)
230 }
231
232 return latestSubmission, nil
233}
234
235func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
236 gr, err := git.Open(repoPath, sha)
237 if err != nil {
238 return nil, fmt.Errorf("failed to open git repository: %w", err)
239 }
240
241 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
242 if err != nil {
243 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
244 }
245
246 var pipeline workflow.RawPipeline
247 for _, e := range workflowDir {
248 if !e.IsFile() {
249 continue
250 }
251
252 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
253 contents, err := gr.RawContent(fpath)
254 if err != nil {
255 continue
256 }
257
258 pipeline = append(pipeline, workflow.RawWorkflow{
259 Name: e.Name,
260 Contents: contents,
261 })
262 }
263
264 return pipeline, nil
265}
266
267func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
268 l := log.FromContext(ctx)
269
270 trigger := tangled.Pipeline_PullRequestTriggerData{
271 Action: "create",
272 SourceBranch: sourceBranch,
273 SourceSha: sourceSha,
274 TargetBranch: targetBranch,
275 }
276
277 compiler := workflow.Compiler{
278 Trigger: tangled.Pipeline_TriggerMetadata{
279 Kind: string(workflow.TriggerKindPullRequest),
280 PullRequest: &trigger,
281 Repo: &tangled.Pipeline_TriggerRepo{
282 Knot: h.c.Server.Hostname,
283 RepoDid: &targetRepo.RepoDid,
284 Did: targetRepo.OwnerDid,
285 Repo: &targetRepo.RepoName,
286 DefaultBranch: targetRepo.DefaultBranch,
287 },
288 },
289 }
290
291 l.Info("raw", "raw", rawPipeline)
292 parsed := compiler.Parse(rawPipeline)
293 l.Info("parsed", "parsed", parsed)
294 compiled := compiler.Compile(parsed)
295
296 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
297
298 return compiled
299}
300
301func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
302 raw := json.RawMessage(event.Commit.Record)
303 rkey := event.Commit.RKey
304 did := event.Did
305
306 var record tangled.RepoPull
307 if err := json.Unmarshal(raw, &record); err != nil {
308 return fmt.Errorf("failed to unmarshal record: %w", err)
309 }
310
311 l := log.FromContext(ctx)
312 l = l.With("handler", "processPull")
313 l = l.With("did", did)
314
315 l.Info("validating pull record")
316 targetRepo, err := h.validatePullRecord(ctx, &record)
317 if err != nil {
318 l.Warn("pull record did not validate, skipping...")
319 return err
320 }
321
322 l = l.With("target_repo", record.Target.Repo)
323 l = l.With("target_branch", record.Target.Branch)
324
325 l.Info("fetching latest submission")
326 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
327 if err != nil {
328 return err
329 }
330
331 sha := latestSubmission.SourceRev
332 if sha == "" {
333 return fmt.Errorf("failed to extract source SHA from pull submission")
334 }
335 l = l.With("sha", sha)
336
337 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
338 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
339 if err != nil {
340 return err
341 }
342
343 l.Info("compiling pipeline", "workflow_count", len(pipeline))
344 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
345
346 // do not run empty pipelines
347 if cp.Workflows == nil {
348 l.Info("skipping empty pipeline")
349 return nil
350 }
351
352 l.Info("marshaling pipeline event")
353 eventJson, err := json.Marshal(cp)
354 if err != nil {
355 return fmt.Errorf("failed to marshal pipeline event: %w", err)
356 }
357
358 ev := db.Event{
359 Rkey: TID(),
360 Nsid: tangled.PipelineNSID,
361 EventJson: string(eventJson),
362 }
363
364 l.Info("inserting pipeline event")
365 return h.db.InsertEvent(ev, h.n)
366}
367
368// duplicated from add collaborator
369func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error {
370 raw := json.RawMessage(event.Commit.Record)
371 did := event.Did
372
373 var record tangled.RepoCollaborator
374 if err := json.Unmarshal(raw, &record); err != nil {
375 return fmt.Errorf("failed to unmarshal record: %w", err)
376 }
377
378 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
379 if err != nil || subjectId.Handle.IsInvalidHandle() {
380 return err
381 }
382
383 var rbacResource string
384 switch {
385 case strings.HasPrefix(record.Repo, "did:"):
386 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo)
387 if lookupErr != nil {
388 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr)
389 }
390 if ownerDid != did {
391 return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo)
392 }
393 rbacResource = record.Repo
394
395 case strings.Contains(record.Repo, "/"):
396 // TODO: get rid of this PDS fetch once all repos have DIDs
397 repoAt, parseErr := syntax.ParseATURI(record.Repo)
398 if parseErr != nil {
399 return parseErr
400 }
401
402 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
403 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
404 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
405 }
406
407 xrpcc := xrpc.Client{
408 Host: owner.PDSEndpoint(),
409 }
410
411 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
412 if getErr != nil {
413 return getErr
414 }
415
416 if _, ok := resp.Value.Val.(*tangled.Repo); !ok {
417 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
418 }
419 rkey := repoAt.RecordKey().String()
420 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey)
421 if didErr != nil {
422 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr)
423 }
424 rbacResource = repoDid
425
426 default:
427 return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo)
428 }
429
430 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource)
431 if err != nil {
432 return fmt.Errorf("failed to check permissions: %w", err)
433 }
434 if !ok {
435 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource)
436 }
437
438 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
439 return err
440 }
441 h.jc.AddDid(subjectId.DID.String())
442
443 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil {
444 return err
445 }
446
447 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
448}
449
450func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
451 l := log.FromContext(ctx)
452
453 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
454 if err != nil {
455 l.Error("error building endpoint url", "did", did, "error", err.Error())
456 return fmt.Errorf("error building endpoint url: %w", err)
457 }
458
459 resp, err := http.Get(keysEndpoint)
460 if err != nil {
461 l.Error("error getting keys", "did", did, "error", err)
462 return fmt.Errorf("error getting keys: %w", err)
463 }
464 defer resp.Body.Close()
465
466 if resp.StatusCode == http.StatusNotFound {
467 l.Info("no keys found for did", "did", did)
468 return nil
469 }
470
471 plaintext, err := io.ReadAll(resp.Body)
472 if err != nil {
473 l.Error("error reading response body", "error", err)
474 return fmt.Errorf("error reading response body: %w", err)
475 }
476
477 for key := range strings.SplitSeq(string(plaintext), "\n") {
478 if key == "" {
479 continue
480 }
481 pk := db.PublicKey{
482 Did: did,
483 }
484 pk.Key = key
485 if err := h.db.AddPublicKey(pk); err != nil {
486 l.Error("failed to add public key", "error", err)
487 return fmt.Errorf("failed to add public key: %w", err)
488 }
489 }
490 return nil
491}
492
493func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error {
494 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey)
495
496 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git")
497 if rkey == "" {
498 return nil
499 }
500
501 if event.Commit.Operation == jmodels.CommitOperationDelete {
502 return nil
503 }
504
505 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate {
506 return nil
507 }
508
509 raw := json.RawMessage(event.Commit.Record)
510 var record tangled.Repo
511 if err := json.Unmarshal(raw, &record); err != nil {
512 return fmt.Errorf("failed to unmarshal repo record: %w", err)
513 }
514
515 if record.Knot != h.c.Server.Hostname {
516 return nil
517 }
518 if record.RepoDid == nil || *record.RepoDid == "" {
519 l.Info("skipping repo event without repoDid")
520 return nil
521 }
522 repoDid := *record.RepoDid
523
524 if err := knotxrpc.ValidateRepoName(rkey); err != nil {
525 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err)
526 return nil
527 }
528
529 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid)
530 if lookupErr != nil {
531 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid)
532 return nil
533 }
534 if ownerDid != event.Did {
535 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did)
536 return nil
537 }
538
539 alias := db.RepoAlias{
540 OwnerDid: event.Did,
541 Rkey: rkey,
542 RepoDid: repoDid,
543 Rev: event.Commit.Rev,
544 }
545 if err := h.db.UpsertRepoAlias(alias); err != nil {
546 l.Warn("failed to upsert repo alias", "err", err)
547 return nil
548 }
549
550 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev)
551 return nil
552}
553
554func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
555 var err error
556 switch event.Kind {
557 case jmodels.EventKindIdentity:
558 err = h.resolver.InvalidateIdent(ctx, event.Did)
559 case jmodels.EventKindCommit:
560 switch event.Commit.Collection {
561 case tangled.PublicKeyNSID:
562 err = h.processPublicKey(ctx, event)
563 case tangled.KnotMemberNSID:
564 err = h.processKnotMember(ctx, event)
565 case tangled.RepoNSID:
566 err = h.processRepo(ctx, event)
567 case tangled.RepoPullNSID:
568 err = h.processPull(ctx, event)
569 case tangled.RepoCollaboratorNSID:
570 err = h.processCollaborator(ctx, event)
571 }
572 default:
573 return nil
574 }
575
576 if err != nil {
577 args := []any{"kind", event.Kind, "err", err}
578 if event.Kind == jmodels.EventKindCommit {
579 args = append(args, "nsid", event.Commit.Collection)
580 }
581 h.l.Warn("failed to process event, skipping", args...)
582 }
583
584 lastTimeUs := event.TimeUS + 1
585 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
586 h.l.Error("failed to save cursor", "err", saveErr)
587 }
588
589 return nil
590}