Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/knotserver/db" 20 "tangled.org/core/knotserver/git" 21 knotxrpc "tangled.org/core/knotserver/xrpc" 22 "tangled.org/core/log" 23 "tangled.org/core/rbac" 24 "tangled.org/core/workflow" 25) 26 27func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 28 l := log.FromContext(ctx) 29 raw := json.RawMessage(event.Commit.Record) 30 did := event.Did 31 32 var record tangled.PublicKey 33 if err := json.Unmarshal(raw, &record); err != nil { 34 return fmt.Errorf("failed to unmarshal record: %w", err) 35 } 36 37 pk := db.PublicKey{ 38 Did: did, 39 PublicKey: record, 40 } 41 if err := h.db.AddPublicKey(pk); err != nil { 42 l.Error("failed to add public key", "error", err) 43 return fmt.Errorf("failed to add public key: %w", err) 44 } 45 l.Info("added public key from firehose", "did", did) 46 return nil 47} 48 49func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 50 l := log.FromContext(ctx) 51 raw := json.RawMessage(event.Commit.Record) 52 did := event.Did 53 54 var record tangled.KnotMember 55 if err := json.Unmarshal(raw, &record); err != nil { 56 return fmt.Errorf("failed to unmarshal record: %w", err) 57 } 58 59 if record.Domain != h.c.Server.Hostname { 60 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 61 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 62 } 63 64 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 65 if err != nil || !ok { 66 l.Error("failed to add member", "did", did) 67 return fmt.Errorf("failed to enforce permissions: %w", err) 68 } 69 70 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 71 l.Error("failed to add member", "error", err) 72 return fmt.Errorf("failed to add member: %w", err) 73 } 74 l.Info("added member from firehose", "member", record.Subject) 75 76 if err := h.db.AddDid(record.Subject); err != nil { 77 l.Error("failed to add did", "error", err) 78 return fmt.Errorf("failed to add did: %w", err) 79 } 80 h.jc.AddDid(record.Subject) 81 82 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 83 return fmt.Errorf("failed to fetch and add keys: %w", err) 84 } 85 86 return nil 87} 88 89// returns a repo path on disk if present, and error if not 90type targetRepo struct { 91 RepoPath string 92 OwnerDid string 93 RepoName string 94 RepoDid string 95 DefaultBranch string // default branch 96} 97 98func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 99 if record.Target == nil { 100 return nil, fmt.Errorf("ignoring pull record: target repo is nil") 101 } 102 103 l := log.FromContext(ctx).With("handler", "validatePullRecord") 104 l = l.With("target_repo", record.Target.Repo) 105 l = l.With("target_branch", record.Target.Branch) 106 107 if record.Source == nil { 108 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 109 } 110 111 if record.Source.Repo != nil { 112 return nil, fmt.Errorf("ignoring pull record: fork based pull") 113 } 114 115 var repoPath, ownerDid, repoName, repoDid string 116 switch { 117 case strings.HasPrefix(record.Target.Repo, "did:"): 118 repoDid = record.Target.Repo 119 var lookupErr error 120 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 121 if lookupErr != nil { 122 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 123 } 124 125 case strings.Contains(record.Target.Repo, "/"): 126 // TODO: get rid of this PDS fetch once all repos have DIDs 127 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo) 128 if parseErr != nil { 129 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 130 } 131 132 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 133 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 134 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 135 } 136 137 xrpcc := xrpc.Client{ 138 Host: ident.PDSEndpoint(), 139 } 140 141 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 142 if getErr != nil { 143 return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 144 } 145 146 repo, ok := resp.Value.Val.(*tangled.Repo) 147 if !ok { 148 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 149 } 150 151 if repo.Knot != h.c.Server.Hostname { 152 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 153 } 154 155 ownerDid = ident.DID.String() 156 repoName = repoAt.RecordKey().String() 157 158 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 159 if didErr != nil { 160 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 161 } 162 163 var lookupErr error 164 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 165 if lookupErr != nil { 166 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 167 } 168 169 default: 170 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo) 171 } 172 173 gr, err := git.Open(repoPath, record.Source.Branch) 174 if err != nil { 175 return nil, fmt.Errorf("failed to open git repository: %w", err) 176 } 177 178 defaultBranch, _ := gr.FindMainBranch() 179 180 return &targetRepo{ 181 RepoPath: repoPath, 182 OwnerDid: ownerDid, 183 RepoName: repoName, 184 RepoDid: repoDid, 185 DefaultBranch: defaultBranch, 186 }, nil 187} 188 189func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 190 // resolve the PR owner's identity to fetch the blob from their PDS 191 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 192 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 193 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 194 } 195 196 if len(record.Rounds) == 0 { 197 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 198 } 199 200 roundNumber := len(record.Rounds) - 1 201 round := record.Rounds[roundNumber] 202 203 // fetch the blob from the PR owner's PDS 204 prOwnerPds := prOwnerIdent.PDSEndpoint() 205 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 206 if err != nil { 207 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 208 } 209 q := blobUrl.Query() 210 q.Set("cid", round.PatchBlob.Ref.String()) 211 q.Set("did", did) 212 blobUrl.RawQuery = q.Encode() 213 214 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 215 if err != nil { 216 return nil, fmt.Errorf("failed to create blob request: %w", err) 217 } 218 req.Header.Set("Content-Type", "application/json") 219 220 blobResp, err := http.DefaultClient.Do(req) 221 if err != nil { 222 return nil, fmt.Errorf("failed to fetch blob: %w", err) 223 } 224 defer blobResp.Body.Close() 225 226 blob := io.ReadCloser(blobResp.Body) 227 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 228 if err != nil { 229 return nil, fmt.Errorf("failed to parse submission: %w", err) 230 } 231 232 return latestSubmission, nil 233} 234 235func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 236 gr, err := git.Open(repoPath, sha) 237 if err != nil { 238 return nil, fmt.Errorf("failed to open git repository: %w", err) 239 } 240 241 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 242 if err != nil { 243 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 244 } 245 246 var pipeline workflow.RawPipeline 247 for _, e := range workflowDir { 248 if !e.IsFile() { 249 continue 250 } 251 252 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 253 contents, err := gr.RawContent(fpath) 254 if err != nil { 255 continue 256 } 257 258 pipeline = append(pipeline, workflow.RawWorkflow{ 259 Name: e.Name, 260 Contents: contents, 261 }) 262 } 263 264 return pipeline, nil 265} 266 267func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 268 l := log.FromContext(ctx) 269 270 trigger := tangled.Pipeline_PullRequestTriggerData{ 271 Action: "create", 272 SourceBranch: sourceBranch, 273 SourceSha: sourceSha, 274 TargetBranch: targetBranch, 275 } 276 277 compiler := workflow.Compiler{ 278 Trigger: tangled.Pipeline_TriggerMetadata{ 279 Kind: string(workflow.TriggerKindPullRequest), 280 PullRequest: &trigger, 281 Repo: &tangled.Pipeline_TriggerRepo{ 282 Knot: h.c.Server.Hostname, 283 RepoDid: &targetRepo.RepoDid, 284 Did: targetRepo.OwnerDid, 285 Repo: &targetRepo.RepoName, 286 DefaultBranch: targetRepo.DefaultBranch, 287 }, 288 }, 289 } 290 291 l.Info("raw", "raw", rawPipeline) 292 parsed := compiler.Parse(rawPipeline) 293 l.Info("parsed", "parsed", parsed) 294 compiled := compiler.Compile(parsed) 295 296 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 297 298 return compiled 299} 300 301func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 302 raw := json.RawMessage(event.Commit.Record) 303 rkey := event.Commit.RKey 304 did := event.Did 305 306 var record tangled.RepoPull 307 if err := json.Unmarshal(raw, &record); err != nil { 308 return fmt.Errorf("failed to unmarshal record: %w", err) 309 } 310 311 l := log.FromContext(ctx) 312 l = l.With("handler", "processPull") 313 l = l.With("did", did) 314 315 l.Info("validating pull record") 316 targetRepo, err := h.validatePullRecord(ctx, &record) 317 if err != nil { 318 l.Warn("pull record did not validate, skipping...") 319 return err 320 } 321 322 l = l.With("target_repo", record.Target.Repo) 323 l = l.With("target_branch", record.Target.Branch) 324 325 l.Info("fetching latest submission") 326 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 327 if err != nil { 328 return err 329 } 330 331 sha := latestSubmission.SourceRev 332 if sha == "" { 333 return fmt.Errorf("failed to extract source SHA from pull submission") 334 } 335 l = l.With("sha", sha) 336 337 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 338 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 339 if err != nil { 340 return err 341 } 342 343 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 344 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 345 346 // do not run empty pipelines 347 if cp.Workflows == nil { 348 l.Info("skipping empty pipeline") 349 return nil 350 } 351 352 l.Info("marshaling pipeline event") 353 eventJson, err := json.Marshal(cp) 354 if err != nil { 355 return fmt.Errorf("failed to marshal pipeline event: %w", err) 356 } 357 358 ev := db.Event{ 359 Rkey: TID(), 360 Nsid: tangled.PipelineNSID, 361 EventJson: string(eventJson), 362 } 363 364 l.Info("inserting pipeline event") 365 return h.db.InsertEvent(ev, h.n) 366} 367 368// duplicated from add collaborator 369func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 370 raw := json.RawMessage(event.Commit.Record) 371 did := event.Did 372 373 var record tangled.RepoCollaborator 374 if err := json.Unmarshal(raw, &record); err != nil { 375 return fmt.Errorf("failed to unmarshal record: %w", err) 376 } 377 378 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 379 if err != nil || subjectId.Handle.IsInvalidHandle() { 380 return err 381 } 382 383 var rbacResource string 384 switch { 385 case strings.HasPrefix(record.Repo, "did:"): 386 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo) 387 if lookupErr != nil { 388 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr) 389 } 390 if ownerDid != did { 391 return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo) 392 } 393 rbacResource = record.Repo 394 395 case strings.Contains(record.Repo, "/"): 396 // TODO: get rid of this PDS fetch once all repos have DIDs 397 repoAt, parseErr := syntax.ParseATURI(record.Repo) 398 if parseErr != nil { 399 return parseErr 400 } 401 402 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 403 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 404 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 405 } 406 407 xrpcc := xrpc.Client{ 408 Host: owner.PDSEndpoint(), 409 } 410 411 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 412 if getErr != nil { 413 return getErr 414 } 415 416 if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 417 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 418 } 419 rkey := repoAt.RecordKey().String() 420 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey) 421 if didErr != nil { 422 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr) 423 } 424 rbacResource = repoDid 425 426 default: 427 return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo) 428 } 429 430 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource) 431 if err != nil { 432 return fmt.Errorf("failed to check permissions: %w", err) 433 } 434 if !ok { 435 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 436 } 437 438 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 439 return err 440 } 441 h.jc.AddDid(subjectId.DID.String()) 442 443 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil { 444 return err 445 } 446 447 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 448} 449 450func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 451 l := log.FromContext(ctx) 452 453 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 454 if err != nil { 455 l.Error("error building endpoint url", "did", did, "error", err.Error()) 456 return fmt.Errorf("error building endpoint url: %w", err) 457 } 458 459 resp, err := http.Get(keysEndpoint) 460 if err != nil { 461 l.Error("error getting keys", "did", did, "error", err) 462 return fmt.Errorf("error getting keys: %w", err) 463 } 464 defer resp.Body.Close() 465 466 if resp.StatusCode == http.StatusNotFound { 467 l.Info("no keys found for did", "did", did) 468 return nil 469 } 470 471 plaintext, err := io.ReadAll(resp.Body) 472 if err != nil { 473 l.Error("error reading response body", "error", err) 474 return fmt.Errorf("error reading response body: %w", err) 475 } 476 477 for key := range strings.SplitSeq(string(plaintext), "\n") { 478 if key == "" { 479 continue 480 } 481 pk := db.PublicKey{ 482 Did: did, 483 } 484 pk.Key = key 485 if err := h.db.AddPublicKey(pk); err != nil { 486 l.Error("failed to add public key", "error", err) 487 return fmt.Errorf("failed to add public key: %w", err) 488 } 489 } 490 return nil 491} 492 493func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 494 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 495 496 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git") 497 if rkey == "" { 498 return nil 499 } 500 501 if event.Commit.Operation == jmodels.CommitOperationDelete { 502 if err := h.db.DeleteRepoAlias(event.Did, rkey); err != nil { 503 l.Warn("failed to delete repo alias", "err", err) 504 } 505 return nil 506 } 507 508 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate { 509 return nil 510 } 511 512 raw := json.RawMessage(event.Commit.Record) 513 var record tangled.Repo 514 if err := json.Unmarshal(raw, &record); err != nil { 515 return fmt.Errorf("failed to unmarshal repo record: %w", err) 516 } 517 518 if record.Knot != h.c.Server.Hostname { 519 return nil 520 } 521 if record.RepoDid == nil || *record.RepoDid == "" { 522 l.Info("skipping repo event without repoDid") 523 return nil 524 } 525 repoDid := *record.RepoDid 526 527 if err := knotxrpc.ValidateRepoName(rkey); err != nil { 528 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err) 529 return nil 530 } 531 532 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid) 533 if lookupErr != nil { 534 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid) 535 return nil 536 } 537 if ownerDid != event.Did { 538 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did) 539 return nil 540 } 541 542 alias := db.RepoAlias{ 543 OwnerDid: event.Did, 544 Rkey: rkey, 545 RepoDid: repoDid, 546 Rev: event.Commit.Rev, 547 } 548 if err := h.db.UpsertRepoAlias(alias); err != nil { 549 l.Warn("failed to upsert repo alias", "err", err) 550 return nil 551 } 552 553 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev) 554 return nil 555} 556 557func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 558 var err error 559 switch event.Kind { 560 case jmodels.EventKindIdentity: 561 err = h.resolver.InvalidateIdent(ctx, event.Did) 562 case jmodels.EventKindCommit: 563 switch event.Commit.Collection { 564 case tangled.PublicKeyNSID: 565 err = h.processPublicKey(ctx, event) 566 case tangled.KnotMemberNSID: 567 err = h.processKnotMember(ctx, event) 568 case tangled.RepoNSID: 569 err = h.processRepo(ctx, event) 570 case tangled.RepoPullNSID: 571 err = h.processPull(ctx, event) 572 case tangled.RepoCollaboratorNSID: 573 err = h.processCollaborator(ctx, event) 574 } 575 default: 576 return nil 577 } 578 579 if err != nil { 580 args := []any{"kind", event.Kind, "err", err} 581 if event.Kind == jmodels.EventKindCommit { 582 args = append(args, "nsid", event.Commit.Collection) 583 } 584 h.l.Warn("failed to process event, skipping", args...) 585 } 586 587 lastTimeUs := event.TimeUS + 1 588 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 589 h.l.Error("failed to save cursor", "err", saveErr) 590 } 591 592 return nil 593}