Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "net/http" 11 "net/url" 12 "path/filepath" 13 "strings" 14 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "github.com/bluesky-social/indigo/xrpc" 18 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 19 jmodels "github.com/bluesky-social/jetstream/pkg/models" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/knotserver/db" 23 "tangled.org/core/knotserver/git" 24 knotxrpc "tangled.org/core/knotserver/xrpc" 25 "tangled.org/core/log" 26 "tangled.org/core/rbac" 27 "tangled.org/core/tid" 28 "tangled.org/core/workflow" 29) 30 31func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 32 l := log.FromContext(ctx) 33 raw := json.RawMessage(event.Commit.Record) 34 did := event.Did 35 36 var record tangled.PublicKey 37 if err := json.Unmarshal(raw, &record); err != nil { 38 return fmt.Errorf("failed to unmarshal record: %w", err) 39 } 40 41 pk := db.PublicKey{ 42 Did: did, 43 PublicKey: record, 44 } 45 if err := h.db.AddPublicKey(pk); err != nil { 46 l.Error("failed to add public key", "error", err) 47 return fmt.Errorf("failed to add public key: %w", err) 48 } 49 l.Info("added public key from firehose", "did", did) 50 return nil 51} 52 53func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 54 did := event.Did 55 rkey := event.Commit.RKey 56 l := log.FromContext(ctx).With("handler", "processKnotMember", "did", did, "rkey", rkey) 57 58 switch event.Commit.Operation { 59 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 60 raw := json.RawMessage(event.Commit.Record) 61 var record tangled.KnotMember 62 if err := json.Unmarshal(raw, &record); err != nil { 63 return fmt.Errorf("failed to unmarshal record: %w", err) 64 } 65 66 if record.Domain != h.c.Server.Hostname { 67 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 68 } 69 70 subject, err := syntax.ParseDID(record.Subject) 71 if err != nil { 72 return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err) 73 } 74 75 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 76 if err != nil { 77 return fmt.Errorf("failed to enforce permissions: %w", err) 78 } 79 if !ok { 80 return fmt.Errorf("permission denied for %s", did) 81 } 82 83 sqlTx, err := h.db.BeginTx(ctx, nil) 84 if err != nil { 85 return fmt.Errorf("failed to start txn: %w", err) 86 } 87 committed := false 88 defer func() { 89 if !committed { 90 sqlTx.Rollback() 91 } 92 }() 93 94 existing, err := db.GetKnotMember(sqlTx, did, rkey) 95 if err != nil && !errors.Is(err, sql.ErrNoRows) { 96 return fmt.Errorf("failed to look up existing member: %w", err) 97 } 98 99 var staleSubject string 100 if existing != nil && existing.Subject != subject { 101 staleSubject = existing.Subject.String() 102 if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil { 103 return fmt.Errorf("failed to remove stale member row: %w", err) 104 } 105 } 106 107 if err := db.AddKnotMember(sqlTx, db.KnotMember{ 108 Did: syntax.DID(did), 109 Rkey: rkey, 110 Subject: subject, 111 }); err != nil { 112 return fmt.Errorf("failed to persist member row: %w", err) 113 } 114 115 if err := db.AddDid(sqlTx, subject.String()); err != nil { 116 return fmt.Errorf("failed to add did: %w", err) 117 } 118 119 dropStaleAcl := false 120 var staleDidDropped bool 121 if staleSubject != "" { 122 remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject) 123 if err != nil { 124 return fmt.Errorf("failed to count stale subject rows: %w", err) 125 } 126 if remaining == 0 { 127 dropStaleAcl = true 128 stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer) 129 if err != nil { 130 return fmt.Errorf("failed to check residual policies for stale subject: %w", err) 131 } 132 if !stillNeeded { 133 if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 134 return fmt.Errorf("failed to remove stale did: %w", err) 135 } 136 staleDidDropped = true 137 } 138 } 139 l.Info("replaced stale knot member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped) 140 } 141 142 if err := sqlTx.Commit(); err != nil { 143 return fmt.Errorf("failed to commit txn: %w", err) 144 } 145 committed = true 146 147 if dropStaleAcl { 148 if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil { 149 l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err) 150 } 151 } 152 if _, err := h.e.TryAddKnotMember(rbac.ThisServer, subject.String()); err != nil { 153 l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err) 154 } 155 156 if staleDidDropped { 157 h.jc.RemoveDid(staleSubject) 158 } 159 h.jc.AddDid(subject.String()) 160 l.Info("added member from firehose", "member", subject) 161 162 if err := h.fetchAndAddKeys(ctx, subject.String()); err != nil { 163 return fmt.Errorf("failed to fetch and add keys: %w", err) 164 } 165 return nil 166 167 case jmodels.CommitOperationDelete: 168 sqlTx, err := h.db.BeginTx(ctx, nil) 169 if err != nil { 170 return fmt.Errorf("failed to start txn: %w", err) 171 } 172 committed := false 173 defer func() { 174 if !committed { 175 sqlTx.Rollback() 176 } 177 }() 178 179 member, err := db.GetKnotMember(sqlTx, did, rkey) 180 if errors.Is(err, sql.ErrNoRows) { 181 l.Info("knot member already removed") 182 return nil 183 } 184 if err != nil { 185 return fmt.Errorf("failed to look up knot member: %w", err) 186 } 187 188 staleSubject := member.Subject.String() 189 190 if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil { 191 return fmt.Errorf("failed to remove member row: %w", err) 192 } 193 194 remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject) 195 if err != nil { 196 return fmt.Errorf("failed to count remaining member rows: %w", err) 197 } 198 199 dropAcl := false 200 var staleDidDropped bool 201 if remaining == 0 { 202 dropAcl = true 203 stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer) 204 if err != nil { 205 return fmt.Errorf("failed to check residual policies: %w", err) 206 } 207 if !stillNeeded { 208 if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 209 return fmt.Errorf("failed to remove did: %w", err) 210 } 211 staleDidDropped = true 212 } 213 } 214 215 if err := sqlTx.Commit(); err != nil { 216 return fmt.Errorf("failed to commit txn: %w", err) 217 } 218 committed = true 219 220 if dropAcl { 221 if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil { 222 l.Error("post-commit: failed to remove ACL", "subject", staleSubject, "err", err) 223 } 224 } 225 226 if staleDidDropped { 227 h.jc.RemoveDid(staleSubject) 228 } 229 l.Info("removed knot member from firehose", "member", member.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped) 230 return nil 231 } 232 233 return nil 234} 235 236// returns a repo path on disk if present, and error if not 237type targetRepo struct { 238 RepoPath string 239 OwnerDid string 240 RepoName string 241 RepoDid string 242 DefaultBranch string // default branch 243} 244 245func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 246 if record.Target == nil { 247 return nil, fmt.Errorf("ignoring pull record: target repo is nil") 248 } 249 250 l := log.FromContext(ctx).With("handler", "validatePullRecord") 251 l = l.With("target_repo", record.Target.Repo) 252 l = l.With("target_branch", record.Target.Branch) 253 254 if record.Source == nil { 255 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 256 } 257 258 if record.Source.Repo != nil { 259 return nil, fmt.Errorf("ignoring pull record: fork based pull") 260 } 261 262 var repoPath, ownerDid, repoName, repoDid string 263 switch { 264 case strings.HasPrefix(record.Target.Repo, "did:"): 265 repoDid = record.Target.Repo 266 var lookupErr error 267 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 268 if lookupErr != nil { 269 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 270 } 271 272 case strings.Contains(record.Target.Repo, "/"): 273 // TODO: get rid of this PDS fetch once all repos have DIDs 274 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo) 275 if parseErr != nil { 276 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 277 } 278 279 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 280 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 281 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 282 } 283 284 xrpcc := xrpc.Client{ 285 Host: ident.PDSEndpoint(), 286 } 287 288 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 289 if getErr != nil { 290 return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 291 } 292 293 repo, ok := resp.Value.Val.(*tangled.Repo) 294 if !ok { 295 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 296 } 297 298 if repo.Knot != h.c.Server.Hostname { 299 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 300 } 301 302 ownerDid = ident.DID.String() 303 repoName = repoAt.RecordKey().String() 304 305 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 306 if didErr != nil { 307 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 308 } 309 310 var lookupErr error 311 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 312 if lookupErr != nil { 313 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 314 } 315 316 default: 317 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo) 318 } 319 320 gr, err := git.Open(repoPath, record.Source.Branch) 321 if err != nil { 322 return nil, fmt.Errorf("failed to open git repository: %w", err) 323 } 324 325 defaultBranch, _ := gr.FindMainBranch() 326 327 return &targetRepo{ 328 RepoPath: repoPath, 329 OwnerDid: ownerDid, 330 RepoName: repoName, 331 RepoDid: repoDid, 332 DefaultBranch: defaultBranch, 333 }, nil 334} 335 336func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 337 // resolve the PR owner's identity to fetch the blob from their PDS 338 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 339 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 340 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 341 } 342 343 if len(record.Rounds) == 0 { 344 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 345 } 346 347 roundNumber := len(record.Rounds) - 1 348 round := record.Rounds[roundNumber] 349 350 // fetch the blob from the PR owner's PDS 351 prOwnerPds := prOwnerIdent.PDSEndpoint() 352 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 353 if err != nil { 354 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 355 } 356 q := blobUrl.Query() 357 q.Set("cid", round.PatchBlob.Ref.String()) 358 q.Set("did", did) 359 blobUrl.RawQuery = q.Encode() 360 361 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 362 if err != nil { 363 return nil, fmt.Errorf("failed to create blob request: %w", err) 364 } 365 req.Header.Set("Content-Type", "application/json") 366 367 blobResp, err := http.DefaultClient.Do(req) 368 if err != nil { 369 return nil, fmt.Errorf("failed to fetch blob: %w", err) 370 } 371 defer blobResp.Body.Close() 372 373 blob := io.ReadCloser(blobResp.Body) 374 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 375 if err != nil { 376 return nil, fmt.Errorf("failed to parse submission: %w", err) 377 } 378 379 return latestSubmission, nil 380} 381 382func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 383 gr, err := git.Open(repoPath, sha) 384 if err != nil { 385 return nil, fmt.Errorf("failed to open git repository: %w", err) 386 } 387 388 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 389 if err != nil { 390 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 391 } 392 393 var pipeline workflow.RawPipeline 394 for _, e := range workflowDir { 395 if !e.IsFile() { 396 continue 397 } 398 399 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 400 contents, err := gr.RawContent(fpath) 401 if err != nil { 402 continue 403 } 404 405 pipeline = append(pipeline, workflow.RawWorkflow{ 406 Name: e.Name, 407 Contents: contents, 408 }) 409 } 410 411 return pipeline, nil 412} 413 414func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 415 l := log.FromContext(ctx) 416 417 trigger := tangled.Pipeline_PullRequestTriggerData{ 418 Action: "create", 419 SourceBranch: sourceBranch, 420 SourceSha: sourceSha, 421 TargetBranch: targetBranch, 422 } 423 424 compiler := workflow.Compiler{ 425 Trigger: tangled.Pipeline_TriggerMetadata{ 426 Kind: string(workflow.TriggerKindPullRequest), 427 PullRequest: &trigger, 428 Repo: &tangled.Pipeline_TriggerRepo{ 429 Knot: h.c.Server.Hostname, 430 RepoDid: &targetRepo.RepoDid, 431 Did: targetRepo.OwnerDid, 432 Repo: &targetRepo.RepoName, 433 DefaultBranch: targetRepo.DefaultBranch, 434 }, 435 }, 436 } 437 438 l.Info("raw", "raw", rawPipeline) 439 parsed := compiler.Parse(rawPipeline) 440 l.Info("parsed", "parsed", parsed) 441 compiled := compiler.Compile(parsed) 442 443 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 444 445 return compiled 446} 447 448func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 449 raw := json.RawMessage(event.Commit.Record) 450 rkey := event.Commit.RKey 451 did := event.Did 452 453 var record tangled.RepoPull 454 if err := json.Unmarshal(raw, &record); err != nil { 455 return fmt.Errorf("failed to unmarshal record: %w", err) 456 } 457 458 l := log.FromContext(ctx) 459 l = l.With("handler", "processPull") 460 l = l.With("did", did) 461 462 l.Info("validating pull record") 463 targetRepo, err := h.validatePullRecord(ctx, &record) 464 if err != nil { 465 l.Warn("pull record did not validate, skipping...") 466 return err 467 } 468 469 l = l.With("target_repo", record.Target.Repo) 470 l = l.With("target_branch", record.Target.Branch) 471 472 l.Info("fetching latest submission") 473 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 474 if err != nil { 475 return err 476 } 477 478 sha := latestSubmission.SourceRev 479 if sha == "" { 480 return fmt.Errorf("failed to extract source SHA from pull submission") 481 } 482 l = l.With("sha", sha) 483 484 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 485 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 486 if err != nil { 487 return err 488 } 489 490 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 491 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 492 493 // do not run empty pipelines 494 if cp.Workflows == nil { 495 l.Info("skipping empty pipeline") 496 return nil 497 } 498 499 l.Info("marshaling pipeline event") 500 eventJson, err := json.Marshal(cp) 501 if err != nil { 502 return fmt.Errorf("failed to marshal pipeline event: %w", err) 503 } 504 505 ev := db.Event{ 506 Rkey: tid.TID(), 507 Nsid: tangled.PipelineNSID, 508 EventJson: string(eventJson), 509 } 510 511 l.Info("inserting pipeline event") 512 return h.db.InsertEvent(ev, h.n) 513} 514 515// duplicated from add collaborator 516func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 517 raw := json.RawMessage(event.Commit.Record) 518 did := event.Did 519 520 var record tangled.RepoCollaborator 521 if err := json.Unmarshal(raw, &record); err != nil { 522 return fmt.Errorf("failed to unmarshal record: %w", err) 523 } 524 525 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 526 if err != nil || subjectId.Handle.IsInvalidHandle() { 527 return err 528 } 529 530 var rbacResource string 531 switch { 532 case strings.HasPrefix(record.Repo, "did:"): 533 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo) 534 if lookupErr != nil { 535 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr) 536 } 537 if ownerDid != did { 538 return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo) 539 } 540 rbacResource = record.Repo 541 542 case strings.Contains(record.Repo, "/"): 543 // TODO: get rid of this PDS fetch once all repos have DIDs 544 repoAt, parseErr := syntax.ParseATURI(record.Repo) 545 if parseErr != nil { 546 return parseErr 547 } 548 549 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 550 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 551 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 552 } 553 554 xrpcc := xrpc.Client{ 555 Host: owner.PDSEndpoint(), 556 } 557 558 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 559 if getErr != nil { 560 return getErr 561 } 562 563 if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 564 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 565 } 566 rkey := repoAt.RecordKey().String() 567 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey) 568 if didErr != nil { 569 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr) 570 } 571 rbacResource = repoDid 572 573 default: 574 return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo) 575 } 576 577 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource) 578 if err != nil { 579 return fmt.Errorf("failed to check permissions: %w", err) 580 } 581 if !ok { 582 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 583 } 584 585 if err := db.AddDid(h.db, subjectId.DID.String()); err != nil { 586 return err 587 } 588 h.jc.AddDid(subjectId.DID.String()) 589 590 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil { 591 return err 592 } 593 594 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 595} 596 597func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 598 l := log.FromContext(ctx) 599 600 id, err := h.resolver.Directory().LookupDID(ctx, syntax.DID(did)) 601 if err != nil { 602 return fmt.Errorf("lookup did to fetch keys: %w", err) 603 } 604 605 serviceEndpoint, ok := id.Services["atproto_pds"] 606 if !ok { 607 l.Warn("did identity did not contain atproto_pds service while adding their keys", "did", did) 608 return nil 609 } 610 611 xrpcc := indigoxrpc.Client{Host: serviceEndpoint.URL} 612 resp, err := comatproto.RepoListRecords(context.Background(), &xrpcc, tangled.PublicKeyNSID, "", 50, did, false) 613 if err != nil { 614 return fmt.Errorf("fetching public keys for did: %w", err) 615 } 616 617 for _, record := range resp.Records { 618 if record == nil { 619 continue 620 } 621 key := record.Value.Val.(*tangled.PublicKey) 622 if key == nil { 623 continue 624 } 625 pk := db.PublicKey{ 626 Did: did, 627 PublicKey: *key, 628 } 629 err = h.db.AddPublicKey(pk) 630 if err != nil { 631 return fmt.Errorf("adding public key to db: %w", err) 632 } 633 } 634 return nil 635} 636 637func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 638 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 639 640 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git") 641 if rkey == "" { 642 return nil 643 } 644 645 if event.Commit.Operation == jmodels.CommitOperationDelete { 646 return nil 647 } 648 649 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate { 650 return nil 651 } 652 653 raw := json.RawMessage(event.Commit.Record) 654 var record tangled.Repo 655 if err := json.Unmarshal(raw, &record); err != nil { 656 return fmt.Errorf("failed to unmarshal repo record: %w", err) 657 } 658 659 if record.Knot != h.c.Server.Hostname { 660 return nil 661 } 662 if record.RepoDid == nil || *record.RepoDid == "" { 663 l.Info("skipping repo event without repoDid") 664 return nil 665 } 666 repoDid := *record.RepoDid 667 668 if err := knotxrpc.ValidateRepoName(rkey); err != nil { 669 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err) 670 return nil 671 } 672 673 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid) 674 if lookupErr != nil { 675 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid) 676 return nil 677 } 678 if ownerDid != event.Did { 679 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did) 680 return nil 681 } 682 683 alias := db.RepoAlias{ 684 OwnerDid: event.Did, 685 Rkey: rkey, 686 RepoDid: repoDid, 687 Rev: event.Commit.Rev, 688 } 689 if err := h.db.UpsertRepoAlias(alias); err != nil { 690 l.Warn("failed to upsert repo alias", "err", err) 691 return nil 692 } 693 694 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev) 695 return nil 696} 697 698func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 699 var err error 700 switch event.Kind { 701 case jmodels.EventKindIdentity: 702 err = h.resolver.InvalidateIdent(ctx, event.Did) 703 case jmodels.EventKindCommit: 704 switch event.Commit.Collection { 705 case tangled.PublicKeyNSID: 706 err = h.processPublicKey(ctx, event) 707 case tangled.KnotMemberNSID: 708 err = h.processKnotMember(ctx, event) 709 case tangled.RepoNSID: 710 err = h.processRepo(ctx, event) 711 case tangled.RepoPullNSID: 712 err = h.processPull(ctx, event) 713 case tangled.RepoCollaboratorNSID: 714 err = h.processCollaborator(ctx, event) 715 } 716 default: 717 return nil 718 } 719 720 if err != nil { 721 args := []any{"kind", event.Kind, "err", err} 722 if event.Kind == jmodels.EventKindCommit { 723 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey) 724 } 725 h.l.Warn("failed to process event, skipping", args...) 726 } 727 728 lastTimeUs := event.TimeUS + 1 729 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 730 h.l.Error("failed to save cursor", "err", saveErr) 731 } 732 733 return nil 734}