Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/eventstream"
20 "tangled.org/core/knotserver/db"
21 "tangled.org/core/knotserver/git"
22 knotxrpc "tangled.org/core/knotserver/xrpc"
23 "tangled.org/core/log"
24 "tangled.org/core/tid"
25 "tangled.org/core/workflow"
26)
27
28func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
29 l := log.FromContext(ctx).With("handler", "processPublicKey", "did", event.Did, "rkey", event.Commit.RKey)
30 did := syntax.DID(event.Did)
31 rkey := syntax.RecordKey(event.Commit.RKey)
32
33 switch event.Commit.Operation {
34 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
35 var record tangled.PublicKey
36 if err := json.Unmarshal(json.RawMessage(event.Commit.Record), &record); err != nil {
37 return fmt.Errorf("failed to unmarshal record: %w", err)
38 }
39
40 pk := db.PublicKey{
41 Did: did,
42 Rkey: rkey,
43 PublicKey: record,
44 }
45 if err := h.db.UpsertPublicKey(pk); err != nil {
46 return fmt.Errorf("failed to upsert public key: %w", err)
47 }
48 l.Info("upserted public key from firehose")
49 case jmodels.CommitOperationDelete:
50 if err := h.db.DeletePublicKeyByRkey(did, rkey); err != nil {
51 return fmt.Errorf("failed to delete public key: %w", err)
52 }
53 l.Info("deleted public key from firehose")
54 }
55
56 return nil
57}
58
// targetRepo describes the repository on this knot that a pull record
// targets, as resolved from the record's target field.
type targetRepo struct {
	RepoPath      string // location of the repository on disk
	OwnerDid      string // DID of the repository owner
	RepoName      string // repository name (the repo record key for AT-URI targets)
	RepoDid       string // DID of the repository itself
	DefaultBranch string // default branch; best-effort, may be empty if it could not be determined
}
67
68func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
69 if record.Target == nil {
70 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
71 }
72
73 l := log.FromContext(ctx).With("handler", "validatePullRecord")
74 l = l.With("target_repo", record.Target.Repo)
75 l = l.With("target_branch", record.Target.Branch)
76
77 if record.Source == nil {
78 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
79 }
80
81 if record.Source.Repo != nil {
82 return nil, fmt.Errorf("ignoring pull record: fork based pull")
83 }
84
85 var repoPath, ownerDid, repoName, repoDid string
86 switch {
87 case strings.HasPrefix(record.Target.Repo, "did:"):
88 repoDid = record.Target.Repo
89 var lookupErr error
90 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
91 if lookupErr != nil {
92 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
93 }
94
95 case strings.Contains(record.Target.Repo, "/"):
96 // TODO: get rid of this PDS fetch once all repos have DIDs
97 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo)
98 if parseErr != nil {
99 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
100 }
101
102 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
103 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
104 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
105 }
106
107 xrpcc := xrpc.Client{
108 Host: ident.PDSEndpoint(),
109 }
110
111 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
112 if getErr != nil {
113 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
114 }
115
116 repo, ok := resp.Value.Val.(*tangled.Repo)
117 if !ok {
118 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
119 }
120
121 if repo.Knot != h.c.Server.Hostname {
122 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
123 }
124
125 ownerDid = ident.DID.String()
126 repoName = repoAt.RecordKey().String()
127
128 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
129 if didErr != nil {
130 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
131 }
132
133 var lookupErr error
134 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
135 if lookupErr != nil {
136 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
137 }
138
139 default:
140 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo)
141 }
142
143 gr, err := git.Open(repoPath, record.Source.Branch)
144 if err != nil {
145 return nil, fmt.Errorf("failed to open git repository: %w", err)
146 }
147
148 defaultBranch, _ := gr.FindMainBranch()
149
150 return &targetRepo{
151 RepoPath: repoPath,
152 OwnerDid: ownerDid,
153 RepoName: repoName,
154 RepoDid: repoDid,
155 DefaultBranch: defaultBranch,
156 }, nil
157}
158
159func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
160 // resolve the PR owner's identity to fetch the blob from their PDS
161 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
162 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
163 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
164 }
165
166 if len(record.Rounds) == 0 {
167 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
168 }
169
170 roundNumber := len(record.Rounds) - 1
171 round := record.Rounds[roundNumber]
172
173 // fetch the blob from the PR owner's PDS
174 prOwnerPds := prOwnerIdent.PDSEndpoint()
175 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
176 if err != nil {
177 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
178 }
179 q := blobUrl.Query()
180 q.Set("cid", round.PatchBlob.Ref.String())
181 q.Set("did", did)
182 blobUrl.RawQuery = q.Encode()
183
184 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
185 if err != nil {
186 return nil, fmt.Errorf("failed to create blob request: %w", err)
187 }
188 req.Header.Set("Content-Type", "application/json")
189
190 blobResp, err := http.DefaultClient.Do(req)
191 if err != nil {
192 return nil, fmt.Errorf("failed to fetch blob: %w", err)
193 }
194 defer blobResp.Body.Close()
195
196 blob := io.ReadCloser(blobResp.Body)
197 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
198 if err != nil {
199 return nil, fmt.Errorf("failed to parse submission: %w", err)
200 }
201
202 return latestSubmission, nil
203}
204
205func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
206 gr, err := git.Open(repoPath, sha)
207 if err != nil {
208 return nil, fmt.Errorf("failed to open git repository: %w", err)
209 }
210
211 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
212 if err != nil {
213 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
214 }
215
216 var pipeline workflow.RawPipeline
217 for _, e := range workflowDir {
218 if !e.IsFile() {
219 continue
220 }
221
222 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
223 contents, err := gr.RawContent(fpath)
224 if err != nil {
225 continue
226 }
227
228 pipeline = append(pipeline, workflow.RawWorkflow{
229 Name: e.Name,
230 Contents: contents,
231 })
232 }
233
234 return pipeline, nil
235}
236
237func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
238 l := log.FromContext(ctx)
239
240 trigger := tangled.Pipeline_PullRequestTriggerData{
241 Action: "create",
242 SourceBranch: sourceBranch,
243 SourceSha: sourceSha,
244 TargetBranch: targetBranch,
245 }
246
247 compiler := workflow.Compiler{
248 Trigger: tangled.Pipeline_TriggerMetadata{
249 Kind: string(workflow.TriggerKindPullRequest),
250 PullRequest: &trigger,
251 Repo: &tangled.Pipeline_TriggerRepo{
252 Knot: h.c.Server.Hostname,
253 RepoDid: &targetRepo.RepoDid,
254 Did: targetRepo.OwnerDid,
255 Repo: &targetRepo.RepoName,
256 DefaultBranch: targetRepo.DefaultBranch,
257 },
258 },
259 }
260
261 l.Info("raw", "raw", rawPipeline)
262 parsed := compiler.Parse(rawPipeline)
263 l.Info("parsed", "parsed", parsed)
264 compiled := compiler.Compile(parsed)
265
266 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
267
268 return compiled
269}
270
271func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
272 raw := json.RawMessage(event.Commit.Record)
273 rkey := event.Commit.RKey
274 did := event.Did
275
276 var record tangled.RepoPull
277 if err := json.Unmarshal(raw, &record); err != nil {
278 return fmt.Errorf("failed to unmarshal record: %w", err)
279 }
280
281 l := log.FromContext(ctx)
282 l = l.With("handler", "processPull")
283 l = l.With("did", did)
284
285 l.Info("validating pull record")
286 targetRepo, err := h.validatePullRecord(ctx, &record)
287 if err != nil {
288 l.Warn("pull record did not validate, skipping...")
289 return err
290 }
291
292 l = l.With("target_repo", record.Target.Repo)
293 l = l.With("target_branch", record.Target.Branch)
294
295 l.Info("fetching latest submission")
296 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
297 if err != nil {
298 return err
299 }
300
301 sha := latestSubmission.SourceRev
302 if sha == "" {
303 return fmt.Errorf("failed to extract source SHA from pull submission")
304 }
305 l = l.With("sha", sha)
306
307 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
308 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
309 if err != nil {
310 return err
311 }
312
313 l.Info("compiling pipeline", "workflow_count", len(pipeline))
314 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
315
316 // do not run empty pipelines
317 if cp.Workflows == nil {
318 l.Info("skipping empty pipeline")
319 return nil
320 }
321
322 l.Info("marshaling pipeline event")
323 eventJson, err := json.Marshal(cp)
324 if err != nil {
325 return fmt.Errorf("failed to marshal pipeline event: %w", err)
326 }
327
328 ev := eventstream.Event{
329 Rkey: tid.TID(),
330 Nsid: tangled.PipelineNSID,
331 EventJson: eventJson,
332 }
333
334 l.Info("inserting pipeline event")
335 return h.db.InsertEvent(ev, h.n)
336}
337
338func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error {
339 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey)
340
341 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git")
342 if rkey == "" {
343 return nil
344 }
345
346 if event.Commit.Operation == jmodels.CommitOperationDelete {
347 return nil
348 }
349
350 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate {
351 return nil
352 }
353
354 raw := json.RawMessage(event.Commit.Record)
355 var record tangled.Repo
356 if err := json.Unmarshal(raw, &record); err != nil {
357 return fmt.Errorf("failed to unmarshal repo record: %w", err)
358 }
359
360 if record.Knot != h.c.Server.Hostname {
361 return nil
362 }
363 if record.RepoDid == nil || *record.RepoDid == "" {
364 l.Info("skipping repo event without repoDid")
365 return nil
366 }
367 repoDid := *record.RepoDid
368
369 if err := knotxrpc.ValidateRepoName(rkey); err != nil {
370 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err)
371 return nil
372 }
373
374 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid)
375 if lookupErr != nil {
376 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid)
377 return nil
378 }
379 if ownerDid != event.Did {
380 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did)
381 return nil
382 }
383
384 alias := db.RepoAlias{
385 OwnerDid: event.Did,
386 Rkey: rkey,
387 RepoDid: repoDid,
388 Rev: event.Commit.Rev,
389 }
390 if err := h.db.UpsertRepoAlias(alias); err != nil {
391 l.Warn("failed to upsert repo alias", "err", err)
392 return nil
393 }
394
395 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev)
396 return nil
397}
398
399func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
400 var err error
401 switch event.Kind {
402 case jmodels.EventKindIdentity:
403 err = h.resolver.InvalidateIdent(ctx, event.Did)
404 case jmodels.EventKindCommit:
405 switch event.Commit.Collection {
406 case tangled.PublicKeyNSID:
407 err = h.processPublicKey(ctx, event)
408 case tangled.RepoNSID:
409 err = h.processRepo(ctx, event)
410 case tangled.RepoPullNSID:
411 err = h.processPull(ctx, event)
412 }
413 default:
414 return nil
415 }
416
417 if err != nil {
418 args := []any{"kind", event.Kind, "err", err}
419 if event.Kind == jmodels.EventKindCommit {
420 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey)
421 }
422 h.l.Warn("failed to process event, skipping", args...)
423 }
424
425 lastTimeUs := event.TimeUS + 1
426 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
427 h.l.Error("failed to save cursor", "err", saveErr)
428 }
429
430 return nil
431}