Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotserver,spindle: remove all pipeline logics from knotserver

`sh.tangled.pipeline` events are now completely generated & streamed
from spindle

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 20, 2026, 7:50 PM +0900) commit b656f9e7 parent 520d1ab2 change-id pxqzlqzl
+12 -457
-284
knotserver/ingester.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "fmt" 7 - "io" 8 - "net/http" 9 - "net/url" 10 - "path/filepath" 11 7 "strings" 12 8 13 - comatproto "github.com/bluesky-social/indigo/api/atproto" 14 - "github.com/bluesky-social/indigo/atproto/syntax" 15 - "github.com/bluesky-social/indigo/xrpc" 16 9 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 10 "tangled.org/core/api/tangled" 18 - "tangled.org/core/appview/models" 19 - "tangled.org/core/eventstream" 20 11 "tangled.org/core/knotserver/db" 21 - "tangled.org/core/knotserver/git" 22 12 knotxrpc "tangled.org/core/knotserver/xrpc" 23 13 "tangled.org/core/log" 24 - "tangled.org/core/tid" 25 - "tangled.org/core/workflow" 26 14 ) 27 15 28 16 func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { ··· 56 44 DefaultBranch string // default branch 57 45 } 58 46 59 - func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 60 - if record.Target == nil { 61 - return nil, fmt.Errorf("ignoring pull record: target repo is nil") 62 - } 63 - 64 - l := log.FromContext(ctx).With("handler", "validatePullRecord") 65 - l = l.With("target_repo", record.Target.Repo) 66 - l = l.With("target_branch", record.Target.Branch) 67 - 68 - if record.Source == nil { 69 - return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 70 - } 71 - 72 - if record.Source.Repo != nil { 73 - return nil, fmt.Errorf("ignoring pull record: fork based pull") 74 - } 75 - 76 - var repoPath, ownerDid, repoName, repoDid string 77 - switch { 78 - case strings.HasPrefix(record.Target.Repo, "did:"): 79 - repoDid = record.Target.Repo 80 - var lookupErr error 81 - repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 82 - if lookupErr != nil { 83 - return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 84 - } 85 - 86 - case strings.Contains(record.Target.Repo, "/"): 87 - // TODO: get rid of this PDS fetch once all repos have DIDs 88 - repoAt, parseErr := syntax.ParseATURI(record.Target.Repo) 89 - if parseErr != nil { 90 - return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 91 - } 92 - 93 - ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 94 - if resolveErr != nil || ident.Handle.IsInvalidHandle() { 95 - return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 96 - } 97 - 98 - xrpcc := xrpc.Client{ 99 - Host: ident.PDSEndpoint(), 100 - } 101 - 102 - resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 103 - if getErr != nil { 104 - return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 105 - } 106 - 107 - repo, ok := resp.Value.Val.(*tangled.Repo) 108 - if !ok { 109 - return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 110 - } 111 - 112 - if repo.Knot != h.c.Server.Hostname { 113 - return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 114 - } 115 - 116 - ownerDid = ident.DID.String() 117 - repoName = repoAt.RecordKey().String() 118 - 119 - repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 120 - if didErr != nil { 121 - return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 122 - } 123 - 124 - var lookupErr error 125 - repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 126 - if lookupErr != nil { 127 - return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 128 - } 129 - 130 - default: 131 - return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo) 132 - } 133 - 134 - gr, err := git.Open(repoPath, record.Source.Branch) 135 - if err != nil { 136 - return nil, fmt.Errorf("failed to open git repository: %w", err) 137 - } 138 - 139 - defaultBranch, _ := gr.FindMainBranch() 140 - 141 - return &targetRepo{ 142 - RepoPath: repoPath, 143 - OwnerDid: ownerDid, 144 - RepoName: repoName, 145 - RepoDid: repoDid, 146 - DefaultBranch: defaultBranch, 147 - }, nil 148 - } 149 - 150 - func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 151 - // resolve the PR owner's identity to fetch the blob from their PDS 152 - prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 153 - if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 154 - return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 155 - } 156 - 157 - if len(record.Rounds) == 0 { 158 - return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 159 - } 160 - 161 - roundNumber := len(record.Rounds) - 1 162 - round := record.Rounds[roundNumber] 163 - 164 - // fetch the blob from the PR owner's PDS 165 - prOwnerPds := prOwnerIdent.PDSEndpoint() 166 - blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 167 - if err != nil { 168 - return nil, fmt.Errorf("failed to construct blob URL: %w", err) 169 - } 170 - q := blobUrl.Query() 171 - q.Set("cid", round.PatchBlob.Ref.String()) 172 - q.Set("did", did) 173 - blobUrl.RawQuery = q.Encode() 174 - 175 - req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 176 - if err != nil { 177 - return nil, fmt.Errorf("failed to create blob request: %w", err) 178 - } 179 - req.Header.Set("Content-Type", "application/json") 180 - 181 - blobResp, err := http.DefaultClient.Do(req) 182 - if err != nil { 183 - return nil, fmt.Errorf("failed to fetch blob: %w", err) 184 - } 185 - defer blobResp.Body.Close() 186 - 187 - blob := io.ReadCloser(blobResp.Body) 188 - latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 189 - if err != nil { 190 - return nil, fmt.Errorf("failed to parse submission: %w", err) 191 - } 192 - 193 - return latestSubmission, nil 194 - } 195 - 196 - func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 197 - gr, err := git.Open(repoPath, sha) 198 - if err != nil { 199 - return nil, fmt.Errorf("failed to open git repository: %w", err) 200 - } 201 - 202 - workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 203 - if err != nil { 204 - return nil, fmt.Errorf("failed to open workflow directory: %w", err) 205 - } 206 - 207 - var pipeline workflow.RawPipeline 208 - for _, e := range workflowDir { 209 - if !e.IsFile() { 210 - continue 211 - } 212 - 213 - fpath := filepath.Join(workflow.WorkflowDir, e.Name) 214 - contents, err := gr.RawContent(fpath) 215 - if err != nil { 216 - continue 217 - } 218 - 219 - pipeline = append(pipeline, workflow.RawWorkflow{ 220 - Name: e.Name, 221 - Contents: contents, 222 - }) 223 - } 224 - 225 - return pipeline, nil 226 - } 227 - 228 - func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 229 - l := log.FromContext(ctx) 230 - 231 - trigger := tangled.Pipeline_PullRequestTriggerData{ 232 - Action: "create", 233 - SourceBranch: sourceBranch, 234 - SourceSha: sourceSha, 235 - TargetBranch: targetBranch, 236 - } 237 - 238 - compiler := workflow.Compiler{ 239 - Trigger: tangled.Pipeline_TriggerMetadata{ 240 - Kind: string(workflow.TriggerKindPullRequest), 241 - PullRequest: &trigger, 242 - Repo: &tangled.Pipeline_TriggerRepo{ 243 - Knot: h.c.Server.Hostname, 244 - RepoDid: &targetRepo.RepoDid, 245 - Did: targetRepo.OwnerDid, 246 - Repo: &targetRepo.RepoName, 247 - DefaultBranch: targetRepo.DefaultBranch, 248 - }, 249 - }, 250 - } 251 - 252 - l.Info("raw", "raw", rawPipeline) 253 - parsed := compiler.Parse(rawPipeline) 254 - l.Info("parsed", "parsed", parsed) 255 - compiled := compiler.Compile(parsed) 256 - 257 - l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 258 - 259 - return compiled 260 - } 261 - 262 - func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 263 - raw := json.RawMessage(event.Commit.Record) 264 - rkey := event.Commit.RKey 265 - did := event.Did 266 - 267 - var record tangled.RepoPull 268 - if err := json.Unmarshal(raw, &record); err != nil { 269 - return fmt.Errorf("failed to unmarshal record: %w", err) 270 - } 271 - 272 - l := log.FromContext(ctx) 273 - l = l.With("handler", "processPull") 274 - l = l.With("did", did) 275 - 276 - l.Info("validating pull record") 277 - targetRepo, err := h.validatePullRecord(ctx, &record) 278 - if err != nil { 279 - l.Warn("pull record did not validate, skipping...") 280 - return err 281 - } 282 - 283 - l = l.With("target_repo", record.Target.Repo) 284 - l = l.With("target_branch", record.Target.Branch) 285 - 286 - l.Info("fetching latest submission") 287 - latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 288 - if err != nil { 289 - return err 290 - } 291 - 292 - sha := latestSubmission.SourceRev 293 - if sha == "" { 294 - return fmt.Errorf("failed to extract source SHA from pull submission") 295 - } 296 - l = l.With("sha", sha) 297 - 298 - l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 299 - pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 300 - if err != nil { 301 - return err 302 - } 303 - 304 - l.Info("compiling pipeline", "workflow_count", len(pipeline)) 305 - cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 306 - 307 - // do not run empty pipelines 308 - if cp.Workflows == nil { 309 - l.Info("skipping empty pipeline") 310 - return nil 311 - } 312 - 313 - l.Info("marshaling pipeline event") 314 - eventJson, err := json.Marshal(cp) 315 - if err != nil { 316 - return fmt.Errorf("failed to marshal pipeline event: %w", err) 317 - } 318 - 319 - ev := eventstream.Event{ 320 - Rkey: tid.TID(), 321 - Nsid: tangled.PipelineNSID, 322 - EventJson: eventJson, 323 - } 324 - 325 - l.Info("inserting pipeline event") 326 - return h.db.InsertEvent(ev, h.n) 327 - } 328 - 329 47 func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 330 48 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 331 49 ··· 398 116 err = h.processPublicKey(ctx, event) 399 117 case tangled.RepoNSID: 400 118 err = h.processRepo(ctx, event) 401 - case tangled.RepoPullNSID: 402 - err = h.processPull(ctx, event) 403 119 } 404 120 default: 405 121 return nil
+7 -136
knotserver/internal.go
··· 28 28 "tangled.org/core/notifier" 29 29 "tangled.org/core/rbac" 30 30 "tangled.org/core/tid" 31 - "tangled.org/core/workflow" 32 31 ) 33 32 34 33 type InternalHandle struct { ··· 188 187 fmt.Fprint(w, diskRelative) 189 188 } 190 189 191 - type PushOptions struct { 192 - skipCi bool 193 - verboseCi bool 194 - } 195 - 196 190 func (h *InternalHandle) PostReceiveHook(w http.ResponseWriter, r *http.Request) { 197 191 l := h.l.With("handler", "PostReceiveHook") 198 192 ··· 263 257 l.Error("failed to reply with pull request link", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir) 264 258 } 265 259 266 - err = h.triggerPipeline(&resp.Messages, line, gitUserDid, ownerDid, repoName, repoDid, pushOptions) 267 - if err != nil { 268 - l.Error("failed to trigger pipeline", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir) 260 + // emit pipeline logs link 261 + if h.c.LogsAddr != "" { 262 + host, port, err := net.SplitHostPort(h.c.LogsAddr) 263 + if err == nil { 264 + resp.Messages = append(resp.Messages, "→ Browse CI logs in your terminal:") 265 + resp.Messages = append(resp.Messages, fmt.Sprintf(" ssh -t -p %s %s %s %s", port, host, repoDid, line.NewSha)) 266 + } 269 267 } 270 268 } 271 269 ··· 319 317 Rkey: tid.TID(), 320 318 Nsid: tangled.GitRefUpdateNSID, 321 319 EventJson: eventJson, 322 - } 323 - 324 - return h.db.InsertEvent(event, h.n) 325 - } 326 - 327 - func (h *InternalHandle) triggerPipeline( 328 - clientMsgs *[]string, 329 - line git.PostReceiveLine, 330 - gitUserDid string, 331 - ownerDid string, 332 - repoName string, 333 - repoDid string, 334 - pushOptionsRaw []string, 335 - ) error { 336 - var pushOptions PushOptions 337 - for _, option := range pushOptionsRaw { 338 - if option == "skip-ci" || option == "ci-skip" { 339 - pushOptions.skipCi = true 340 - } 341 - if option == "verbose-ci" || option == "ci-verbose" { 342 - pushOptions.verboseCi = true 343 - } 344 - } 345 - if pushOptions.skipCi { 346 - return nil 347 - } 348 - 349 - repoPath, _, _, resolveErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 350 - if resolveErr != nil { 351 - return fmt.Errorf("failed to resolve repo on disk: %w", resolveErr) 352 - } 353 - 354 - gr, err := git.Open(repoPath, line.Ref) 355 - if err != nil { 356 - return err 357 - } 358 - 359 - workflowDir, err := gr.FileTree(context.Background(), workflow.WorkflowDir) 360 - if err != nil { 361 - return err 362 - } 363 - 364 - var pipeline workflow.RawPipeline 365 - for _, e := range workflowDir { 366 - if !e.IsFile() { 367 - continue 368 - } 369 - 370 - fpath := filepath.Join(workflow.WorkflowDir, e.Name) 371 - contents, err := gr.RawContent(fpath) 372 - if err != nil { 373 - continue 374 - } 375 - 376 - pipeline = append(pipeline, workflow.RawWorkflow{ 377 - Name: e.Name, 378 - Contents: contents, 379 - }) 380 - } 381 - 382 - defaultBranch, _ := gr.FindMainBranch() 383 - 384 - trigger := tangled.Pipeline_PushTriggerData{ 385 - Ref: line.Ref, 386 - OldSha: line.OldSha.String(), 387 - NewSha: line.NewSha.String(), 388 - } 389 - 390 - triggerRepo := &tangled.Pipeline_TriggerRepo{ 391 - Did: ownerDid, 392 - Knot: h.c.Server.Hostname, 393 - Repo: &repoName, 394 - RepoDid: &repoDid, 395 - DefaultBranch: defaultBranch, 396 - } 397 - 398 - changedFiles, err := gr.ChangedFilesBetween(line.OldSha.String(), line.NewSha.String()) 399 - if err != nil { 400 - return fmt.Errorf("getting changed files: %w", err) 401 - } 402 - 403 - compiler := workflow.Compiler{ 404 - Trigger: tangled.Pipeline_TriggerMetadata{ 405 - Kind: string(workflow.TriggerKindPush), 406 - Push: &trigger, 407 - Repo: triggerRepo, 408 - }, 409 - ChangedFiles: changedFiles, 410 - } 411 - 412 - cp := compiler.Compile(compiler.Parse(pipeline)) 413 - eventJson, err := json.Marshal(cp) 414 - if err != nil { 415 - return err 416 - } 417 - 418 - for _, e := range compiler.Diagnostics.Errors { 419 - *clientMsgs = append(*clientMsgs, e.String()) 420 - } 421 - 422 - if pushOptions.verboseCi { 423 - if compiler.Diagnostics.IsEmpty() { 424 - *clientMsgs = append(*clientMsgs, "success: pipeline compiled with no diagnostics") 425 - } 426 - 427 - for _, w := range compiler.Diagnostics.Warnings { 428 - *clientMsgs = append(*clientMsgs, w.String()) 429 - } 430 - } 431 - 432 - // do not run empty pipelines 433 - if cp.Workflows == nil { 434 - return nil 435 - } 436 - 437 - event := eventstream.Event{ 438 - Rkey: tid.TID(), 439 - Nsid: tangled.PipelineNSID, 440 - EventJson: eventJson, 441 - } 442 - 443 - if h.c.LogsAddr != "" { 444 - host, port, err := net.SplitHostPort(h.c.LogsAddr) 445 - if err == nil { 446 - *clientMsgs = append(*clientMsgs, "→ Browse CI logs in your terminal:") 447 - *clientMsgs = append(*clientMsgs, fmt.Sprintf(" ssh -t -p %s %s %s %s", port, host, repoDid, line.NewSha)) 448 - } 449 320 } 450 321 451 322 return h.db.InsertEvent(event, h.n)
-1
knotserver/server.go
··· 96 96 jc, err := jetstream.NewJetstreamClient(c.Server.JetstreamEndpoint, "knotserver", []string{ 97 97 tangled.PublicKeyNSID, 98 98 tangled.RepoNSID, 99 - tangled.RepoPullNSID, 100 99 }, nil, log.SubLogger(logger, "jetstream"), db, true, c.Server.LogDids) 101 100 if err != nil { 102 101 logger.Error("failed to setup jetstream", "error", err)
+5 -36
spindle/server.go
··· 404 404 func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error { 405 405 l := log.FromContext(ctx).With("handler", "processKnotStream") 406 406 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 407 - if msg.Nsid == tangled.PipelineNSID { 408 - return nil 409 - tpl := tangled.Pipeline{} 410 - err := json.Unmarshal(msg.EventJson, &tpl) 411 - if err != nil { 412 - s.l.Error("failed to unmarshal pipeline event", "err", err) 413 - return err 414 - } 415 - 416 - if tpl.TriggerMetadata == nil { 417 - return fmt.Errorf("no trigger metadata found") 418 - } 419 - 420 - if tpl.TriggerMetadata.Repo == nil { 421 - return fmt.Errorf("no repo data found") 422 - } 423 - 424 - if src.Host != tpl.TriggerMetadata.Repo.Knot { 425 - return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, tpl.TriggerMetadata.Repo.Knot) 426 - } 427 - 428 - repoDid, err := s.resolvePipelineRepoDid(tpl.TriggerMetadata.Repo) 429 - if err != nil { 430 - return err 431 - } 432 - 433 - pipelineId := models.PipelineId{ 434 - Knot: src.Host, 435 - Rkey: msg.Rkey, 436 - } 437 - 438 - err = s.processPipeline(ctx, repoDid, tpl, pipelineId) 439 - if err != nil { 440 - return err 441 - } 442 - } else if msg.Nsid == tangled.GitRefUpdateNSID { 407 + if msg.Nsid == tangled.GitRefUpdateNSID { 443 408 event := tangled.GitRefUpdate{} 444 409 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 445 410 l.Error("error unmarshalling", "err", err) ··· 452 417 repo, err := s.db.GetRepoByDid(repoDid) 453 418 if err != nil { 454 419 return fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 420 + } 421 + 422 + if src.Host != repo.Knot { 423 + return fmt.Errorf("repo knot does not match event source: %s != %s", src.Host, repo.Knot) 455 424 } 456 425 457 426 // NOTE: we are blindly trusting the knot that it will return only repos it own