Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at icy/sntnrt 22 kB View raw
1package knotserver 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "net/http" 11 "net/url" 12 "path/filepath" 13 "strings" 14 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "github.com/bluesky-social/indigo/xrpc" 18 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 19 jmodels "github.com/bluesky-social/jetstream/pkg/models" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/eventstream" 23 "tangled.org/core/knotserver/db" 24 "tangled.org/core/knotserver/git" 25 knotxrpc "tangled.org/core/knotserver/xrpc" 26 "tangled.org/core/log" 27 "tangled.org/core/rbac" 28 "tangled.org/core/tid" 29 "tangled.org/core/workflow" 30) 31 32func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 33 l := log.FromContext(ctx) 34 raw := json.RawMessage(event.Commit.Record) 35 did := event.Did 36 37 var record tangled.PublicKey 38 if err := json.Unmarshal(raw, &record); err != nil { 39 return fmt.Errorf("failed to unmarshal record: %w", err) 40 } 41 42 pk := db.PublicKey{ 43 Did: did, 44 PublicKey: record, 45 } 46 if err := h.db.AddPublicKey(pk); err != nil { 47 l.Error("failed to add public key", "error", err) 48 return fmt.Errorf("failed to add public key: %w", err) 49 } 50 l.Info("added public key from firehose", "did", did) 51 return nil 52} 53 54func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 55 did := event.Did 56 rkey := event.Commit.RKey 57 l := log.FromContext(ctx).With("handler", "processKnotMember", "did", did, "rkey", rkey) 58 59 switch event.Commit.Operation { 60 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 61 raw := json.RawMessage(event.Commit.Record) 62 var record tangled.KnotMember 63 if err := json.Unmarshal(raw, &record); err != nil { 64 return fmt.Errorf("failed to unmarshal record: %w", err) 65 } 66 67 if record.Domain != h.c.Server.Hostname { 68 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 69 } 70 71 subject, err := syntax.ParseDID(record.Subject) 72 if err != nil { 73 return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err) 74 } 75 76 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 77 if err != nil { 78 return fmt.Errorf("failed to enforce permissions: %w", err) 79 } 80 if !ok { 81 return fmt.Errorf("permission denied for %s", did) 82 } 83 84 sqlTx, err := h.db.BeginTx(ctx, nil) 85 if err != nil { 86 return fmt.Errorf("failed to start txn: %w", err) 87 } 88 committed := false 89 defer func() { 90 if !committed { 91 sqlTx.Rollback() 92 } 93 }() 94 95 existing, err := db.GetKnotMember(sqlTx, did, rkey) 96 if err != nil && !errors.Is(err, sql.ErrNoRows) { 97 return fmt.Errorf("failed to look up existing member: %w", err) 98 } 99 100 var staleSubject string 101 if existing != nil && existing.Subject != subject { 102 staleSubject = existing.Subject.String() 103 if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil { 104 return fmt.Errorf("failed to remove stale member row: %w", err) 105 } 106 } 107 108 if err := db.AddKnotMember(sqlTx, db.KnotMember{ 109 Did: syntax.DID(did), 110 Rkey: rkey, 111 Subject: subject, 112 }); err != nil { 113 return fmt.Errorf("failed to persist member row: %w", err) 114 } 115 116 if err := db.AddDid(sqlTx, subject.String()); err != nil { 117 return fmt.Errorf("failed to add did: %w", err) 118 } 119 120 dropStaleAcl := false 121 var staleDidDropped bool 122 if staleSubject != "" { 123 remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject) 124 if err != nil { 125 return fmt.Errorf("failed to count stale subject rows: %w", err) 126 } 127 if remaining == 0 { 128 dropStaleAcl = true 129 stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer) 130 if err != nil { 131 return fmt.Errorf("failed to check residual policies for stale subject: %w", err) 132 } 133 if !stillNeeded { 134 if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 135 return fmt.Errorf("failed to remove stale did: %w", err) 136 } 137 staleDidDropped = true 138 } 139 } 140 l.Info("replaced stale knot member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped) 141 } 142 143 if err := sqlTx.Commit(); err != nil { 144 return fmt.Errorf("failed to commit txn: %w", err) 145 } 146 committed = true 147 148 if dropStaleAcl { 149 if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil { 150 l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err) 151 } 152 } 153 if _, err := h.e.TryAddKnotMember(rbac.ThisServer, subject.String()); err != nil { 154 l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err) 155 } 156 157 if staleDidDropped { 158 h.jc.RemoveDid(staleSubject) 159 } 160 h.jc.AddDid(subject.String()) 161 l.Info("added member from firehose", "member", subject) 162 163 if err := h.fetchAndAddKeys(ctx, subject.String()); err != nil { 164 return fmt.Errorf("failed to fetch and add keys: %w", err) 165 } 166 return nil 167 168 case jmodels.CommitOperationDelete: 169 sqlTx, err := h.db.BeginTx(ctx, nil) 170 if err != nil { 171 return fmt.Errorf("failed to start txn: %w", err) 172 } 173 committed := false 174 defer func() { 175 if !committed { 176 sqlTx.Rollback() 177 } 178 }() 179 180 member, err := db.GetKnotMember(sqlTx, did, rkey) 181 if errors.Is(err, sql.ErrNoRows) { 182 l.Info("knot member already removed") 183 return nil 184 } 185 if err != nil { 186 return fmt.Errorf("failed to look up knot member: %w", err) 187 } 188 189 staleSubject := member.Subject.String() 190 191 if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil { 192 return fmt.Errorf("failed to remove member row: %w", err) 193 } 194 195 remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject) 196 if err != nil { 197 return fmt.Errorf("failed to count remaining member rows: %w", err) 198 } 199 200 dropAcl := false 201 var staleDidDropped bool 202 if remaining == 0 { 203 dropAcl = true 204 stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer) 205 if err != nil { 206 return fmt.Errorf("failed to check residual policies: %w", err) 207 } 208 if !stillNeeded { 209 if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 210 return fmt.Errorf("failed to remove did: %w", err) 211 } 212 staleDidDropped = true 213 } 214 } 215 216 if err := sqlTx.Commit(); err != nil { 217 return fmt.Errorf("failed to commit txn: %w", err) 218 } 219 committed = true 220 221 if dropAcl { 222 if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil { 223 l.Error("post-commit: failed to remove ACL", "subject", staleSubject, "err", err) 224 } 225 } 226 227 if staleDidDropped { 228 h.jc.RemoveDid(staleSubject) 229 } 230 l.Info("removed knot member from firehose", "member", member.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped) 231 return nil 232 } 233 234 return nil 235} 236 237// returns a repo path on disk if present, and error if not 238type targetRepo struct { 239 RepoPath string 240 OwnerDid string 241 RepoName string 242 RepoDid string 243 DefaultBranch string // default branch 244} 245 246func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 247 if record.Target == nil { 248 return nil, fmt.Errorf("ignoring pull record: target repo is nil") 249 } 250 251 l := log.FromContext(ctx).With("handler", "validatePullRecord") 252 l = l.With("target_repo", record.Target.Repo) 253 l = l.With("target_branch", record.Target.Branch) 254 255 if record.Source == nil { 256 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 257 } 258 259 if record.Source.Repo != nil { 260 return nil, fmt.Errorf("ignoring pull record: fork based pull") 261 } 262 263 var repoPath, ownerDid, repoName, repoDid string 264 switch { 265 case strings.HasPrefix(record.Target.Repo, "did:"): 266 repoDid = record.Target.Repo 267 var lookupErr error 268 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 269 if lookupErr != nil { 270 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 271 } 272 273 case strings.Contains(record.Target.Repo, "/"): 274 // TODO: get rid of this PDS fetch once all repos have DIDs 275 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo) 276 if parseErr != nil { 277 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 278 } 279 280 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 281 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 282 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 283 } 284 285 xrpcc := xrpc.Client{ 286 Host: ident.PDSEndpoint(), 287 } 288 289 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 290 if getErr != nil { 291 return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 292 } 293 294 repo, ok := resp.Value.Val.(*tangled.Repo) 295 if !ok { 296 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 297 } 298 299 if repo.Knot != h.c.Server.Hostname { 300 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 301 } 302 303 ownerDid = ident.DID.String() 304 repoName = repoAt.RecordKey().String() 305 306 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 307 if didErr != nil { 308 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 309 } 310 311 var lookupErr error 312 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 313 if lookupErr != nil { 314 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 315 } 316 317 default: 318 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo) 319 } 320 321 gr, err := git.Open(repoPath, record.Source.Branch) 322 if err != nil { 323 return nil, fmt.Errorf("failed to open git repository: %w", err) 324 } 325 326 defaultBranch, _ := gr.FindMainBranch() 327 328 return &targetRepo{ 329 RepoPath: repoPath, 330 OwnerDid: ownerDid, 331 RepoName: repoName, 332 RepoDid: repoDid, 333 DefaultBranch: defaultBranch, 334 }, nil 335} 336 337func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 338 // resolve the PR owner's identity to fetch the blob from their PDS 339 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 340 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 341 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 342 } 343 344 if len(record.Rounds) == 0 { 345 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record") 346 } 347 348 roundNumber := len(record.Rounds) - 1 349 round := record.Rounds[roundNumber] 350 351 // fetch the blob from the PR owner's PDS 352 prOwnerPds := prOwnerIdent.PDSEndpoint() 353 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 354 if err != nil { 355 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 356 } 357 q := blobUrl.Query() 358 q.Set("cid", round.PatchBlob.Ref.String()) 359 q.Set("did", did) 360 blobUrl.RawQuery = q.Encode() 361 362 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 363 if err != nil { 364 return nil, fmt.Errorf("failed to create blob request: %w", err) 365 } 366 req.Header.Set("Content-Type", "application/json") 367 368 blobResp, err := http.DefaultClient.Do(req) 369 if err != nil { 370 return nil, fmt.Errorf("failed to fetch blob: %w", err) 371 } 372 defer blobResp.Body.Close() 373 374 blob := io.ReadCloser(blobResp.Body) 375 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 376 if err != nil { 377 return nil, fmt.Errorf("failed to parse submission: %w", err) 378 } 379 380 return latestSubmission, nil 381} 382 383func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 384 gr, err := git.Open(repoPath, sha) 385 if err != nil { 386 return nil, fmt.Errorf("failed to open git repository: %w", err) 387 } 388 389 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 390 if err != nil { 391 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 392 } 393 394 var pipeline workflow.RawPipeline 395 for _, e := range workflowDir { 396 if !e.IsFile() { 397 continue 398 } 399 400 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 401 contents, err := gr.RawContent(fpath) 402 if err != nil { 403 continue 404 } 405 406 pipeline = append(pipeline, workflow.RawWorkflow{ 407 Name: e.Name, 408 Contents: contents, 409 }) 410 } 411 412 return pipeline, nil 413} 414 415func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 416 l := log.FromContext(ctx) 417 418 trigger := tangled.Pipeline_PullRequestTriggerData{ 419 Action: "create", 420 SourceBranch: sourceBranch, 421 SourceSha: sourceSha, 422 TargetBranch: targetBranch, 423 } 424 425 compiler := workflow.Compiler{ 426 Trigger: tangled.Pipeline_TriggerMetadata{ 427 Kind: string(workflow.TriggerKindPullRequest), 428 PullRequest: &trigger, 429 Repo: &tangled.Pipeline_TriggerRepo{ 430 Knot: h.c.Server.Hostname, 431 RepoDid: &targetRepo.RepoDid, 432 Did: targetRepo.OwnerDid, 433 Repo: &targetRepo.RepoName, 434 DefaultBranch: targetRepo.DefaultBranch, 435 }, 436 }, 437 } 438 439 l.Info("raw", "raw", rawPipeline) 440 parsed := compiler.Parse(rawPipeline) 441 l.Info("parsed", "parsed", parsed) 442 compiled := compiler.Compile(parsed) 443 444 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 445 446 return compiled 447} 448 449func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 450 raw := json.RawMessage(event.Commit.Record) 451 rkey := event.Commit.RKey 452 did := event.Did 453 454 var record tangled.RepoPull 455 if err := json.Unmarshal(raw, &record); err != nil { 456 return fmt.Errorf("failed to unmarshal record: %w", err) 457 } 458 459 l := log.FromContext(ctx) 460 l = l.With("handler", "processPull") 461 l = l.With("did", did) 462 463 l.Info("validating pull record") 464 targetRepo, err := h.validatePullRecord(ctx, &record) 465 if err != nil { 466 l.Warn("pull record did not validate, skipping...") 467 return err 468 } 469 470 l = l.With("target_repo", record.Target.Repo) 471 l = l.With("target_branch", record.Target.Branch) 472 473 l.Info("fetching latest submission") 474 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 475 if err != nil { 476 return err 477 } 478 479 sha := latestSubmission.SourceRev 480 if sha == "" { 481 return fmt.Errorf("failed to extract source SHA from pull submission") 482 } 483 l = l.With("sha", sha) 484 485 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 486 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 487 if err != nil { 488 return err 489 } 490 491 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 492 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 493 494 // do not run empty pipelines 495 if cp.Workflows == nil { 496 l.Info("skipping empty pipeline") 497 return nil 498 } 499 500 l.Info("marshaling pipeline event") 501 eventJson, err := json.Marshal(cp) 502 if err != nil { 503 return fmt.Errorf("failed to marshal pipeline event: %w", err) 504 } 505 506 ev := eventstream.Event{ 507 Rkey: tid.TID(), 508 Nsid: tangled.PipelineNSID, 509 EventJson: eventJson, 510 } 511 512 l.Info("inserting pipeline event") 513 return h.db.InsertEvent(ev, h.n) 514} 515 516// duplicated from add collaborator 517func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 518 raw := json.RawMessage(event.Commit.Record) 519 did := event.Did 520 521 var record tangled.RepoCollaborator 522 if err := json.Unmarshal(raw, &record); err != nil { 523 return fmt.Errorf("failed to unmarshal record: %w", err) 524 } 525 526 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 527 if err != nil || subjectId.Handle.IsInvalidHandle() { 528 return err 529 } 530 531 var rbacResource string 532 switch { 533 case strings.HasPrefix(record.Repo, "did:"): 534 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo) 535 if lookupErr != nil { 536 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr) 537 } 538 if ownerDid != did { 539 return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo) 540 } 541 rbacResource = record.Repo 542 543 case strings.Contains(record.Repo, "/"): 544 // TODO: get rid of this PDS fetch once all repos have DIDs 545 repoAt, parseErr := syntax.ParseATURI(record.Repo) 546 if parseErr != nil { 547 return parseErr 548 } 549 550 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 551 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 552 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 553 } 554 555 xrpcc := xrpc.Client{ 556 Host: owner.PDSEndpoint(), 557 } 558 559 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 560 if getErr != nil { 561 return getErr 562 } 563 564 if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 565 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 566 } 567 rkey := repoAt.RecordKey().String() 568 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey) 569 if didErr != nil { 570 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr) 571 } 572 rbacResource = repoDid 573 574 default: 575 return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo) 576 } 577 578 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource) 579 if err != nil { 580 return fmt.Errorf("failed to check permissions: %w", err) 581 } 582 if !ok { 583 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 584 } 585 586 if err := db.AddDid(h.db, subjectId.DID.String()); err != nil { 587 return err 588 } 589 h.jc.AddDid(subjectId.DID.String()) 590 591 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil { 592 return err 593 } 594 595 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 596} 597 598func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 599 l := log.FromContext(ctx) 600 601 id, err := h.resolver.Directory().LookupDID(ctx, syntax.DID(did)) 602 if err != nil { 603 return fmt.Errorf("lookup did to fetch keys: %w", err) 604 } 605 606 serviceEndpoint, ok := id.Services["atproto_pds"] 607 if !ok { 608 l.Warn("did identity did not contain atproto_pds service while adding their keys", "did", did) 609 return nil 610 } 611 612 xrpcc := indigoxrpc.Client{Host: serviceEndpoint.URL} 613 resp, err := comatproto.RepoListRecords(context.Background(), &xrpcc, tangled.PublicKeyNSID, "", 50, did, false) 614 if err != nil { 615 return fmt.Errorf("fetching public keys for did: %w", err) 616 } 617 618 for _, record := range resp.Records { 619 if record == nil { 620 continue 621 } 622 key := record.Value.Val.(*tangled.PublicKey) 623 if key == nil { 624 continue 625 } 626 pk := db.PublicKey{ 627 Did: did, 628 PublicKey: *key, 629 } 630 err = h.db.AddPublicKey(pk) 631 if err != nil { 632 return fmt.Errorf("adding public key to db: %w", err) 633 } 634 } 635 return nil 636} 637 638func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 639 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 640 641 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git") 642 if rkey == "" { 643 return nil 644 } 645 646 if event.Commit.Operation == jmodels.CommitOperationDelete { 647 return nil 648 } 649 650 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate { 651 return nil 652 } 653 654 raw := json.RawMessage(event.Commit.Record) 655 var record tangled.Repo 656 if err := json.Unmarshal(raw, &record); err != nil { 657 return fmt.Errorf("failed to unmarshal repo record: %w", err) 658 } 659 660 if record.Knot != h.c.Server.Hostname { 661 return nil 662 } 663 if record.RepoDid == nil || *record.RepoDid == "" { 664 l.Info("skipping repo event without repoDid") 665 return nil 666 } 667 repoDid := *record.RepoDid 668 669 if err := knotxrpc.ValidateRepoName(rkey); err != nil { 670 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err) 671 return nil 672 } 673 674 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid) 675 if lookupErr != nil { 676 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid) 677 return nil 678 } 679 if ownerDid != event.Did { 680 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did) 681 return nil 682 } 683 684 alias := db.RepoAlias{ 685 OwnerDid: event.Did, 686 Rkey: rkey, 687 RepoDid: repoDid, 688 Rev: event.Commit.Rev, 689 } 690 if err := h.db.UpsertRepoAlias(alias); err != nil { 691 l.Warn("failed to upsert repo alias", "err", err) 692 return nil 693 } 694 695 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev) 696 return nil 697} 698 699func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 700 var err error 701 switch event.Kind { 702 case jmodels.EventKindIdentity: 703 err = h.resolver.InvalidateIdent(ctx, event.Did) 704 case jmodels.EventKindCommit: 705 switch event.Commit.Collection { 706 case tangled.PublicKeyNSID: 707 err = h.processPublicKey(ctx, event) 708 case tangled.KnotMemberNSID: 709 err = h.processKnotMember(ctx, event) 710 case tangled.RepoNSID: 711 err = h.processRepo(ctx, event) 712 case tangled.RepoPullNSID: 713 err = h.processPull(ctx, event) 714 case tangled.RepoCollaboratorNSID: 715 err = h.processCollaborator(ctx, event) 716 } 717 default: 718 return nil 719 } 720 721 if err != nil { 722 args := []any{"kind", event.Kind, "err", err} 723 if event.Kind == jmodels.EventKindCommit { 724 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey) 725 } 726 h.l.Warn("failed to process event, skipping", args...) 727 } 728 729 lastTimeUs := event.TimeUS + 1 730 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 731 h.l.Error("failed to save cursor", "err", saveErr) 732 } 733 734 return nil 735}