Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 jmodels "github.com/bluesky-social/jetstream/pkg/models"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/eventstream"
20 "tangled.org/core/knotserver/db"
21 "tangled.org/core/knotserver/git"
22 knotxrpc "tangled.org/core/knotserver/xrpc"
23 "tangled.org/core/log"
24 "tangled.org/core/tid"
25 "tangled.org/core/workflow"
26)
27
28func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
29 l := log.FromContext(ctx)
30 raw := json.RawMessage(event.Commit.Record)
31 did := event.Did
32
33 var record tangled.PublicKey
34 if err := json.Unmarshal(raw, &record); err != nil {
35 return fmt.Errorf("failed to unmarshal record: %w", err)
36 }
37
38 pk := db.PublicKey{
39 Did: did,
40 PublicKey: record,
41 }
42 if err := h.db.AddPublicKey(pk); err != nil {
43 l.Error("failed to add public key", "error", err)
44 return fmt.Errorf("failed to add public key: %w", err)
45 }
46 l.Info("added public key from firehose", "did", did)
47 return nil
48}
49
// targetRepo describes the repository a pull record targets, as resolved by
// validatePullRecord.
type targetRepo struct {
	// RepoPath is the path of the repository on disk.
	RepoPath string
	// OwnerDid is the DID of the repository owner; RepoName is the
	// repository's name (the record key when resolved from an AT-URI).
	OwnerDid, RepoName string
	// RepoDid is the DID assigned to the repository itself.
	RepoDid string
	// DefaultBranch is the branch reported by the repository's main-branch
	// lookup.
	DefaultBranch string
}
58
59func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
60 if record.Target == nil {
61 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
62 }
63
64 l := log.FromContext(ctx).With("handler", "validatePullRecord")
65 l = l.With("target_repo", record.Target.Repo)
66 l = l.With("target_branch", record.Target.Branch)
67
68 if record.Source == nil {
69 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
70 }
71
72 if record.Source.Repo != nil {
73 return nil, fmt.Errorf("ignoring pull record: fork based pull")
74 }
75
76 var repoPath, ownerDid, repoName, repoDid string
77 switch {
78 case strings.HasPrefix(record.Target.Repo, "did:"):
79 repoDid = record.Target.Repo
80 var lookupErr error
81 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
82 if lookupErr != nil {
83 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
84 }
85
86 case strings.Contains(record.Target.Repo, "/"):
87 // TODO: get rid of this PDS fetch once all repos have DIDs
88 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo)
89 if parseErr != nil {
90 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
91 }
92
93 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
94 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
95 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
96 }
97
98 xrpcc := xrpc.Client{
99 Host: ident.PDSEndpoint(),
100 }
101
102 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
103 if getErr != nil {
104 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
105 }
106
107 repo, ok := resp.Value.Val.(*tangled.Repo)
108 if !ok {
109 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
110 }
111
112 if repo.Knot != h.c.Server.Hostname {
113 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
114 }
115
116 ownerDid = ident.DID.String()
117 repoName = repoAt.RecordKey().String()
118
119 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
120 if didErr != nil {
121 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
122 }
123
124 var lookupErr error
125 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
126 if lookupErr != nil {
127 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
128 }
129
130 default:
131 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo)
132 }
133
134 gr, err := git.Open(repoPath, record.Source.Branch)
135 if err != nil {
136 return nil, fmt.Errorf("failed to open git repository: %w", err)
137 }
138
139 defaultBranch, _ := gr.FindMainBranch()
140
141 return &targetRepo{
142 RepoPath: repoPath,
143 OwnerDid: ownerDid,
144 RepoName: repoName,
145 RepoDid: repoDid,
146 DefaultBranch: defaultBranch,
147 }, nil
148}
149
150func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
151 // resolve the PR owner's identity to fetch the blob from their PDS
152 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
153 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
154 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
155 }
156
157 if len(record.Rounds) == 0 {
158 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
159 }
160
161 roundNumber := len(record.Rounds) - 1
162 round := record.Rounds[roundNumber]
163
164 // fetch the blob from the PR owner's PDS
165 prOwnerPds := prOwnerIdent.PDSEndpoint()
166 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
167 if err != nil {
168 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
169 }
170 q := blobUrl.Query()
171 q.Set("cid", round.PatchBlob.Ref.String())
172 q.Set("did", did)
173 blobUrl.RawQuery = q.Encode()
174
175 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
176 if err != nil {
177 return nil, fmt.Errorf("failed to create blob request: %w", err)
178 }
179 req.Header.Set("Content-Type", "application/json")
180
181 blobResp, err := http.DefaultClient.Do(req)
182 if err != nil {
183 return nil, fmt.Errorf("failed to fetch blob: %w", err)
184 }
185 defer blobResp.Body.Close()
186
187 blob := io.ReadCloser(blobResp.Body)
188 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
189 if err != nil {
190 return nil, fmt.Errorf("failed to parse submission: %w", err)
191 }
192
193 return latestSubmission, nil
194}
195
196func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
197 gr, err := git.Open(repoPath, sha)
198 if err != nil {
199 return nil, fmt.Errorf("failed to open git repository: %w", err)
200 }
201
202 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
203 if err != nil {
204 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
205 }
206
207 var pipeline workflow.RawPipeline
208 for _, e := range workflowDir {
209 if !e.IsFile() {
210 continue
211 }
212
213 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
214 contents, err := gr.RawContent(fpath)
215 if err != nil {
216 continue
217 }
218
219 pipeline = append(pipeline, workflow.RawWorkflow{
220 Name: e.Name,
221 Contents: contents,
222 })
223 }
224
225 return pipeline, nil
226}
227
228func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
229 l := log.FromContext(ctx)
230
231 trigger := tangled.Pipeline_PullRequestTriggerData{
232 Action: "create",
233 SourceBranch: sourceBranch,
234 SourceSha: sourceSha,
235 TargetBranch: targetBranch,
236 }
237
238 compiler := workflow.Compiler{
239 Trigger: tangled.Pipeline_TriggerMetadata{
240 Kind: string(workflow.TriggerKindPullRequest),
241 PullRequest: &trigger,
242 Repo: &tangled.Pipeline_TriggerRepo{
243 Knot: h.c.Server.Hostname,
244 RepoDid: &targetRepo.RepoDid,
245 Did: targetRepo.OwnerDid,
246 Repo: &targetRepo.RepoName,
247 DefaultBranch: targetRepo.DefaultBranch,
248 },
249 },
250 }
251
252 l.Info("raw", "raw", rawPipeline)
253 parsed := compiler.Parse(rawPipeline)
254 l.Info("parsed", "parsed", parsed)
255 compiled := compiler.Compile(parsed)
256
257 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
258
259 return compiled
260}
261
262func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
263 raw := json.RawMessage(event.Commit.Record)
264 rkey := event.Commit.RKey
265 did := event.Did
266
267 var record tangled.RepoPull
268 if err := json.Unmarshal(raw, &record); err != nil {
269 return fmt.Errorf("failed to unmarshal record: %w", err)
270 }
271
272 l := log.FromContext(ctx)
273 l = l.With("handler", "processPull")
274 l = l.With("did", did)
275
276 l.Info("validating pull record")
277 targetRepo, err := h.validatePullRecord(ctx, &record)
278 if err != nil {
279 l.Warn("pull record did not validate, skipping...")
280 return err
281 }
282
283 l = l.With("target_repo", record.Target.Repo)
284 l = l.With("target_branch", record.Target.Branch)
285
286 l.Info("fetching latest submission")
287 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
288 if err != nil {
289 return err
290 }
291
292 sha := latestSubmission.SourceRev
293 if sha == "" {
294 return fmt.Errorf("failed to extract source SHA from pull submission")
295 }
296 l = l.With("sha", sha)
297
298 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
299 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
300 if err != nil {
301 return err
302 }
303
304 l.Info("compiling pipeline", "workflow_count", len(pipeline))
305 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
306
307 // do not run empty pipelines
308 if cp.Workflows == nil {
309 l.Info("skipping empty pipeline")
310 return nil
311 }
312
313 l.Info("marshaling pipeline event")
314 eventJson, err := json.Marshal(cp)
315 if err != nil {
316 return fmt.Errorf("failed to marshal pipeline event: %w", err)
317 }
318
319 ev := eventstream.Event{
320 Rkey: tid.TID(),
321 Nsid: tangled.PipelineNSID,
322 EventJson: eventJson,
323 }
324
325 l.Info("inserting pipeline event")
326 return h.db.InsertEvent(ev, h.n)
327}
328
329func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error {
330 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey)
331
332 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git")
333 if rkey == "" {
334 return nil
335 }
336
337 if event.Commit.Operation == jmodels.CommitOperationDelete {
338 return nil
339 }
340
341 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate {
342 return nil
343 }
344
345 raw := json.RawMessage(event.Commit.Record)
346 var record tangled.Repo
347 if err := json.Unmarshal(raw, &record); err != nil {
348 return fmt.Errorf("failed to unmarshal repo record: %w", err)
349 }
350
351 if record.Knot != h.c.Server.Hostname {
352 return nil
353 }
354 if record.RepoDid == nil || *record.RepoDid == "" {
355 l.Info("skipping repo event without repoDid")
356 return nil
357 }
358 repoDid := *record.RepoDid
359
360 if err := knotxrpc.ValidateRepoName(rkey); err != nil {
361 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err)
362 return nil
363 }
364
365 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid)
366 if lookupErr != nil {
367 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid)
368 return nil
369 }
370 if ownerDid != event.Did {
371 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did)
372 return nil
373 }
374
375 alias := db.RepoAlias{
376 OwnerDid: event.Did,
377 Rkey: rkey,
378 RepoDid: repoDid,
379 Rev: event.Commit.Rev,
380 }
381 if err := h.db.UpsertRepoAlias(alias); err != nil {
382 l.Warn("failed to upsert repo alias", "err", err)
383 return nil
384 }
385
386 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev)
387 return nil
388}
389
390func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
391 var err error
392 switch event.Kind {
393 case jmodels.EventKindIdentity:
394 err = h.resolver.InvalidateIdent(ctx, event.Did)
395 case jmodels.EventKindCommit:
396 switch event.Commit.Collection {
397 case tangled.PublicKeyNSID:
398 err = h.processPublicKey(ctx, event)
399 case tangled.RepoNSID:
400 err = h.processRepo(ctx, event)
401 case tangled.RepoPullNSID:
402 err = h.processPull(ctx, event)
403 }
404 default:
405 return nil
406 }
407
408 if err != nil {
409 args := []any{"kind", event.Kind, "err", err}
410 if event.Kind == jmodels.EventKindCommit {
411 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey)
412 }
413 h.l.Warn("failed to process event, skipping", args...)
414 }
415
416 lastTimeUs := event.TimeUS + 1
417 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
418 h.l.Error("failed to save cursor", "err", saveErr)
419 }
420
421 return nil
422}