Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

spindle: create pipeline events from spindle

spindle will emit a `sh.tangled.pipeline` event on:
- `sh.tangled.git.refUpdate` events from the knot stream
- live create/update events of `sh.tangled.repo.pull` records

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 26, 2026, 4:32 PM +0900) commit 13a940d7 parent 8683da2c change-id vlousxlw
+410 -84
+14
spindle/db/events.go
··· 19 19 return eventstream.List(d, cursor, limit) 20 20 } 21 21 22 + func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error { 23 + eventJson, err := json.Marshal(pipeline) 24 + if err != nil { 25 + return err 26 + } 27 + event := eventstream.Event{ 28 + Rkey: rkey, 29 + Nsid: tangled.PipelineNSID, 30 + Created: time.Now().UnixNano(), 31 + EventJson: eventJson, 32 + } 33 + return d.insertEvent(event, n) 34 + } 35 + 22 36 func (d *DB) createStatusEvent( 23 37 workflowId models.WorkflowId, 24 38 statusKind models.StatusKind,
+2 -2
spindle/embedtap.go
··· 67 67 RepoFetchTimeout: 5 * time.Minute, 68 68 IdentityCacheSize: 50_000, 69 69 EventCacheSize: 10_000, 70 - SignalCollection: tangled.RepoNSID, 71 - CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID}, 70 + SignalCollection: tangled.RepoPullNSID, // HACK: to ingest PRs from any users 71 + CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.RepoPullNSID}, 72 72 AdminPassword: cfg.Server.Tap.AdminPassword, 73 73 RetryTimeout: 60 * time.Second, 74 74 }
+209 -82
spindle/server.go
··· 4 4 "context" 5 5 _ "embed" 6 6 "encoding/json" 7 + "errors" 7 8 "fmt" 8 9 "log/slog" 9 10 "maps" ··· 13 14 "time" 14 15 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 16 18 "github.com/go-chi/chi/v5" 19 + "github.com/go-git/go-git/v5/plumbing/object" 17 20 "github.com/hashicorp/go-version" 18 21 "tangled.org/core/api/tangled" 19 22 "tangled.org/core/eventconsumer" ··· 21 24 "tangled.org/core/eventstream" 22 25 "tangled.org/core/idresolver" 23 26 "tangled.org/core/jetstream" 27 + kgit "tangled.org/core/knotserver/git" 24 28 "tangled.org/core/log" 25 29 "tangled.org/core/notifier" 26 30 "tangled.org/core/rbac" ··· 35 39 "tangled.org/core/spindle/queue" 36 40 "tangled.org/core/spindle/secrets" 37 41 "tangled.org/core/spindle/xrpc" 42 + "tangled.org/core/tid" 43 + "tangled.org/core/workflow" 38 44 "tangled.org/core/xrpc/serviceauth" 39 45 ) 40 46 ··· 175 181 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 176 182 } 177 183 178 - // for each incoming sh.tangled.pipeline, we execute 179 - // spindle.processPipeline, which in turn enqueues the pipeline 180 - // job in the above registered queue. 
184 + // spindle listen to knot stream for sh.tangled.git.refUpdate 185 + // which will sync the local workflow files in spindle and enqueues the 186 + // pipeline job for on-push workflows 181 187 ccfg := eventconsumer.NewConsumerConfig() 182 188 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 183 - ccfg.ProcessFunc = spindle.processPipeline 189 + ccfg.ProcessFunc = spindle.processKnotStream 184 190 ccfg.CursorStore = cursorStore 185 191 if cfg.Server.Dev { 186 192 ccfg.RetryInterval = 5 * time.Second ··· 395 401 return x.Router() 396 402 } 397 403 398 - func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error { 404 + func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventstream.Event) error { 399 405 l := log.FromContext(ctx).With("handler", "processKnotStream") 400 406 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 401 407 if msg.Nsid == tangled.PipelineNSID { ··· 429 435 Rkey: msg.Rkey, 430 436 } 431 437 432 - workflows := make(map[models.Engine][]models.Workflow) 438 + err = s.processPipeline(ctx, repoDid, tpl, pipelineId) 439 + if err != nil { 440 + return err 441 + } 442 + } else if msg.Nsid == tangled.GitRefUpdateNSID { 443 + event := tangled.GitRefUpdate{} 444 + if err := json.Unmarshal(msg.EventJson, &event); err != nil { 445 + l.Error("error unmarshalling", "err", err) 446 + return err 447 + } 448 + l = l.With("repo", event.Repo, "ref", event.Ref, "newSha", event.NewSha) 449 + l.Debug("debug") 433 450 434 - // Build pipeline environment variables once for all workflows 435 - pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId) 451 + repoDid := syntax.DID(event.Repo) 452 + repo, err := s.db.GetRepoByDid(repoDid) 453 + if err != nil { 454 + return fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 455 + } 436 456 437 - for _, w := range tpl.Workflows { 438 - if w != nil { 439 - if _, ok := s.engs[w.Engine]; !ok { 440 - 
err = s.db.StatusFailed(models.WorkflowId{ 441 - PipelineId: pipelineId, 442 - Name: w.Name, 443 - }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 444 - if err != nil { 445 - return fmt.Errorf("db.StatusFailed: %w", err) 446 - } 457 + // NOTE: we are blindly trusting the knot that it will return only repos it own 458 + repoCloneUri := s.newRepoCloneUrl(src.Key(), repoDid) 459 + repoPath := s.newRepoPath(repoDid) 460 + if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 461 + return fmt.Errorf("sync git repo: %w", err) 462 + } 463 + l.Info("synced git repo") 447 464 448 - continue 449 - } 465 + scheme := "https" 466 + if s.cfg.Server.Dev { 467 + scheme = "http" 468 + } 469 + client := &indigoxrpc.Client{Host: fmt.Sprintf("%s://%s", scheme, repo.Knot)} 450 470 451 - eng := s.engs[w.Engine] 471 + // HACK: fetch current default branch 472 + // TODO: this should be included in refUpdate event 473 + defaultBranch, _ := func(repo syntax.DID) (string, error) { 474 + defaultBranchOut, err := tangled.RepoGetDefaultBranch(ctx, client, repo.String()) 475 + if err != nil { 476 + return "", err 477 + } 478 + return defaultBranchOut.Name, nil 479 + }(repoDid) 452 480 453 - if _, ok := workflows[eng]; !ok { 454 - workflows[eng] = []models.Workflow{} 455 - } 481 + compiler := workflow.Compiler{ 482 + ChangedFiles: event.ChangedFiles, 483 + Trigger: tangled.Pipeline_TriggerMetadata{ 484 + Kind: string(workflow.TriggerKindPush), 485 + Push: &tangled.Pipeline_PushTriggerData{ 486 + Ref: event.Ref, 487 + OldSha: event.OldSha, 488 + NewSha: event.NewSha, 489 + }, 490 + Repo: &tangled.Pipeline_TriggerRepo{ 491 + Did: repo.Owner.String(), 492 + Knot: repo.Knot, 493 + Repo: (*string)(&repo.Rkey), 494 + RepoDid: (*string)(&repoDid), 495 + DefaultBranch: defaultBranch, 496 + }, 497 + }, 498 + } 456 499 457 - ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 458 - if err != nil { 459 - err = s.db.StatusFailed(models.WorkflowId{ 460 - PipelineId: 
pipelineId, 461 - Name: w.Name, 462 - }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 463 - if err != nil { 464 - return fmt.Errorf("db.StatusFailed: %w", err) 465 - } 500 + // load workflow definitions from rev (without spindle context) 501 + rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 502 + if err != nil { 503 + return fmt.Errorf("loading pipeline: %w", err) 504 + } 505 + if len(rawPipeline) == 0 { 506 + l.Info("no workflow definition find for the repo. skipping the event") 507 + return nil 508 + } 509 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 510 + // TODO: pass compile error to workflow log 511 + for _, w := range compiler.Diagnostics.Errors { 512 + l.Error(w.String()) 513 + } 514 + for _, w := range compiler.Diagnostics.Warnings { 515 + l.Warn(w.String()) 516 + } 517 + if len(tpl.Workflows) == 0 { 518 + l.Info("no workflow matching trigger 'push'. skipping the event") 519 + return nil 520 + } 466 521 467 - continue 468 - } 522 + pipelineId := models.PipelineId{ 523 + Knot: tpl.TriggerMetadata.Repo.Knot, 524 + Rkey: tid.TID(), 525 + } 526 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 527 + l.Error("failed to create pipeline event", "err", err) 528 + return nil 529 + } 530 + err = s.processPipeline(ctx, repoDid, tpl, pipelineId) 531 + if err != nil { 532 + return err 533 + } 534 + } 535 + 536 + return nil 537 + } 538 + 539 + func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 540 + if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 541 + return nil, fmt.Errorf("syncing git repo: %w", err) 542 + } 543 + gr, err := kgit.Open(repoPath, rev) 544 + if err != nil { 545 + return nil, fmt.Errorf("opening git repo: %w", err) 546 + } 469 547 470 - // inject TANGLED_* env vars after InitWorkflow 471 - // This prevents user-defined env vars from overriding them 472 - if ewf.Environment == nil { 473 - 
ewf.Environment = make(map[string]string) 474 - } 475 - maps.Copy(ewf.Environment, pipelineEnv) 548 + workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 549 + if errors.Is(err, object.ErrDirectoryNotFound) { 550 + // return empty RawPipeline when directory doesn't exist 551 + return nil, nil 552 + } else if err != nil { 553 + return nil, fmt.Errorf("loading file tree: %w", err) 554 + } 476 555 477 - workflows[eng] = append(workflows[eng], *ewf) 556 + var rawPipeline workflow.RawPipeline 557 + for _, e := range workflowDir { 558 + if !e.IsFile() { 559 + continue 560 + } 478 561 479 - err = s.db.StatusPending(models.WorkflowId{ 480 - PipelineId: pipelineId, 481 - Name: w.Name, 482 - }, s.n) 483 - if err != nil { 484 - return fmt.Errorf("db.StatusPending: %w", err) 485 - } 486 - } 562 + fpath := filepath.Join(workflow.WorkflowDir, e.Name) 563 + contents, err := gr.RawContent(fpath) 564 + if err != nil { 565 + return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 487 566 } 488 567 489 - ok := s.jq.Enqueue(repoDid, queue.Job{ 490 - Run: func() error { 491 - engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 492 - RepoDid: repoDid, 493 - Workflows: workflows, 494 - }, pipelineId) 495 - return nil 496 - }, 497 - OnFail: func(jobError error) { 498 - s.l.Error("pipeline run failed", "error", jobError) 499 - }, 568 + rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 569 + Name: e.Name, 570 + Contents: contents, 500 571 }) 501 - if ok { 502 - s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 503 - } else { 504 - s.l.Error("failed to enqueue pipeline: queue is full") 572 + } 573 + 574 + return rawPipeline, nil 575 + } 576 + 577 + func (s *Spindle) processPipeline(ctx context.Context, repoDid syntax.DID, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 578 + // Build pipeline environment variables once for all workflows 579 + pipelineEnv := 
models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId) 580 + 581 + // filter & init workflows 582 + workflows := make(map[models.Engine][]models.Workflow) 583 + for _, w := range tpl.Workflows { 584 + if w == nil { 585 + continue 586 + } 587 + if _, ok := s.engs[w.Engine]; !ok { 588 + err := s.db.StatusFailed(models.WorkflowId{ 589 + PipelineId: pipelineId, 590 + Name: w.Name, 591 + }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 592 + if err != nil { 593 + return fmt.Errorf("db.StatusFailed: %w", err) 594 + } 595 + 596 + continue 505 597 } 506 - } else if msg.Nsid == tangled.GitRefUpdateNSID { 507 - event := tangled.GitRefUpdate{} 508 - if err := json.Unmarshal(msg.EventJson, &event); err != nil { 509 - l.Error("error unmarshalling", "err", err) 510 - return err 598 + 599 + eng := s.engs[w.Engine] 600 + 601 + if _, ok := workflows[eng]; !ok { 602 + workflows[eng] = []models.Workflow{} 511 603 } 512 - l = l.With("repo", event.Repo, "ref", event.Ref, "newSha", event.NewSha) 513 - l.Debug("debug") 514 604 515 - repoDid := syntax.DID(event.Repo) 516 - if _, err := s.db.GetRepoByDid(repoDid); err != nil { 517 - return fmt.Errorf("unknown repoDid %s: %w", repoDid, err) 605 + ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 606 + if err != nil { 607 + err = s.db.StatusFailed(models.WorkflowId{ 608 + PipelineId: pipelineId, 609 + Name: w.Name, 610 + }, fmt.Sprintf("init workflow: %s", err), -1, s.n) 611 + if err != nil { 612 + return fmt.Errorf("db.StatusFailed: %w", err) 613 + } 614 + 615 + continue 518 616 } 519 617 520 - // NOTE: we are blindly trusting the knot that it will return only repos it own 521 - repoCloneUri := s.newRepoCloneUrl(src.Key(), syntax.DID(event.Repo)) 522 - repoPath := s.newRepoPath(syntax.DID(event.Repo)) 523 - if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 524 - return fmt.Errorf("sync git repo: %w", err) 618 + // inject TANGLED_* env vars after InitWorkflow 619 + // This prevents 
user-defined env vars from overriding them 620 + if ewf.Environment == nil { 621 + ewf.Environment = make(map[string]string) 525 622 } 526 - l.Info("synced git repo") 623 + maps.Copy(ewf.Environment, pipelineEnv) 527 624 528 - // TODO: plan the pipeline 625 + workflows[eng] = append(workflows[eng], *ewf) 529 626 } 530 627 628 + // enqueue pipeline 629 + ok := s.jq.Enqueue(repoDid, queue.Job{ 630 + Run: func() error { 631 + engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 632 + RepoDid: repoDid, 633 + Workflows: workflows, 634 + }, pipelineId) 635 + return nil 636 + }, 637 + OnFail: func(jobError error) { 638 + s.l.Error("pipeline run failed", "error", jobError) 639 + }, 640 + }) 641 + if !ok { 642 + return fmt.Errorf("failed to enqueue pipeline: queue is full") 643 + } 644 + s.l.Info("pipeline enqueued successfully", "id", pipelineId) 645 + 646 + // after successful enqueue, emit StatusPending for all workflows 647 + for _, ewfs := range workflows { 648 + for _, ewf := range ewfs { 649 + err := s.db.StatusPending(models.WorkflowId{ 650 + PipelineId: pipelineId, 651 + Name: ewf.Name, 652 + }, s.n) 653 + if err != nil { 654 + return fmt.Errorf("db.StatusPending: %w", err) 655 + } 656 + } 657 + } 531 658 return nil 532 659 } 533 660
+185
spindle/tapclient.go
··· 6 6 "encoding/json" 7 7 "errors" 8 8 "fmt" 9 + "io" 9 10 "log/slog" 11 + "net/http" 12 + "net/url" 10 13 "sync" 11 14 "time" 12 15 13 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 14 18 "tangled.org/core/api/tangled" 19 + avmodels "tangled.org/core/appview/models" 15 20 "tangled.org/core/eventconsumer" 16 21 "tangled.org/core/log" 17 22 "tangled.org/core/rbac" 18 23 "tangled.org/core/spindle/db" 19 24 "tangled.org/core/spindle/git" 25 + "tangled.org/core/spindle/models" 20 26 "tangled.org/core/tapc" 27 + "tangled.org/core/tid" 28 + "tangled.org/core/workflow" 21 29 ) 22 30 23 31 const ( ··· 75 83 return t.processRepo(ctx, evt.Record) 76 84 case tangled.RepoCollaboratorNSID: 77 85 return t.processCollaborator(ctx, evt.Record) 86 + case tangled.RepoPullNSID: 87 + return t.processPull(ctx, evt.Record) 78 88 } 79 89 return nil 80 90 } ··· 306 316 return nil 307 317 } 308 318 319 + func (t *Tap) processPull(ctx context.Context, evt *tapc.RecordEventData) error { 320 + l := t.logger.With("collection", evt.Collection, "did", evt.Did, "rkey", evt.Rkey) 321 + 322 + // only listen to live events 323 + if !evt.Live { 324 + l.Info("skipping backfill event", "event", evt.AtUri()) 325 + return nil 326 + } 327 + 328 + switch evt.Action { 329 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 330 + record := tangled.RepoPull{} 331 + if err := json.Unmarshal(evt.Record, &record); err != nil { 332 + l.Error("invalid record", "err", err) 333 + return fmt.Errorf("parsing record: %w", err) 334 + } 335 + 336 + // ignore legacy records 337 + if record.Target == nil { 338 + l.Info("ignoring pull record: target repo is nil") 339 + return nil 340 + } 341 + 342 + // ignore patch-based and fork-based PRs 343 + if record.Source == nil || record.Source.Repo != nil { 344 + l.Info("ignoring pull record: not a branch-based pull request") 345 + return nil 346 + } 347 + 348 + // skip if target repo is unknown 349 + repo, err := 
t.spindle.db.GetRepoByDid(syntax.DID(record.Target.Repo)) 350 + if err != nil { 351 + l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err) 352 + return fmt.Errorf("target repo is unknown") 353 + } 354 + 355 + // only accept branch-based PR (excluding patch-based and fork-based) 356 + if record.Source == nil || record.Source.Repo != nil { 357 + l.Warn("skipping non-branch-based PR") 358 + return nil 359 + } 360 + 361 + latestSubmission, err := t.fetchLatestSubmission(ctx, evt.Did.String(), evt.Rkey.String(), &record) 362 + if err != nil { 363 + return err 364 + } 365 + sourceSha := latestSubmission.SourceRev 366 + 367 + scheme := "https" 368 + if t.spindle.cfg.Server.Dev { 369 + scheme = "http" 370 + } 371 + client := &indigoxrpc.Client{Host: fmt.Sprintf("%s://%s", scheme, repo.Knot)} 372 + 373 + // fetch current default branch 374 + defaultBranch, _ := func(repo syntax.DID) (string, error) { 375 + defaultBranchOut, err := tangled.RepoGetDefaultBranch(ctx, client, repo.String()) 376 + if err != nil { 377 + return "", err 378 + } 379 + return defaultBranchOut.Name, nil 380 + }(repo.RepoDid) 381 + 382 + compiler := workflow.Compiler{ 383 + Trigger: tangled.Pipeline_TriggerMetadata{ 384 + Kind: string(workflow.TriggerKindPullRequest), 385 + PullRequest: &tangled.Pipeline_PullRequestTriggerData{ 386 + Action: "create", 387 + SourceBranch: record.Source.Branch, 388 + SourceSha: sourceSha, 389 + TargetBranch: record.Target.Branch, 390 + }, 391 + Repo: &tangled.Pipeline_TriggerRepo{ 392 + Did: repo.Owner.String(), 393 + Knot: repo.Knot, 394 + Repo: (*string)(&repo.Rkey), 395 + RepoDid: (*string)(&repo.RepoDid), 396 + DefaultBranch: defaultBranch, 397 + }, 398 + }, 399 + } 400 + 401 + repoUri := t.spindle.newRepoCloneUrl(repo.Knot, repo.RepoDid) 402 + repoPath := t.spindle.newRepoPath(repo.RepoDid) 403 + 404 + // load workflow definitions from rev (without spindle context) 405 + rawPipeline, err := t.spindle.loadPipeline(ctx, repoUri, repoPath, 
sourceSha) 406 + if err != nil { 407 + // don't retry 408 + l.Error("failed loading pipeline", "err", err) 409 + return nil 410 + } 411 + if len(rawPipeline) == 0 { 412 + l.Info("no workflow definition find for the repo. skipping the event") 413 + return nil 414 + } 415 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 416 + // TODO: pass compile error to workflow log 417 + for _, w := range compiler.Diagnostics.Errors { 418 + l.Error(w.String()) 419 + } 420 + for _, w := range compiler.Diagnostics.Warnings { 421 + l.Warn(w.String()) 422 + } 423 + if len(tpl.Workflows) == 0 { 424 + l.Info("no workflow matching trigger 'pull_request'. skipping the event") 425 + return nil 426 + } 427 + 428 + pipelineId := models.PipelineId{ 429 + Knot: tpl.TriggerMetadata.Repo.Knot, 430 + Rkey: tid.TID(), 431 + } 432 + if err := t.spindle.db.CreatePipelineEvent(pipelineId.Rkey, tpl, t.spindle.n); err != nil { 433 + l.Error("failed to create pipeline event", "err", err) 434 + return nil 435 + } 436 + err = t.spindle.processPipeline(ctx, repo.RepoDid, tpl, pipelineId) 437 + if err != nil { 438 + // don't retry 439 + l.Error("failed processing pipeline", "err", err) 440 + return nil 441 + } 442 + case tapc.RecordDeleteAction: 443 + // no-op 444 + } 445 + return nil 446 + } 447 + 309 448 func (t *Tap) bufferCollab(repoDid syntax.DID, evt *tapc.RecordEventData) { 310 449 t.pendingMu.Lock() 311 450 defer t.pendingMu.Unlock() ··· 373 512 t.logger.Warn("expired buffered collaborator events without matching repo arrival", "count", expired, "ttl", pendingCollabTTL) 374 513 } 375 514 } 515 + 516 + func (t *Tap) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*avmodels.PullSubmission, error) { 517 + // resolve the PR owner's identity to fetch the blob from their PDS 518 + prOwnerIdent, err := t.spindle.res.ResolveIdent(ctx, did) 519 + if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 520 + return nil, fmt.Errorf("failed to resolve PR owner 
handle: %w", err) 521 + } 522 + 523 + if len(record.Rounds) == 0 { 524 + return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 525 + } 526 + 527 + roundNumber := len(record.Rounds) - 1 528 + round := record.Rounds[roundNumber] 529 + 530 + // fetch the blob from the PR owner's PDS 531 + prOwnerPds := prOwnerIdent.PDSEndpoint() 532 + blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 533 + if err != nil { 534 + return nil, fmt.Errorf("failed to construct blob URL: %w", err) 535 + } 536 + q := blobUrl.Query() 537 + q.Set("cid", round.PatchBlob.Ref.String()) 538 + q.Set("did", did) 539 + blobUrl.RawQuery = q.Encode() 540 + 541 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 542 + if err != nil { 543 + return nil, fmt.Errorf("failed to create blob request: %w", err) 544 + } 545 + req.Header.Set("Content-Type", "application/json") 546 + 547 + blobResp, err := http.DefaultClient.Do(req) 548 + if err != nil { 549 + return nil, fmt.Errorf("failed to fetch blob: %w", err) 550 + } 551 + defer blobResp.Body.Close() 552 + 553 + blob := io.ReadCloser(blobResp.Body) 554 + latestSubmission, err := avmodels.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 555 + if err != nil { 556 + return nil, fmt.Errorf("failed to parse submission: %w", err) 557 + } 558 + 559 + return latestSubmission, nil 560 + }