Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 17 jmodels "github.com/bluesky-social/jetstream/pkg/models" 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/knotserver/db" 21 "tangled.org/core/knotserver/git" 22 knotxrpc "tangled.org/core/knotserver/xrpc" 23 "tangled.org/core/log" 24 "tangled.org/core/rbac" 25 "tangled.org/core/tid" 26 "tangled.org/core/workflow" 27) 28 29func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 30 l := log.FromContext(ctx) 31 raw := json.RawMessage(event.Commit.Record) 32 did := event.Did 33 34 var record tangled.PublicKey 35 if err := json.Unmarshal(raw, &record); err != nil { 36 return fmt.Errorf("failed to unmarshal record: %w", err) 37 } 38 39 pk := db.PublicKey{ 40 Did: did, 41 PublicKey: record, 42 } 43 if err := h.db.AddPublicKey(pk); err != nil { 44 l.Error("failed to add public key", "error", err) 45 return fmt.Errorf("failed to add public key: %w", err) 46 } 47 l.Info("added public key from firehose", "did", did) 48 return nil 49} 50 51func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 52 l := log.FromContext(ctx) 53 raw := json.RawMessage(event.Commit.Record) 54 did := event.Did 55 56 var record tangled.KnotMember 57 if err := json.Unmarshal(raw, &record); err != nil { 58 return fmt.Errorf("failed to unmarshal record: %w", err) 59 } 60 61 if record.Domain != h.c.Server.Hostname { 62 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 63 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 64 } 65 66 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 67 if err != nil || !ok { 68 l.Error("failed to add member", "did", did) 69 return fmt.Errorf("failed to enforce permissions: %w", err) 70 } 71 72 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 73 l.Error("failed to add member", "error", err) 74 return fmt.Errorf("failed to add member: %w", err) 75 } 76 l.Info("added member from firehose", "member", record.Subject) 77 78 if err := h.db.AddDid(record.Subject); err != nil { 79 l.Error("failed to add did", "error", err) 80 return fmt.Errorf("failed to add did: %w", err) 81 } 82 h.jc.AddDid(record.Subject) 83 84 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 85 return fmt.Errorf("failed to fetch and add keys: %w", err) 86 } 87 88 return nil 89} 90 91// returns a repo path on disk if present, and error if not 92type targetRepo struct { 93 RepoPath string 94 OwnerDid string 95 RepoName string 96 RepoDid string 97 DefaultBranch string // default branch 98} 99 100func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 101 if record.Target == nil { 102 return nil, fmt.Errorf("ignoring pull record: target repo is nil") 103 } 104 105 l := log.FromContext(ctx).With("handler", "validatePullRecord") 106 l = l.With("target_repo", record.Target.Repo) 107 l = l.With("target_branch", record.Target.Branch) 108 109 if record.Source == nil { 110 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 111 } 112 113 if record.Source.Repo != nil { 114 return nil, fmt.Errorf("ignoring pull record: fork based pull") 115 } 116 117 var repoPath, ownerDid, repoName, repoDid string 118 switch { 119 case strings.HasPrefix(record.Target.Repo, "did:"): 120 repoDid = record.Target.Repo 121 var lookupErr error 122 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 123 if lookupErr != nil { 124 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 125 } 126 127 case strings.Contains(record.Target.Repo, "/"): 128 // TODO: get rid of this PDS fetch once all repos have DIDs 129 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo) 130 if parseErr != nil { 131 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 132 } 133 134 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 135 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 136 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 137 } 138 139 xrpcc := xrpc.Client{ 140 Host: ident.PDSEndpoint(), 141 } 142 143 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 144 if getErr != nil { 145 return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 146 } 147 148 repo, ok := resp.Value.Val.(*tangled.Repo) 149 if !ok { 150 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 151 } 152 153 if repo.Knot != h.c.Server.Hostname { 154 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 155 } 156 157 ownerDid = ident.DID.String() 158 repoName = repoAt.RecordKey().String() 159 160 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 161 if didErr != nil { 162 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 163 } 164 165 var lookupErr error 166 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 167 if lookupErr != nil { 168 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 169 } 170 171 default: 172 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo) 173 } 174 175 gr, err := git.Open(repoPath, record.Source.Branch) 176 if err != nil { 177 return nil, fmt.Errorf("failed to open git repository: %w", err) 178 } 179 180 defaultBranch, _ := gr.FindMainBranch() 181 182 return &targetRepo{ 183 RepoPath: repoPath, 184 OwnerDid: ownerDid, 185 RepoName: repoName, 186 RepoDid: repoDid, 187 DefaultBranch: defaultBranch, 188 }, nil 189} 190 191func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 192 // resolve the PR owner's identity to fetch the blob from their PDS 193 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 194 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 195 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 196 } 197 198 if len(record.Rounds) == 0 { 199 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 200 } 201 202 roundNumber := len(record.Rounds) - 1 203 round := record.Rounds[roundNumber] 204 205 // fetch the blob from the PR owner's PDS 206 prOwnerPds := prOwnerIdent.PDSEndpoint() 207 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 208 if err != nil { 209 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 210 } 211 q := blobUrl.Query() 212 q.Set("cid", round.PatchBlob.Ref.String()) 213 q.Set("did", did) 214 blobUrl.RawQuery = q.Encode() 215 216 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 217 if err != nil { 218 return nil, fmt.Errorf("failed to create blob request: %w", err) 219 } 220 req.Header.Set("Content-Type", "application/json") 221 222 blobResp, err := http.DefaultClient.Do(req) 223 if err != nil { 224 return nil, fmt.Errorf("failed to fetch blob: %w", err) 225 } 226 defer blobResp.Body.Close() 227 228 blob := io.ReadCloser(blobResp.Body) 229 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 230 if err != nil { 231 return nil, fmt.Errorf("failed to parse submission: %w", err) 232 } 233 234 return latestSubmission, nil 235} 236 237func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 238 gr, err := git.Open(repoPath, sha) 239 if err != nil { 240 return nil, fmt.Errorf("failed to open git repository: %w", err) 241 } 242 243 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 244 if err != nil { 245 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 246 } 247 248 var pipeline workflow.RawPipeline 249 for _, e := range workflowDir { 250 if !e.IsFile() { 251 continue 252 } 253 254 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 255 contents, err := gr.RawContent(fpath) 256 if err != nil { 257 continue 258 } 259 260 pipeline = append(pipeline, workflow.RawWorkflow{ 261 Name: e.Name, 262 Contents: contents, 263 }) 264 } 265 266 return pipeline, nil 267} 268 269func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 270 l := log.FromContext(ctx) 271 272 trigger := tangled.Pipeline_PullRequestTriggerData{ 273 Action: "create", 274 SourceBranch: sourceBranch, 275 SourceSha: sourceSha, 276 TargetBranch: targetBranch, 277 } 278 279 compiler := workflow.Compiler{ 280 Trigger: tangled.Pipeline_TriggerMetadata{ 281 Kind: string(workflow.TriggerKindPullRequest), 282 PullRequest: &trigger, 283 Repo: &tangled.Pipeline_TriggerRepo{ 284 Knot: h.c.Server.Hostname, 285 RepoDid: &targetRepo.RepoDid, 286 Did: targetRepo.OwnerDid, 287 Repo: &targetRepo.RepoName, 288 DefaultBranch: targetRepo.DefaultBranch, 289 }, 290 }, 291 } 292 293 l.Info("raw", "raw", rawPipeline) 294 parsed := compiler.Parse(rawPipeline) 295 l.Info("parsed", "parsed", parsed) 296 compiled := compiler.Compile(parsed) 297 298 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 299 300 return compiled 301} 302 303func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 304 raw := json.RawMessage(event.Commit.Record) 305 rkey := event.Commit.RKey 306 did := event.Did 307 308 var record tangled.RepoPull 309 if err := json.Unmarshal(raw, &record); err != nil { 310 return fmt.Errorf("failed to unmarshal record: %w", err) 311 } 312 313 l := log.FromContext(ctx) 314 l = l.With("handler", "processPull") 315 l = l.With("did", did) 316 317 l.Info("validating pull record") 318 targetRepo, err := h.validatePullRecord(ctx, &record) 319 if err != nil { 320 l.Warn("pull record did not validate, skipping...") 321 return err 322 } 323 324 l = l.With("target_repo", record.Target.Repo) 325 l = l.With("target_branch", record.Target.Branch) 326 327 l.Info("fetching latest submission") 328 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 329 if err != nil { 330 return err 331 } 332 333 sha := latestSubmission.SourceRev 334 if sha == "" { 335 return fmt.Errorf("failed to extract source SHA from pull submission") 336 } 337 l = l.With("sha", sha) 338 339 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 340 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 341 if err != nil { 342 return err 343 } 344 345 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 346 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 347 348 // do not run empty pipelines 349 if cp.Workflows == nil { 350 l.Info("skipping empty pipeline") 351 return nil 352 } 353 354 l.Info("marshaling pipeline event") 355 eventJson, err := json.Marshal(cp) 356 if err != nil { 357 return fmt.Errorf("failed to marshal pipeline event: %w", err) 358 } 359 360 ev := db.Event{ 361 Rkey: tid.TID(), 362 Nsid: tangled.PipelineNSID, 363 EventJson: string(eventJson), 364 } 365 366 l.Info("inserting pipeline event") 367 return h.db.InsertEvent(ev, h.n) 368} 369 370// duplicated from add collaborator 371func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 372 raw := json.RawMessage(event.Commit.Record) 373 did := event.Did 374 375 var record tangled.RepoCollaborator 376 if err := json.Unmarshal(raw, &record); err != nil { 377 return fmt.Errorf("failed to unmarshal record: %w", err) 378 } 379 380 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 381 if err != nil || subjectId.Handle.IsInvalidHandle() { 382 return err 383 } 384 385 var rbacResource string 386 switch { 387 case strings.HasPrefix(record.Repo, "did:"): 388 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo) 389 if lookupErr != nil { 390 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr) 391 } 392 if ownerDid != did { 393 return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo) 394 } 395 rbacResource = record.Repo 396 397 case strings.Contains(record.Repo, "/"): 398 // TODO: get rid of this PDS fetch once all repos have DIDs 399 repoAt, parseErr := syntax.ParseATURI(record.Repo) 400 if parseErr != nil { 401 return parseErr 402 } 403 404 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 405 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 406 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 407 } 408 409 xrpcc := xrpc.Client{ 410 Host: owner.PDSEndpoint(), 411 } 412 413 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 414 if getErr != nil { 415 return getErr 416 } 417 418 if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 419 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 420 } 421 rkey := repoAt.RecordKey().String() 422 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey) 423 if didErr != nil { 424 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr) 425 } 426 rbacResource = repoDid 427 428 default: 429 return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo) 430 } 431 432 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource) 433 if err != nil { 434 return fmt.Errorf("failed to check permissions: %w", err) 435 } 436 if !ok { 437 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 438 } 439 440 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 441 return err 442 } 443 h.jc.AddDid(subjectId.DID.String()) 444 445 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil { 446 return err 447 } 448 449 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 450} 451 452func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 453 l := log.FromContext(ctx) 454 455 id, err := h.resolver.Directory().LookupDID(ctx, syntax.DID(did)) 456 if err != nil { 457 return fmt.Errorf("lookup did to fetch keys: %w", err) 458 } 459 460 serviceEndpoint, ok := id.Services["atproto_pds"] 461 if !ok { 462 l.Warn("did identity did not contain atproto_pds service while adding their keys", "did", did) 463 return nil 464 } 465 466 xrpcc := indigoxrpc.Client{Host: serviceEndpoint.URL} 467 resp, err := comatproto.RepoListRecords(context.Background(), &xrpcc, tangled.PublicKeyNSID, "", 50, did, false) 468 if err != nil { 469 return fmt.Errorf("fetching public keys for did: %w", err) 470 } 471 472 for _, record := range resp.Records { 473 if record == nil { 474 continue 475 } 476 key := record.Value.Val.(*tangled.PublicKey) 477 if key == nil { 478 continue 479 } 480 pk := db.PublicKey{ 481 Did: did, 482 PublicKey: *key, 483 } 484 err = h.db.AddPublicKey(pk) 485 if err != nil { 486 return fmt.Errorf("adding public key to db: %w", err) 487 } 488 } 489 return nil 490} 491 492func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 493 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 494 495 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git") 496 if rkey == "" { 497 return nil 498 } 499 500 if event.Commit.Operation == jmodels.CommitOperationDelete { 501 return nil 502 } 503 504 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate { 505 return nil 506 } 507 508 raw := json.RawMessage(event.Commit.Record) 509 var record tangled.Repo 510 if err := json.Unmarshal(raw, &record); err != nil { 511 return fmt.Errorf("failed to unmarshal repo record: %w", err) 512 } 513 514 if record.Knot != h.c.Server.Hostname { 515 return nil 516 } 517 if record.RepoDid == nil || *record.RepoDid == "" { 518 l.Info("skipping repo event without repoDid") 519 return nil 520 } 521 repoDid := *record.RepoDid 522 523 if err := knotxrpc.ValidateRepoName(rkey); err != nil { 524 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err) 525 return nil 526 } 527 528 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid) 529 if lookupErr != nil { 530 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid) 531 return nil 532 } 533 if ownerDid != event.Did { 534 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did) 535 return nil 536 } 537 538 alias := db.RepoAlias{ 539 OwnerDid: event.Did, 540 Rkey: rkey, 541 RepoDid: repoDid, 542 Rev: event.Commit.Rev, 543 } 544 if err := h.db.UpsertRepoAlias(alias); err != nil { 545 l.Warn("failed to upsert repo alias", "err", err) 546 return nil 547 } 548 549 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev) 550 return nil 551} 552 553func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 554 var err error 555 switch event.Kind { 556 case jmodels.EventKindIdentity: 557 err = h.resolver.InvalidateIdent(ctx, event.Did) 558 case jmodels.EventKindCommit: 559 switch event.Commit.Collection { 560 case tangled.PublicKeyNSID: 561 err = h.processPublicKey(ctx, event) 562 case tangled.KnotMemberNSID: 563 err = h.processKnotMember(ctx, event) 564 case tangled.RepoNSID: 565 err = h.processRepo(ctx, event) 566 case tangled.RepoPullNSID: 567 err = h.processPull(ctx, event) 568 case tangled.RepoCollaboratorNSID: 569 err = h.processCollaborator(ctx, event) 570 } 571 default: 572 return nil 573 } 574 575 if err != nil { 576 args := []any{"kind", event.Kind, "err", err} 577 if event.Kind == jmodels.EventKindCommit { 578 args = append(args, "nsid", event.Commit.Collection) 579 } 580 h.l.Warn("failed to process event, skipping", args...) 581 } 582 583 lastTimeUs := event.TimeUS + 1 584 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 585 h.l.Error("failed to save cursor", "err", saveErr) 586 } 587 588 return nil 589}