Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/eventstream" 20 "tangled.org/core/knotserver/db" 21 "tangled.org/core/knotserver/git" 22 knotxrpc "tangled.org/core/knotserver/xrpc" 23 "tangled.org/core/log" 24 "tangled.org/core/tid" 25 "tangled.org/core/workflow" 26) 27 28func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 29 l := log.FromContext(ctx).With("handler", "processPublicKey", "did", event.Did, "rkey", event.Commit.RKey) 30 did := syntax.DID(event.Did) 31 rkey := syntax.RecordKey(event.Commit.RKey) 32 33 switch event.Commit.Operation { 34 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 35 var record tangled.PublicKey 36 if err := json.Unmarshal(json.RawMessage(event.Commit.Record), &record); err != nil { 37 return fmt.Errorf("failed to unmarshal record: %w", err) 38 } 39 40 pk := db.PublicKey{ 41 Did: did, 42 Rkey: rkey, 43 PublicKey: record, 44 } 45 if err := h.db.UpsertPublicKey(pk); err != nil { 46 return fmt.Errorf("failed to upsert public key: %w", err) 47 } 48 l.Info("upserted public key from firehose") 49 case jmodels.CommitOperationDelete: 50 if err := h.db.DeletePublicKeyByRkey(did, rkey); err != nil { 51 return fmt.Errorf("failed to delete public key: %w", err) 52 } 53 l.Info("deleted public key from firehose") 54 } 55 56 return nil 57} 58 59// returns a repo path on disk if present, and error if not 60type targetRepo struct { 61 RepoPath string 62 OwnerDid string 63 RepoName string 64 RepoDid string 65 DefaultBranch string // default branch 66} 67 68func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 69 if record.Target == nil { 70 return nil, fmt.Errorf("ignoring pull record: target repo is nil") 71 } 72 73 l := log.FromContext(ctx).With("handler", "validatePullRecord") 74 l = l.With("target_repo", record.Target.Repo) 75 l = l.With("target_branch", record.Target.Branch) 76 77 if record.Source == nil { 78 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 79 } 80 81 if record.Source.Repo != nil { 82 return nil, fmt.Errorf("ignoring pull record: fork based pull") 83 } 84 85 var repoPath, ownerDid, repoName, repoDid string 86 switch { 87 case strings.HasPrefix(record.Target.Repo, "did:"): 88 repoDid = record.Target.Repo 89 var lookupErr error 90 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 91 if lookupErr != nil { 92 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 93 } 94 95 case strings.Contains(record.Target.Repo, "/"): 96 // TODO: get rid of this PDS fetch once all repos have DIDs 97 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo) 98 if parseErr != nil { 99 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 100 } 101 102 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 103 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 104 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 105 } 106 107 xrpcc := xrpc.Client{ 108 Host: ident.PDSEndpoint(), 109 } 110 111 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 112 if getErr != nil { 113 return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 114 } 115 116 repo, ok := resp.Value.Val.(*tangled.Repo) 117 if !ok { 118 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 119 } 120 121 if repo.Knot != h.c.Server.Hostname { 122 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 123 } 124 125 ownerDid = ident.DID.String() 126 repoName = repoAt.RecordKey().String() 127 128 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 129 if didErr != nil { 130 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 131 } 132 133 var lookupErr error 134 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 135 if lookupErr != nil { 136 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 137 } 138 139 default: 140 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo) 141 } 142 143 gr, err := git.Open(repoPath, record.Source.Branch) 144 if err != nil { 145 return nil, fmt.Errorf("failed to open git repository: %w", err) 146 } 147 148 defaultBranch, _ := gr.FindMainBranch() 149 150 return &targetRepo{ 151 RepoPath: repoPath, 152 OwnerDid: ownerDid, 153 RepoName: repoName, 154 RepoDid: repoDid, 155 DefaultBranch: defaultBranch, 156 }, nil 157} 158 159func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 160 // resolve the PR owner's identity to fetch the blob from their PDS 161 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 162 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 163 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 164 } 165 166 if len(record.Rounds) == 0 { 167 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 168 } 169 170 roundNumber := len(record.Rounds) - 1 171 round := record.Rounds[roundNumber] 172 173 // fetch the blob from the PR owner's PDS 174 prOwnerPds := prOwnerIdent.PDSEndpoint() 175 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 176 if err != nil { 177 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 178 } 179 q := blobUrl.Query() 180 q.Set("cid", round.PatchBlob.Ref.String()) 181 q.Set("did", did) 182 blobUrl.RawQuery = q.Encode() 183 184 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 185 if err != nil { 186 return nil, fmt.Errorf("failed to create blob request: %w", err) 187 } 188 req.Header.Set("Content-Type", "application/json") 189 190 blobResp, err := http.DefaultClient.Do(req) 191 if err != nil { 192 return nil, fmt.Errorf("failed to fetch blob: %w", err) 193 } 194 defer blobResp.Body.Close() 195 196 blob := io.ReadCloser(blobResp.Body) 197 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 198 if err != nil { 199 return nil, fmt.Errorf("failed to parse submission: %w", err) 200 } 201 202 return latestSubmission, nil 203} 204 205func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 206 gr, err := git.Open(repoPath, sha) 207 if err != nil { 208 return nil, fmt.Errorf("failed to open git repository: %w", err) 209 } 210 211 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 212 if err != nil { 213 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 214 } 215 216 var pipeline workflow.RawPipeline 217 for _, e := range workflowDir { 218 if !e.IsFile() { 219 continue 220 } 221 222 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 223 contents, err := gr.RawContent(fpath) 224 if err != nil { 225 continue 226 } 227 228 pipeline = append(pipeline, workflow.RawWorkflow{ 229 Name: e.Name, 230 Contents: contents, 231 }) 232 } 233 234 return pipeline, nil 235} 236 237func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 238 l := log.FromContext(ctx) 239 240 trigger := tangled.Pipeline_PullRequestTriggerData{ 241 Action: "create", 242 SourceBranch: sourceBranch, 243 SourceSha: sourceSha, 244 TargetBranch: targetBranch, 245 } 246 247 compiler := workflow.Compiler{ 248 Trigger: tangled.Pipeline_TriggerMetadata{ 249 Kind: string(workflow.TriggerKindPullRequest), 250 PullRequest: &trigger, 251 Repo: &tangled.Pipeline_TriggerRepo{ 252 Knot: h.c.Server.Hostname, 253 RepoDid: &targetRepo.RepoDid, 254 Did: targetRepo.OwnerDid, 255 Repo: &targetRepo.RepoName, 256 DefaultBranch: targetRepo.DefaultBranch, 257 }, 258 }, 259 } 260 261 l.Info("raw", "raw", rawPipeline) 262 parsed := compiler.Parse(rawPipeline) 263 l.Info("parsed", "parsed", parsed) 264 compiled := compiler.Compile(parsed) 265 266 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 267 268 return compiled 269} 270 271func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 272 raw := json.RawMessage(event.Commit.Record) 273 rkey := event.Commit.RKey 274 did := event.Did 275 276 var record tangled.RepoPull 277 if err := json.Unmarshal(raw, &record); err != nil { 278 return fmt.Errorf("failed to unmarshal record: %w", err) 279 } 280 281 l := log.FromContext(ctx) 282 l = l.With("handler", "processPull") 283 l = l.With("did", did) 284 285 l.Info("validating pull record") 286 targetRepo, err := h.validatePullRecord(ctx, &record) 287 if err != nil { 288 l.Warn("pull record did not validate, skipping...") 289 return err 290 } 291 292 l = l.With("target_repo", record.Target.Repo) 293 l = l.With("target_branch", record.Target.Branch) 294 295 l.Info("fetching latest submission") 296 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 297 if err != nil { 298 return err 299 } 300 301 sha := latestSubmission.SourceRev 302 if sha == "" { 303 return fmt.Errorf("failed to extract source SHA from pull submission") 304 } 305 l = l.With("sha", sha) 306 307 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 308 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 309 if err != nil { 310 return err 311 } 312 313 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 314 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 315 316 // do not run empty pipelines 317 if cp.Workflows == nil { 318 l.Info("skipping empty pipeline") 319 return nil 320 } 321 322 l.Info("marshaling pipeline event") 323 eventJson, err := json.Marshal(cp) 324 if err != nil { 325 return fmt.Errorf("failed to marshal pipeline event: %w", err) 326 } 327 328 ev := eventstream.Event{ 329 Rkey: tid.TID(), 330 Nsid: tangled.PipelineNSID, 331 EventJson: eventJson, 332 } 333 334 l.Info("inserting pipeline event") 335 return h.db.InsertEvent(ev, h.n) 336} 337 338func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 339 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 340 341 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git") 342 if rkey == "" { 343 return nil 344 } 345 346 if event.Commit.Operation == jmodels.CommitOperationDelete { 347 return nil 348 } 349 350 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate { 351 return nil 352 } 353 354 raw := json.RawMessage(event.Commit.Record) 355 var record tangled.Repo 356 if err := json.Unmarshal(raw, &record); err != nil { 357 return fmt.Errorf("failed to unmarshal repo record: %w", err) 358 } 359 360 if record.Knot != h.c.Server.Hostname { 361 return nil 362 } 363 if record.RepoDid == nil || *record.RepoDid == "" { 364 l.Info("skipping repo event without repoDid") 365 return nil 366 } 367 repoDid := *record.RepoDid 368 369 if err := knotxrpc.ValidateRepoName(rkey); err != nil { 370 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err) 371 return nil 372 } 373 374 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid) 375 if lookupErr != nil { 376 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid) 377 return nil 378 } 379 if ownerDid != event.Did { 380 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did) 381 return nil 382 } 383 384 alias := db.RepoAlias{ 385 OwnerDid: event.Did, 386 Rkey: rkey, 387 RepoDid: repoDid, 388 Rev: event.Commit.Rev, 389 } 390 if err := h.db.UpsertRepoAlias(alias); err != nil { 391 l.Warn("failed to upsert repo alias", "err", err) 392 return nil 393 } 394 395 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev) 396 return nil 397} 398 399func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 400 var err error 401 switch event.Kind { 402 case jmodels.EventKindIdentity: 403 err = h.resolver.InvalidateIdent(ctx, event.Did) 404 case jmodels.EventKindCommit: 405 switch event.Commit.Collection { 406 case tangled.PublicKeyNSID: 407 err = h.processPublicKey(ctx, event) 408 case tangled.RepoNSID: 409 err = h.processRepo(ctx, event) 410 case tangled.RepoPullNSID: 411 err = h.processPull(ctx, event) 412 } 413 default: 414 return nil 415 } 416 417 if err != nil { 418 args := []any{"kind", event.Kind, "err", err} 419 if event.Kind == jmodels.EventKindCommit { 420 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey) 421 } 422 h.l.Warn("failed to process event, skipping", args...) 423 } 424 425 lastTimeUs := event.TimeUS + 1 426 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 427 h.l.Error("failed to save cursor", "err", saveErr) 428 } 429 430 return nil 431}