Monorepo for Tangled tangled.org
9

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/knotserver/db" 20 "tangled.org/core/knotserver/git" 21 "tangled.org/core/log" 22 "tangled.org/core/rbac" 23 "tangled.org/core/workflow" 24) 25 26func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 27 l := log.FromContext(ctx) 28 raw := json.RawMessage(event.Commit.Record) 29 did := event.Did 30 31 var record tangled.PublicKey 32 if err := json.Unmarshal(raw, &record); err != nil { 33 return fmt.Errorf("failed to unmarshal record: %w", err) 34 } 35 36 pk := db.PublicKey{ 37 Did: did, 38 PublicKey: record, 39 } 40 if err := h.db.AddPublicKey(pk); err != nil { 41 l.Error("failed to add public key", "error", err) 42 return fmt.Errorf("failed to add public key: %w", err) 43 } 44 l.Info("added public key from firehose", "did", did) 45 return nil 46} 47 48func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 49 l := log.FromContext(ctx) 50 raw := json.RawMessage(event.Commit.Record) 51 did := event.Did 52 53 var record tangled.KnotMember 54 if err := json.Unmarshal(raw, &record); err != nil { 55 return fmt.Errorf("failed to unmarshal record: %w", err) 56 } 57 58 if record.Domain != h.c.Server.Hostname { 59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 61 } 62 63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 64 if err != nil || !ok { 65 l.Error("failed to add member", "did", did) 66 return fmt.Errorf("failed to enforce permissions: %w", err) 67 } 68 69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 70 l.Error("failed to add member", "error", err) 71 return fmt.Errorf("failed to add member: %w", err) 72 } 73 l.Info("added member from firehose", "member", record.Subject) 74 75 if err := h.db.AddDid(record.Subject); err != nil { 76 l.Error("failed to add did", "error", err) 77 return fmt.Errorf("failed to add did: %w", err) 78 } 79 h.jc.AddDid(record.Subject) 80 81 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 82 return fmt.Errorf("failed to fetch and add keys: %w", err) 83 } 84 85 return nil 86} 87 88// returns a repo path on disk if present, and error if not 89type targetRepo struct { 90 RepoPath string 91 OwnerDid string 92 RepoName string 93 RepoDid string 94 DefaultBranch string // default branch 95} 96 97func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 98 if record.Target == nil { 99 return nil, fmt.Errorf("ignoring pull record: target repo is nil") 100 } 101 102 if record.Source == nil { 103 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 104 } 105 106 if record.Source.Repo != nil || record.Source.RepoDid != nil { 107 return nil, fmt.Errorf("ignoring pull record: fork based pull") 108 } 109 110 var repoPath, ownerDid, repoName, repoDid string 111 switch { 112 case record.Target.RepoDid != nil && *record.Target.RepoDid != "": 113 repoDid = *record.Target.RepoDid 114 var lookupErr error 115 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 116 if lookupErr != nil { 117 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 118 } 119 120 case record.Target.Repo != nil: 121 // TODO: get rid of this PDS fetch once all repos have DIDs 122 repoAt, parseErr := syntax.ParseATURI(*record.Target.Repo) 123 if parseErr != nil { 124 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 125 } 126 127 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 128 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 129 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 130 } 131 132 xrpcc := xrpc.Client{ 133 Host: ident.PDSEndpoint(), 134 } 135 136 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 137 if getErr != nil { 138 return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 139 } 140 141 repo := resp.Value.Val.(*tangled.Repo) 142 143 if repo.Knot != h.c.Server.Hostname { 144 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 145 } 146 147 ownerDid = ident.DID.String() 148 repoName = repo.Name 149 150 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 151 if didErr != nil { 152 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 153 } 154 155 var lookupErr error 156 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 157 if lookupErr != nil { 158 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 159 } 160 161 default: 162 return nil, fmt.Errorf("ignoring pull record: target has neither repo nor repoDid") 163 } 164 165 gr, err := git.Open(repoPath, record.Source.Branch) 166 if err != nil { 167 return nil, fmt.Errorf("failed to open git repository: %w", err) 168 } 169 170 defaultBranch, _ := gr.FindMainBranch() 171 172 return &targetRepo{ 173 RepoPath: repoPath, 174 OwnerDid: ownerDid, 175 RepoName: repoName, 176 RepoDid: repoDid, 177 DefaultBranch: defaultBranch, 178 }, nil 179} 180 181func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 182 // resolve the PR owner's identity to fetch the blob from their PDS 183 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 184 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 185 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 186 } 187 188 if len(record.Rounds) == 0 { 189 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 190 } 191 192 roundNumber := len(record.Rounds) - 1 193 round := record.Rounds[roundNumber] 194 195 // fetch the blob from the PR owner's PDS 196 prOwnerPds := prOwnerIdent.PDSEndpoint() 197 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 198 if err != nil { 199 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 200 } 201 q := blobUrl.Query() 202 q.Set("cid", round.PatchBlob.Ref.String()) 203 q.Set("did", did) 204 blobUrl.RawQuery = q.Encode() 205 206 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 207 if err != nil { 208 return nil, fmt.Errorf("failed to create blob request: %w", err) 209 } 210 req.Header.Set("Content-Type", "application/json") 211 212 blobResp, err := http.DefaultClient.Do(req) 213 if err != nil { 214 return nil, fmt.Errorf("failed to fetch blob: %w", err) 215 } 216 defer blobResp.Body.Close() 217 218 blob := io.ReadCloser(blobResp.Body) 219 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 220 if err != nil { 221 return nil, fmt.Errorf("failed to parse submission: %w", err) 222 } 223 224 return latestSubmission, nil 225} 226 227func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 228 gr, err := git.Open(repoPath, sha) 229 if err != nil { 230 return nil, fmt.Errorf("failed to open git repository: %w", err) 231 } 232 233 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 234 if err != nil { 235 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 236 } 237 238 var pipeline workflow.RawPipeline 239 for _, e := range workflowDir { 240 if !e.IsFile() { 241 continue 242 } 243 244 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 245 contents, err := gr.RawContent(fpath) 246 if err != nil { 247 continue 248 } 249 250 pipeline = append(pipeline, workflow.RawWorkflow{ 251 Name: e.Name, 252 Contents: contents, 253 }) 254 } 255 256 return pipeline, nil 257} 258 259func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 260 l := log.FromContext(ctx) 261 262 trigger := tangled.Pipeline_PullRequestTriggerData{ 263 Action: "create", 264 SourceBranch: sourceBranch, 265 SourceSha: sourceSha, 266 TargetBranch: targetBranch, 267 } 268 269 compiler := workflow.Compiler{ 270 Trigger: tangled.Pipeline_TriggerMetadata{ 271 Kind: string(workflow.TriggerKindPullRequest), 272 PullRequest: &trigger, 273 Repo: &tangled.Pipeline_TriggerRepo{ 274 Knot: h.c.Server.Hostname, 275 RepoDid: &targetRepo.RepoDid, 276 Did: targetRepo.OwnerDid, 277 Repo: &targetRepo.RepoName, 278 DefaultBranch: targetRepo.DefaultBranch, 279 }, 280 }, 281 } 282 283 l.Info("raw", "raw", rawPipeline) 284 parsed := compiler.Parse(rawPipeline) 285 l.Info("parsed", "parsed", parsed) 286 compiled := compiler.Compile(parsed) 287 288 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 289 290 return compiled 291} 292 293func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 294 raw := json.RawMessage(event.Commit.Record) 295 rkey := event.Commit.RKey 296 did := event.Did 297 298 var record tangled.RepoPull 299 if err := json.Unmarshal(raw, &record); err != nil { 300 return fmt.Errorf("failed to unmarshal record: %w", err) 301 } 302 303 l := log.FromContext(ctx) 304 l = l.With("handler", "processPull") 305 l = l.With("did", did) 306 307 l.Info("validating pull record") 308 targetRepo, err := h.validatePullRecord(ctx, &record) 309 if err != nil { 310 l.Warn("pull record did not validate, skipping...") 311 return err 312 } 313 314 l = l.With("target_repo", record.Target.Repo) 315 l = l.With("target_branch", record.Target.Branch) 316 317 l.Info("fetching latest submission") 318 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 319 if err != nil { 320 return err 321 } 322 323 sha := latestSubmission.SourceRev 324 if sha == "" { 325 return fmt.Errorf("failed to extract source SHA from pull submission") 326 } 327 l = l.With("sha", sha) 328 329 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 330 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 331 if err != nil { 332 return err 333 } 334 335 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 336 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 337 338 // do not run empty pipelines 339 if cp.Workflows == nil { 340 l.Info("skipping empty pipeline") 341 return nil 342 } 343 344 l.Info("marshaling pipeline event") 345 eventJson, err := json.Marshal(cp) 346 if err != nil { 347 return fmt.Errorf("failed to marshal pipeline event: %w", err) 348 } 349 350 ev := db.Event{ 351 Rkey: TID(), 352 Nsid: tangled.PipelineNSID, 353 EventJson: string(eventJson), 354 } 355 356 l.Info("inserting pipeline event") 357 return h.db.InsertEvent(ev, h.n) 358} 359 360// duplicated from add collaborator 361func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 362 raw := json.RawMessage(event.Commit.Record) 363 did := event.Did 364 365 var record tangled.RepoCollaborator 366 if err := json.Unmarshal(raw, &record); err != nil { 367 return fmt.Errorf("failed to unmarshal record: %w", err) 368 } 369 370 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 371 if err != nil || subjectId.Handle.IsInvalidHandle() { 372 return err 373 } 374 375 var rbacResource string 376 switch { 377 case record.RepoDid != nil && *record.RepoDid != "": 378 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(*record.RepoDid) 379 if lookupErr != nil { 380 return fmt.Errorf("unknown repo DID %s: %w", *record.RepoDid, lookupErr) 381 } 382 if ownerDid != did { 383 return fmt.Errorf("collaborator record author %s does not own repo %s", did, *record.RepoDid) 384 } 385 rbacResource = *record.RepoDid 386 387 case record.Repo != nil: 388 // TODO: get rid of this PDS fetch once all repos have DIDs 389 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 390 if parseErr != nil { 391 return parseErr 392 } 393 394 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 395 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 396 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 397 } 398 399 xrpcc := xrpc.Client{ 400 Host: owner.PDSEndpoint(), 401 } 402 403 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 404 if getErr != nil { 405 return getErr 406 } 407 408 repo := resp.Value.Val.(*tangled.Repo) 409 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), repo.Name) 410 if didErr != nil { 411 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), repo.Name, didErr) 412 } 413 rbacResource = repoDid 414 415 default: 416 return fmt.Errorf("collaborator record has neither repo nor repoDid") 417 } 418 419 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource) 420 if err != nil { 421 return fmt.Errorf("failed to check permissions: %w", err) 422 } 423 if !ok { 424 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 425 } 426 427 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 428 return err 429 } 430 h.jc.AddDid(subjectId.DID.String()) 431 432 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil { 433 return err 434 } 435 436 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 437} 438 439func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 440 l := log.FromContext(ctx) 441 442 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 443 if err != nil { 444 l.Error("error building endpoint url", "did", did, "error", err.Error()) 445 return fmt.Errorf("error building endpoint url: %w", err) 446 } 447 448 resp, err := http.Get(keysEndpoint) 449 if err != nil { 450 l.Error("error getting keys", "did", did, "error", err) 451 return fmt.Errorf("error getting keys: %w", err) 452 } 453 defer resp.Body.Close() 454 455 if resp.StatusCode == http.StatusNotFound { 456 l.Info("no keys found for did", "did", did) 457 return nil 458 } 459 460 plaintext, err := io.ReadAll(resp.Body) 461 if err != nil { 462 l.Error("error reading response body", "error", err) 463 return fmt.Errorf("error reading response body: %w", err) 464 } 465 466 for key := range strings.SplitSeq(string(plaintext), "\n") { 467 if key == "" { 468 continue 469 } 470 pk := db.PublicKey{ 471 Did: did, 472 } 473 pk.Key = key 474 if err := h.db.AddPublicKey(pk); err != nil { 475 l.Error("failed to add public key", "error", err) 476 return fmt.Errorf("failed to add public key: %w", err) 477 } 478 } 479 return nil 480} 481 482func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 483 var err error 484 switch event.Kind { 485 case jmodels.EventKindIdentity: 486 err = h.resolver.InvalidateIdent(ctx, event.Did) 487 case jmodels.EventKindCommit: 488 switch event.Commit.Collection { 489 case tangled.PublicKeyNSID: 490 err = h.processPublicKey(ctx, event) 491 case tangled.KnotMemberNSID: 492 err = h.processKnotMember(ctx, event) 493 case tangled.RepoPullNSID: 494 err = h.processPull(ctx, event) 495 case tangled.RepoCollaboratorNSID: 496 err = h.processCollaborator(ctx, event) 497 } 498 default: 499 return nil 500 } 501 502 if err != nil { 503 args := []any{"kind", event.Kind, "err", err} 504 if event.Kind == jmodels.EventKindCommit { 505 args = append(args, "nsid", event.Commit.Collection) 506 } 507 h.l.Warn("failed to process event, skipping", args...) 508 } 509 510 lastTimeUs := event.TimeUS + 1 511 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 512 h.l.Error("failed to save cursor", "err", saveErr) 513 } 514 515 return nil 516}