Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
17 jmodels "github.com/bluesky-social/jetstream/pkg/models"
18 "tangled.org/core/api/tangled"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/knotserver/db"
21 "tangled.org/core/knotserver/git"
22 knotxrpc "tangled.org/core/knotserver/xrpc"
23 "tangled.org/core/log"
24 "tangled.org/core/rbac"
25 "tangled.org/core/tid"
26 "tangled.org/core/workflow"
27)
28
29func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
30 l := log.FromContext(ctx)
31 raw := json.RawMessage(event.Commit.Record)
32 did := event.Did
33
34 var record tangled.PublicKey
35 if err := json.Unmarshal(raw, &record); err != nil {
36 return fmt.Errorf("failed to unmarshal record: %w", err)
37 }
38
39 pk := db.PublicKey{
40 Did: did,
41 PublicKey: record,
42 }
43 if err := h.db.AddPublicKey(pk); err != nil {
44 l.Error("failed to add public key", "error", err)
45 return fmt.Errorf("failed to add public key: %w", err)
46 }
47 l.Info("added public key from firehose", "did", did)
48 return nil
49}
50
51func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error {
52 l := log.FromContext(ctx)
53 raw := json.RawMessage(event.Commit.Record)
54 did := event.Did
55
56 var record tangled.KnotMember
57 if err := json.Unmarshal(raw, &record); err != nil {
58 return fmt.Errorf("failed to unmarshal record: %w", err)
59 }
60
61 if record.Domain != h.c.Server.Hostname {
62 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
63 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
64 }
65
66 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
67 if err != nil || !ok {
68 l.Error("failed to add member", "did", did)
69 return fmt.Errorf("failed to enforce permissions: %w", err)
70 }
71
72 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
73 l.Error("failed to add member", "error", err)
74 return fmt.Errorf("failed to add member: %w", err)
75 }
76 l.Info("added member from firehose", "member", record.Subject)
77
78 if err := h.db.AddDid(record.Subject); err != nil {
79 l.Error("failed to add did", "error", err)
80 return fmt.Errorf("failed to add did: %w", err)
81 }
82 h.jc.AddDid(record.Subject)
83
84 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
85 return fmt.Errorf("failed to fetch and add keys: %w", err)
86 }
87
88 return nil
89}
90
// targetRepo is the result of resolving a pull record's target repository
// (see validatePullRecord).
type targetRepo struct {
	// RepoPath is the path handed to git.Open for the repository.
	// OwnerDid and RepoName identify the repository by owner and name.
	// RepoDid is the repository's own DID.
	RepoPath, OwnerDid, RepoName, RepoDid string

	// DefaultBranch is the branch reported by FindMainBranch.
	DefaultBranch string
}
99
100func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
101 if record.Target == nil {
102 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
103 }
104
105 l := log.FromContext(ctx).With("handler", "validatePullRecord")
106 l = l.With("target_repo", record.Target.Repo)
107 l = l.With("target_branch", record.Target.Branch)
108
109 if record.Source == nil {
110 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
111 }
112
113 if record.Source.Repo != nil {
114 return nil, fmt.Errorf("ignoring pull record: fork based pull")
115 }
116
117 var repoPath, ownerDid, repoName, repoDid string
118 switch {
119 case strings.HasPrefix(record.Target.Repo, "did:"):
120 repoDid = record.Target.Repo
121 var lookupErr error
122 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
123 if lookupErr != nil {
124 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
125 }
126
127 case strings.Contains(record.Target.Repo, "/"):
128 // TODO: get rid of this PDS fetch once all repos have DIDs
129 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo)
130 if parseErr != nil {
131 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
132 }
133
134 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
135 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
136 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
137 }
138
139 xrpcc := xrpc.Client{
140 Host: ident.PDSEndpoint(),
141 }
142
143 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
144 if getErr != nil {
145 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
146 }
147
148 repo, ok := resp.Value.Val.(*tangled.Repo)
149 if !ok {
150 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
151 }
152
153 if repo.Knot != h.c.Server.Hostname {
154 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
155 }
156
157 ownerDid = ident.DID.String()
158 repoName = repoAt.RecordKey().String()
159
160 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
161 if didErr != nil {
162 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
163 }
164
165 var lookupErr error
166 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
167 if lookupErr != nil {
168 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
169 }
170
171 default:
172 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo)
173 }
174
175 gr, err := git.Open(repoPath, record.Source.Branch)
176 if err != nil {
177 return nil, fmt.Errorf("failed to open git repository: %w", err)
178 }
179
180 defaultBranch, _ := gr.FindMainBranch()
181
182 return &targetRepo{
183 RepoPath: repoPath,
184 OwnerDid: ownerDid,
185 RepoName: repoName,
186 RepoDid: repoDid,
187 DefaultBranch: defaultBranch,
188 }, nil
189}
190
191func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
192 // resolve the PR owner's identity to fetch the blob from their PDS
193 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
194 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
195 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
196 }
197
198 if len(record.Rounds) == 0 {
199 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
200 }
201
202 roundNumber := len(record.Rounds) - 1
203 round := record.Rounds[roundNumber]
204
205 // fetch the blob from the PR owner's PDS
206 prOwnerPds := prOwnerIdent.PDSEndpoint()
207 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
208 if err != nil {
209 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
210 }
211 q := blobUrl.Query()
212 q.Set("cid", round.PatchBlob.Ref.String())
213 q.Set("did", did)
214 blobUrl.RawQuery = q.Encode()
215
216 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
217 if err != nil {
218 return nil, fmt.Errorf("failed to create blob request: %w", err)
219 }
220 req.Header.Set("Content-Type", "application/json")
221
222 blobResp, err := http.DefaultClient.Do(req)
223 if err != nil {
224 return nil, fmt.Errorf("failed to fetch blob: %w", err)
225 }
226 defer blobResp.Body.Close()
227
228 blob := io.ReadCloser(blobResp.Body)
229 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
230 if err != nil {
231 return nil, fmt.Errorf("failed to parse submission: %w", err)
232 }
233
234 return latestSubmission, nil
235}
236
237func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
238 gr, err := git.Open(repoPath, sha)
239 if err != nil {
240 return nil, fmt.Errorf("failed to open git repository: %w", err)
241 }
242
243 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
244 if err != nil {
245 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
246 }
247
248 var pipeline workflow.RawPipeline
249 for _, e := range workflowDir {
250 if !e.IsFile() {
251 continue
252 }
253
254 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
255 contents, err := gr.RawContent(fpath)
256 if err != nil {
257 continue
258 }
259
260 pipeline = append(pipeline, workflow.RawWorkflow{
261 Name: e.Name,
262 Contents: contents,
263 })
264 }
265
266 return pipeline, nil
267}
268
269func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
270 l := log.FromContext(ctx)
271
272 trigger := tangled.Pipeline_PullRequestTriggerData{
273 Action: "create",
274 SourceBranch: sourceBranch,
275 SourceSha: sourceSha,
276 TargetBranch: targetBranch,
277 }
278
279 compiler := workflow.Compiler{
280 Trigger: tangled.Pipeline_TriggerMetadata{
281 Kind: string(workflow.TriggerKindPullRequest),
282 PullRequest: &trigger,
283 Repo: &tangled.Pipeline_TriggerRepo{
284 Knot: h.c.Server.Hostname,
285 RepoDid: &targetRepo.RepoDid,
286 Did: targetRepo.OwnerDid,
287 Repo: &targetRepo.RepoName,
288 DefaultBranch: targetRepo.DefaultBranch,
289 },
290 },
291 }
292
293 l.Info("raw", "raw", rawPipeline)
294 parsed := compiler.Parse(rawPipeline)
295 l.Info("parsed", "parsed", parsed)
296 compiled := compiler.Compile(parsed)
297
298 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
299
300 return compiled
301}
302
303func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
304 raw := json.RawMessage(event.Commit.Record)
305 rkey := event.Commit.RKey
306 did := event.Did
307
308 var record tangled.RepoPull
309 if err := json.Unmarshal(raw, &record); err != nil {
310 return fmt.Errorf("failed to unmarshal record: %w", err)
311 }
312
313 l := log.FromContext(ctx)
314 l = l.With("handler", "processPull")
315 l = l.With("did", did)
316
317 l.Info("validating pull record")
318 targetRepo, err := h.validatePullRecord(ctx, &record)
319 if err != nil {
320 l.Warn("pull record did not validate, skipping...")
321 return err
322 }
323
324 l = l.With("target_repo", record.Target.Repo)
325 l = l.With("target_branch", record.Target.Branch)
326
327 l.Info("fetching latest submission")
328 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
329 if err != nil {
330 return err
331 }
332
333 sha := latestSubmission.SourceRev
334 if sha == "" {
335 return fmt.Errorf("failed to extract source SHA from pull submission")
336 }
337 l = l.With("sha", sha)
338
339 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
340 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
341 if err != nil {
342 return err
343 }
344
345 l.Info("compiling pipeline", "workflow_count", len(pipeline))
346 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
347
348 // do not run empty pipelines
349 if cp.Workflows == nil {
350 l.Info("skipping empty pipeline")
351 return nil
352 }
353
354 l.Info("marshaling pipeline event")
355 eventJson, err := json.Marshal(cp)
356 if err != nil {
357 return fmt.Errorf("failed to marshal pipeline event: %w", err)
358 }
359
360 ev := db.Event{
361 Rkey: tid.TID(),
362 Nsid: tangled.PipelineNSID,
363 EventJson: string(eventJson),
364 }
365
366 l.Info("inserting pipeline event")
367 return h.db.InsertEvent(ev, h.n)
368}
369
370// duplicated from add collaborator
371func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error {
372 raw := json.RawMessage(event.Commit.Record)
373 did := event.Did
374
375 var record tangled.RepoCollaborator
376 if err := json.Unmarshal(raw, &record); err != nil {
377 return fmt.Errorf("failed to unmarshal record: %w", err)
378 }
379
380 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
381 if err != nil || subjectId.Handle.IsInvalidHandle() {
382 return err
383 }
384
385 var rbacResource string
386 switch {
387 case strings.HasPrefix(record.Repo, "did:"):
388 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo)
389 if lookupErr != nil {
390 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr)
391 }
392 if ownerDid != did {
393 return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo)
394 }
395 rbacResource = record.Repo
396
397 case strings.Contains(record.Repo, "/"):
398 // TODO: get rid of this PDS fetch once all repos have DIDs
399 repoAt, parseErr := syntax.ParseATURI(record.Repo)
400 if parseErr != nil {
401 return parseErr
402 }
403
404 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
405 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
406 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
407 }
408
409 xrpcc := xrpc.Client{
410 Host: owner.PDSEndpoint(),
411 }
412
413 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
414 if getErr != nil {
415 return getErr
416 }
417
418 if _, ok := resp.Value.Val.(*tangled.Repo); !ok {
419 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
420 }
421 rkey := repoAt.RecordKey().String()
422 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey)
423 if didErr != nil {
424 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr)
425 }
426 rbacResource = repoDid
427
428 default:
429 return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo)
430 }
431
432 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource)
433 if err != nil {
434 return fmt.Errorf("failed to check permissions: %w", err)
435 }
436 if !ok {
437 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource)
438 }
439
440 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
441 return err
442 }
443 h.jc.AddDid(subjectId.DID.String())
444
445 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil {
446 return err
447 }
448
449 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
450}
451
452func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
453 l := log.FromContext(ctx)
454
455 id, err := h.resolver.Directory().LookupDID(ctx, syntax.DID(did))
456 if err != nil {
457 return fmt.Errorf("lookup did to fetch keys: %w", err)
458 }
459
460 serviceEndpoint, ok := id.Services["atproto_pds"]
461 if !ok {
462 l.Warn("did identity did not contain atproto_pds service while adding their keys", "did", did)
463 return nil
464 }
465
466 xrpcc := indigoxrpc.Client{Host: serviceEndpoint.URL}
467 resp, err := comatproto.RepoListRecords(context.Background(), &xrpcc, tangled.PublicKeyNSID, "", 50, did, false)
468 if err != nil {
469 return fmt.Errorf("fetching public keys for did: %w", err)
470 }
471
472 for _, record := range resp.Records {
473 if record == nil {
474 continue
475 }
476 key := record.Value.Val.(*tangled.PublicKey)
477 if key == nil {
478 continue
479 }
480 pk := db.PublicKey{
481 Did: did,
482 PublicKey: *key,
483 }
484 err = h.db.AddPublicKey(pk)
485 if err != nil {
486 return fmt.Errorf("adding public key to db: %w", err)
487 }
488 }
489 return nil
490}
491
492func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error {
493 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey)
494
495 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git")
496 if rkey == "" {
497 return nil
498 }
499
500 if event.Commit.Operation == jmodels.CommitOperationDelete {
501 return nil
502 }
503
504 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate {
505 return nil
506 }
507
508 raw := json.RawMessage(event.Commit.Record)
509 var record tangled.Repo
510 if err := json.Unmarshal(raw, &record); err != nil {
511 return fmt.Errorf("failed to unmarshal repo record: %w", err)
512 }
513
514 if record.Knot != h.c.Server.Hostname {
515 return nil
516 }
517 if record.RepoDid == nil || *record.RepoDid == "" {
518 l.Info("skipping repo event without repoDid")
519 return nil
520 }
521 repoDid := *record.RepoDid
522
523 if err := knotxrpc.ValidateRepoName(rkey); err != nil {
524 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err)
525 return nil
526 }
527
528 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid)
529 if lookupErr != nil {
530 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid)
531 return nil
532 }
533 if ownerDid != event.Did {
534 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did)
535 return nil
536 }
537
538 alias := db.RepoAlias{
539 OwnerDid: event.Did,
540 Rkey: rkey,
541 RepoDid: repoDid,
542 Rev: event.Commit.Rev,
543 }
544 if err := h.db.UpsertRepoAlias(alias); err != nil {
545 l.Warn("failed to upsert repo alias", "err", err)
546 return nil
547 }
548
549 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev)
550 return nil
551}
552
553func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
554 var err error
555 switch event.Kind {
556 case jmodels.EventKindIdentity:
557 err = h.resolver.InvalidateIdent(ctx, event.Did)
558 case jmodels.EventKindCommit:
559 switch event.Commit.Collection {
560 case tangled.PublicKeyNSID:
561 err = h.processPublicKey(ctx, event)
562 case tangled.KnotMemberNSID:
563 err = h.processKnotMember(ctx, event)
564 case tangled.RepoNSID:
565 err = h.processRepo(ctx, event)
566 case tangled.RepoPullNSID:
567 err = h.processPull(ctx, event)
568 case tangled.RepoCollaboratorNSID:
569 err = h.processCollaborator(ctx, event)
570 }
571 default:
572 return nil
573 }
574
575 if err != nil {
576 args := []any{"kind", event.Kind, "err", err}
577 if event.Kind == jmodels.EventKindCommit {
578 args = append(args, "nsid", event.Commit.Collection)
579 }
580 h.l.Warn("failed to process event, skipping", args...)
581 }
582
583 lastTimeUs := event.TimeUS + 1
584 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
585 h.l.Error("failed to save cursor", "err", saveErr)
586 }
587
588 return nil
589}