Monorepo for Tangled (tangled.org)
6

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/eventstream" 20 "tangled.org/core/knotserver/db" 21 "tangled.org/core/knotserver/git" 22 knotxrpc "tangled.org/core/knotserver/xrpc" 23 "tangled.org/core/log" 24 "tangled.org/core/tid" 25 "tangled.org/core/workflow" 26) 27 28func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 29 l := log.FromContext(ctx) 30 raw := json.RawMessage(event.Commit.Record) 31 did := event.Did 32 33 var record tangled.PublicKey 34 if err := json.Unmarshal(raw, &record); err != nil { 35 return fmt.Errorf("failed to unmarshal record: %w", err) 36 } 37 38 pk := db.PublicKey{ 39 Did: did, 40 PublicKey: record, 41 } 42 if err := h.db.AddPublicKey(pk); err != nil { 43 l.Error("failed to add public key", "error", err) 44 return fmt.Errorf("failed to add public key: %w", err) 45 } 46 l.Info("added public key from firehose", "did", did) 47 return nil 48} 49 50// returns a repo path on disk if present, and error if not 51type targetRepo struct { 52 RepoPath string 53 OwnerDid string 54 RepoName string 55 RepoDid string 56 DefaultBranch string // default branch 57} 58 59func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 60 if record.Target == nil { 61 return nil, fmt.Errorf("ignoring pull record: target repo is nil") 62 } 63 64 l := log.FromContext(ctx).With("handler", "validatePullRecord") 65 l = l.With("target_repo", record.Target.Repo) 66 l = l.With("target_branch", record.Target.Branch) 67 68 if record.Source == nil { 69 return nil, fmt.Errorf("ignoring pull record: not a 
branch-based pull request") 70 } 71 72 if record.Source.Repo != nil { 73 return nil, fmt.Errorf("ignoring pull record: fork based pull") 74 } 75 76 var repoPath, ownerDid, repoName, repoDid string 77 switch { 78 case strings.HasPrefix(record.Target.Repo, "did:"): 79 repoDid = record.Target.Repo 80 var lookupErr error 81 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 82 if lookupErr != nil { 83 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 84 } 85 86 case strings.Contains(record.Target.Repo, "/"): 87 // TODO: get rid of this PDS fetch once all repos have DIDs 88 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo) 89 if parseErr != nil { 90 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 91 } 92 93 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 94 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 95 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 96 } 97 98 xrpcc := xrpc.Client{ 99 Host: ident.PDSEndpoint(), 100 } 101 102 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 103 if getErr != nil { 104 return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 105 } 106 107 repo, ok := resp.Value.Val.(*tangled.Repo) 108 if !ok { 109 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 110 } 111 112 if repo.Knot != h.c.Server.Hostname { 113 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 114 } 115 116 ownerDid = ident.DID.String() 117 repoName = repoAt.RecordKey().String() 118 119 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 120 if didErr != nil { 121 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 122 } 123 124 var lookupErr error 125 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, 
repoDid) 126 if lookupErr != nil { 127 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 128 } 129 130 default: 131 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo) 132 } 133 134 gr, err := git.Open(repoPath, record.Source.Branch) 135 if err != nil { 136 return nil, fmt.Errorf("failed to open git repository: %w", err) 137 } 138 139 defaultBranch, _ := gr.FindMainBranch() 140 141 return &targetRepo{ 142 RepoPath: repoPath, 143 OwnerDid: ownerDid, 144 RepoName: repoName, 145 RepoDid: repoDid, 146 DefaultBranch: defaultBranch, 147 }, nil 148} 149 150func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 151 // resolve the PR owner's identity to fetch the blob from their PDS 152 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 153 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 154 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 155 } 156 157 if len(record.Rounds) == 0 { 158 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 159 } 160 161 roundNumber := len(record.Rounds) - 1 162 round := record.Rounds[roundNumber] 163 164 // fetch the blob from the PR owner's PDS 165 prOwnerPds := prOwnerIdent.PDSEndpoint() 166 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 167 if err != nil { 168 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 169 } 170 q := blobUrl.Query() 171 q.Set("cid", round.PatchBlob.Ref.String()) 172 q.Set("did", did) 173 blobUrl.RawQuery = q.Encode() 174 175 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 176 if err != nil { 177 return nil, fmt.Errorf("failed to create blob request: %w", err) 178 } 179 req.Header.Set("Content-Type", "application/json") 180 181 blobResp, err := http.DefaultClient.Do(req) 182 if err != nil { 183 return nil, 
fmt.Errorf("failed to fetch blob: %w", err) 184 } 185 defer blobResp.Body.Close() 186 187 blob := io.ReadCloser(blobResp.Body) 188 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 189 if err != nil { 190 return nil, fmt.Errorf("failed to parse submission: %w", err) 191 } 192 193 return latestSubmission, nil 194} 195 196func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 197 gr, err := git.Open(repoPath, sha) 198 if err != nil { 199 return nil, fmt.Errorf("failed to open git repository: %w", err) 200 } 201 202 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 203 if err != nil { 204 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 205 } 206 207 var pipeline workflow.RawPipeline 208 for _, e := range workflowDir { 209 if !e.IsFile() { 210 continue 211 } 212 213 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 214 contents, err := gr.RawContent(fpath) 215 if err != nil { 216 continue 217 } 218 219 pipeline = append(pipeline, workflow.RawWorkflow{ 220 Name: e.Name, 221 Contents: contents, 222 }) 223 } 224 225 return pipeline, nil 226} 227 228func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 229 l := log.FromContext(ctx) 230 231 trigger := tangled.Pipeline_PullRequestTriggerData{ 232 Action: "create", 233 SourceBranch: sourceBranch, 234 SourceSha: sourceSha, 235 TargetBranch: targetBranch, 236 } 237 238 compiler := workflow.Compiler{ 239 Trigger: tangled.Pipeline_TriggerMetadata{ 240 Kind: string(workflow.TriggerKindPullRequest), 241 PullRequest: &trigger, 242 Repo: &tangled.Pipeline_TriggerRepo{ 243 Knot: h.c.Server.Hostname, 244 RepoDid: &targetRepo.RepoDid, 245 Did: targetRepo.OwnerDid, 246 Repo: &targetRepo.RepoName, 247 DefaultBranch: targetRepo.DefaultBranch, 248 }, 249 }, 250 } 251 252 l.Info("raw", "raw", 
rawPipeline) 253 parsed := compiler.Parse(rawPipeline) 254 l.Info("parsed", "parsed", parsed) 255 compiled := compiler.Compile(parsed) 256 257 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 258 259 return compiled 260} 261 262func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 263 raw := json.RawMessage(event.Commit.Record) 264 rkey := event.Commit.RKey 265 did := event.Did 266 267 var record tangled.RepoPull 268 if err := json.Unmarshal(raw, &record); err != nil { 269 return fmt.Errorf("failed to unmarshal record: %w", err) 270 } 271 272 l := log.FromContext(ctx) 273 l = l.With("handler", "processPull") 274 l = l.With("did", did) 275 276 l.Info("validating pull record") 277 targetRepo, err := h.validatePullRecord(ctx, &record) 278 if err != nil { 279 l.Warn("pull record did not validate, skipping...") 280 return err 281 } 282 283 l = l.With("target_repo", record.Target.Repo) 284 l = l.With("target_branch", record.Target.Branch) 285 286 l.Info("fetching latest submission") 287 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 288 if err != nil { 289 return err 290 } 291 292 sha := latestSubmission.SourceRev 293 if sha == "" { 294 return fmt.Errorf("failed to extract source SHA from pull submission") 295 } 296 l = l.With("sha", sha) 297 298 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 299 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 300 if err != nil { 301 return err 302 } 303 304 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 305 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 306 307 // do not run empty pipelines 308 if cp.Workflows == nil { 309 l.Info("skipping empty pipeline") 310 return nil 311 } 312 313 l.Info("marshaling pipeline event") 314 eventJson, err := json.Marshal(cp) 315 if err != nil { 316 return fmt.Errorf("failed to marshal pipeline event: %w", err) 317 } 318 319 ev := 
eventstream.Event{ 320 Rkey: tid.TID(), 321 Nsid: tangled.PipelineNSID, 322 EventJson: eventJson, 323 } 324 325 l.Info("inserting pipeline event") 326 return h.db.InsertEvent(ev, h.n) 327} 328 329func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 330 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 331 332 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git") 333 if rkey == "" { 334 return nil 335 } 336 337 if event.Commit.Operation == jmodels.CommitOperationDelete { 338 return nil 339 } 340 341 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate { 342 return nil 343 } 344 345 raw := json.RawMessage(event.Commit.Record) 346 var record tangled.Repo 347 if err := json.Unmarshal(raw, &record); err != nil { 348 return fmt.Errorf("failed to unmarshal repo record: %w", err) 349 } 350 351 if record.Knot != h.c.Server.Hostname { 352 return nil 353 } 354 if record.RepoDid == nil || *record.RepoDid == "" { 355 l.Info("skipping repo event without repoDid") 356 return nil 357 } 358 repoDid := *record.RepoDid 359 360 if err := knotxrpc.ValidateRepoName(rkey); err != nil { 361 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err) 362 return nil 363 } 364 365 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid) 366 if lookupErr != nil { 367 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid) 368 return nil 369 } 370 if ownerDid != event.Did { 371 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did) 372 return nil 373 } 374 375 alias := db.RepoAlias{ 376 OwnerDid: event.Did, 377 Rkey: rkey, 378 RepoDid: repoDid, 379 Rev: event.Commit.Rev, 380 } 381 if err := h.db.UpsertRepoAlias(alias); err != nil { 382 l.Warn("failed to upsert repo alias", "err", err) 383 return nil 384 } 385 386 l.Info("recorded repo alias", "repoDid", 
repoDid, "rkey", rkey, "rev", event.Commit.Rev) 387 return nil 388} 389 390func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 391 var err error 392 switch event.Kind { 393 case jmodels.EventKindIdentity: 394 err = h.resolver.InvalidateIdent(ctx, event.Did) 395 case jmodels.EventKindCommit: 396 switch event.Commit.Collection { 397 case tangled.PublicKeyNSID: 398 err = h.processPublicKey(ctx, event) 399 case tangled.RepoNSID: 400 err = h.processRepo(ctx, event) 401 case tangled.RepoPullNSID: 402 err = h.processPull(ctx, event) 403 } 404 default: 405 return nil 406 } 407 408 if err != nil { 409 args := []any{"kind", event.Kind, "err", err} 410 if event.Kind == jmodels.EventKindCommit { 411 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey) 412 } 413 h.l.Warn("failed to process event, skipping", args...) 414 } 415 416 lastTimeUs := event.TimeUS + 1 417 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 418 h.l.Error("failed to save cursor", "err", saveErr) 419 } 420 421 return nil 422}