Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "net/http"
11 "net/url"
12 "path/filepath"
13 "strings"
14
15 comatproto "github.com/bluesky-social/indigo/api/atproto"
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "github.com/bluesky-social/indigo/xrpc"
18 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
19 jmodels "github.com/bluesky-social/jetstream/pkg/models"
20 "tangled.org/core/api/tangled"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/knotserver/db"
23 "tangled.org/core/knotserver/git"
24 knotxrpc "tangled.org/core/knotserver/xrpc"
25 "tangled.org/core/log"
26 "tangled.org/core/rbac"
27 "tangled.org/core/tid"
28 "tangled.org/core/workflow"
29)
30
31func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
32 l := log.FromContext(ctx)
33 raw := json.RawMessage(event.Commit.Record)
34 did := event.Did
35
36 var record tangled.PublicKey
37 if err := json.Unmarshal(raw, &record); err != nil {
38 return fmt.Errorf("failed to unmarshal record: %w", err)
39 }
40
41 pk := db.PublicKey{
42 Did: did,
43 PublicKey: record,
44 }
45 if err := h.db.AddPublicKey(pk); err != nil {
46 l.Error("failed to add public key", "error", err)
47 return fmt.Errorf("failed to add public key: %w", err)
48 }
49 l.Info("added public key from firehose", "did", did)
50 return nil
51}
52
53func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error {
54 did := event.Did
55 rkey := event.Commit.RKey
56 l := log.FromContext(ctx).With("handler", "processKnotMember", "did", did, "rkey", rkey)
57
58 switch event.Commit.Operation {
59 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
60 raw := json.RawMessage(event.Commit.Record)
61 var record tangled.KnotMember
62 if err := json.Unmarshal(raw, &record); err != nil {
63 return fmt.Errorf("failed to unmarshal record: %w", err)
64 }
65
66 if record.Domain != h.c.Server.Hostname {
67 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
68 }
69
70 subject, err := syntax.ParseDID(record.Subject)
71 if err != nil {
72 return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err)
73 }
74
75 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
76 if err != nil {
77 return fmt.Errorf("failed to enforce permissions: %w", err)
78 }
79 if !ok {
80 return fmt.Errorf("permission denied for %s", did)
81 }
82
83 sqlTx, err := h.db.BeginTx(ctx, nil)
84 if err != nil {
85 return fmt.Errorf("failed to start txn: %w", err)
86 }
87 committed := false
88 defer func() {
89 if !committed {
90 sqlTx.Rollback()
91 }
92 }()
93
94 existing, err := db.GetKnotMember(sqlTx, did, rkey)
95 if err != nil && !errors.Is(err, sql.ErrNoRows) {
96 return fmt.Errorf("failed to look up existing member: %w", err)
97 }
98
99 var staleSubject string
100 if existing != nil && existing.Subject != subject {
101 staleSubject = existing.Subject.String()
102 if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil {
103 return fmt.Errorf("failed to remove stale member row: %w", err)
104 }
105 }
106
107 if err := db.AddKnotMember(sqlTx, db.KnotMember{
108 Did: syntax.DID(did),
109 Rkey: rkey,
110 Subject: subject,
111 }); err != nil {
112 return fmt.Errorf("failed to persist member row: %w", err)
113 }
114
115 if err := db.AddDid(sqlTx, subject.String()); err != nil {
116 return fmt.Errorf("failed to add did: %w", err)
117 }
118
119 dropStaleAcl := false
120 var staleDidDropped bool
121 if staleSubject != "" {
122 remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject)
123 if err != nil {
124 return fmt.Errorf("failed to count stale subject rows: %w", err)
125 }
126 if remaining == 0 {
127 dropStaleAcl = true
128 stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer)
129 if err != nil {
130 return fmt.Errorf("failed to check residual policies for stale subject: %w", err)
131 }
132 if !stillNeeded {
133 if err := db.RemoveDid(sqlTx, staleSubject); err != nil {
134 return fmt.Errorf("failed to remove stale did: %w", err)
135 }
136 staleDidDropped = true
137 }
138 }
139 l.Info("replaced stale knot member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped)
140 }
141
142 if err := sqlTx.Commit(); err != nil {
143 return fmt.Errorf("failed to commit txn: %w", err)
144 }
145 committed = true
146
147 if dropStaleAcl {
148 if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil {
149 l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err)
150 }
151 }
152 if _, err := h.e.TryAddKnotMember(rbac.ThisServer, subject.String()); err != nil {
153 l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err)
154 }
155
156 if staleDidDropped {
157 h.jc.RemoveDid(staleSubject)
158 }
159 h.jc.AddDid(subject.String())
160 l.Info("added member from firehose", "member", subject)
161
162 if err := h.fetchAndAddKeys(ctx, subject.String()); err != nil {
163 return fmt.Errorf("failed to fetch and add keys: %w", err)
164 }
165 return nil
166
167 case jmodels.CommitOperationDelete:
168 sqlTx, err := h.db.BeginTx(ctx, nil)
169 if err != nil {
170 return fmt.Errorf("failed to start txn: %w", err)
171 }
172 committed := false
173 defer func() {
174 if !committed {
175 sqlTx.Rollback()
176 }
177 }()
178
179 member, err := db.GetKnotMember(sqlTx, did, rkey)
180 if errors.Is(err, sql.ErrNoRows) {
181 l.Info("knot member already removed")
182 return nil
183 }
184 if err != nil {
185 return fmt.Errorf("failed to look up knot member: %w", err)
186 }
187
188 staleSubject := member.Subject.String()
189
190 if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil {
191 return fmt.Errorf("failed to remove member row: %w", err)
192 }
193
194 remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject)
195 if err != nil {
196 return fmt.Errorf("failed to count remaining member rows: %w", err)
197 }
198
199 dropAcl := false
200 var staleDidDropped bool
201 if remaining == 0 {
202 dropAcl = true
203 stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer)
204 if err != nil {
205 return fmt.Errorf("failed to check residual policies: %w", err)
206 }
207 if !stillNeeded {
208 if err := db.RemoveDid(sqlTx, staleSubject); err != nil {
209 return fmt.Errorf("failed to remove did: %w", err)
210 }
211 staleDidDropped = true
212 }
213 }
214
215 if err := sqlTx.Commit(); err != nil {
216 return fmt.Errorf("failed to commit txn: %w", err)
217 }
218 committed = true
219
220 if dropAcl {
221 if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil {
222 l.Error("post-commit: failed to remove ACL", "subject", staleSubject, "err", err)
223 }
224 }
225
226 if staleDidDropped {
227 h.jc.RemoveDid(staleSubject)
228 }
229 l.Info("removed knot member from firehose", "member", member.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped)
230 return nil
231 }
232
233 return nil
234}
235
236// returns a repo path on disk if present, and error if not
237type targetRepo struct {
238 RepoPath string
239 OwnerDid string
240 RepoName string
241 RepoDid string
242 DefaultBranch string // default branch
243}
244
245func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
246 if record.Target == nil {
247 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
248 }
249
250 l := log.FromContext(ctx).With("handler", "validatePullRecord")
251 l = l.With("target_repo", record.Target.Repo)
252 l = l.With("target_branch", record.Target.Branch)
253
254 if record.Source == nil {
255 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
256 }
257
258 if record.Source.Repo != nil {
259 return nil, fmt.Errorf("ignoring pull record: fork based pull")
260 }
261
262 var repoPath, ownerDid, repoName, repoDid string
263 switch {
264 case strings.HasPrefix(record.Target.Repo, "did:"):
265 repoDid = record.Target.Repo
266 var lookupErr error
267 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
268 if lookupErr != nil {
269 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
270 }
271
272 case strings.Contains(record.Target.Repo, "/"):
273 // TODO: get rid of this PDS fetch once all repos have DIDs
274 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo)
275 if parseErr != nil {
276 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
277 }
278
279 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
280 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
281 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
282 }
283
284 xrpcc := xrpc.Client{
285 Host: ident.PDSEndpoint(),
286 }
287
288 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
289 if getErr != nil {
290 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
291 }
292
293 repo, ok := resp.Value.Val.(*tangled.Repo)
294 if !ok {
295 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
296 }
297
298 if repo.Knot != h.c.Server.Hostname {
299 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
300 }
301
302 ownerDid = ident.DID.String()
303 repoName = repoAt.RecordKey().String()
304
305 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
306 if didErr != nil {
307 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
308 }
309
310 var lookupErr error
311 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
312 if lookupErr != nil {
313 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
314 }
315
316 default:
317 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo)
318 }
319
320 gr, err := git.Open(repoPath, record.Source.Branch)
321 if err != nil {
322 return nil, fmt.Errorf("failed to open git repository: %w", err)
323 }
324
325 defaultBranch, _ := gr.FindMainBranch()
326
327 return &targetRepo{
328 RepoPath: repoPath,
329 OwnerDid: ownerDid,
330 RepoName: repoName,
331 RepoDid: repoDid,
332 DefaultBranch: defaultBranch,
333 }, nil
334}
335
336func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
337 // resolve the PR owner's identity to fetch the blob from their PDS
338 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
339 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
340 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
341 }
342
343 if len(record.Rounds) == 0 {
344 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
345 }
346
347 roundNumber := len(record.Rounds) - 1
348 round := record.Rounds[roundNumber]
349
350 // fetch the blob from the PR owner's PDS
351 prOwnerPds := prOwnerIdent.PDSEndpoint()
352 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
353 if err != nil {
354 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
355 }
356 q := blobUrl.Query()
357 q.Set("cid", round.PatchBlob.Ref.String())
358 q.Set("did", did)
359 blobUrl.RawQuery = q.Encode()
360
361 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
362 if err != nil {
363 return nil, fmt.Errorf("failed to create blob request: %w", err)
364 }
365 req.Header.Set("Content-Type", "application/json")
366
367 blobResp, err := http.DefaultClient.Do(req)
368 if err != nil {
369 return nil, fmt.Errorf("failed to fetch blob: %w", err)
370 }
371 defer blobResp.Body.Close()
372
373 blob := io.ReadCloser(blobResp.Body)
374 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
375 if err != nil {
376 return nil, fmt.Errorf("failed to parse submission: %w", err)
377 }
378
379 return latestSubmission, nil
380}
381
382func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
383 gr, err := git.Open(repoPath, sha)
384 if err != nil {
385 return nil, fmt.Errorf("failed to open git repository: %w", err)
386 }
387
388 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
389 if err != nil {
390 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
391 }
392
393 var pipeline workflow.RawPipeline
394 for _, e := range workflowDir {
395 if !e.IsFile() {
396 continue
397 }
398
399 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
400 contents, err := gr.RawContent(fpath)
401 if err != nil {
402 continue
403 }
404
405 pipeline = append(pipeline, workflow.RawWorkflow{
406 Name: e.Name,
407 Contents: contents,
408 })
409 }
410
411 return pipeline, nil
412}
413
414func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
415 l := log.FromContext(ctx)
416
417 trigger := tangled.Pipeline_PullRequestTriggerData{
418 Action: "create",
419 SourceBranch: sourceBranch,
420 SourceSha: sourceSha,
421 TargetBranch: targetBranch,
422 }
423
424 compiler := workflow.Compiler{
425 Trigger: tangled.Pipeline_TriggerMetadata{
426 Kind: string(workflow.TriggerKindPullRequest),
427 PullRequest: &trigger,
428 Repo: &tangled.Pipeline_TriggerRepo{
429 Knot: h.c.Server.Hostname,
430 RepoDid: &targetRepo.RepoDid,
431 Did: targetRepo.OwnerDid,
432 Repo: &targetRepo.RepoName,
433 DefaultBranch: targetRepo.DefaultBranch,
434 },
435 },
436 }
437
438 l.Info("raw", "raw", rawPipeline)
439 parsed := compiler.Parse(rawPipeline)
440 l.Info("parsed", "parsed", parsed)
441 compiled := compiler.Compile(parsed)
442
443 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
444
445 return compiled
446}
447
448func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
449 raw := json.RawMessage(event.Commit.Record)
450 rkey := event.Commit.RKey
451 did := event.Did
452
453 var record tangled.RepoPull
454 if err := json.Unmarshal(raw, &record); err != nil {
455 return fmt.Errorf("failed to unmarshal record: %w", err)
456 }
457
458 l := log.FromContext(ctx)
459 l = l.With("handler", "processPull")
460 l = l.With("did", did)
461
462 l.Info("validating pull record")
463 targetRepo, err := h.validatePullRecord(ctx, &record)
464 if err != nil {
465 l.Warn("pull record did not validate, skipping...")
466 return err
467 }
468
469 l = l.With("target_repo", record.Target.Repo)
470 l = l.With("target_branch", record.Target.Branch)
471
472 l.Info("fetching latest submission")
473 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
474 if err != nil {
475 return err
476 }
477
478 sha := latestSubmission.SourceRev
479 if sha == "" {
480 return fmt.Errorf("failed to extract source SHA from pull submission")
481 }
482 l = l.With("sha", sha)
483
484 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
485 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
486 if err != nil {
487 return err
488 }
489
490 l.Info("compiling pipeline", "workflow_count", len(pipeline))
491 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
492
493 // do not run empty pipelines
494 if cp.Workflows == nil {
495 l.Info("skipping empty pipeline")
496 return nil
497 }
498
499 l.Info("marshaling pipeline event")
500 eventJson, err := json.Marshal(cp)
501 if err != nil {
502 return fmt.Errorf("failed to marshal pipeline event: %w", err)
503 }
504
505 ev := db.Event{
506 Rkey: tid.TID(),
507 Nsid: tangled.PipelineNSID,
508 EventJson: string(eventJson),
509 }
510
511 l.Info("inserting pipeline event")
512 return h.db.InsertEvent(ev, h.n)
513}
514
515// duplicated from add collaborator
516func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error {
517 raw := json.RawMessage(event.Commit.Record)
518 did := event.Did
519
520 var record tangled.RepoCollaborator
521 if err := json.Unmarshal(raw, &record); err != nil {
522 return fmt.Errorf("failed to unmarshal record: %w", err)
523 }
524
525 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
526 if err != nil || subjectId.Handle.IsInvalidHandle() {
527 return err
528 }
529
530 var rbacResource string
531 switch {
532 case strings.HasPrefix(record.Repo, "did:"):
533 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo)
534 if lookupErr != nil {
535 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr)
536 }
537 if ownerDid != did {
538 return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo)
539 }
540 rbacResource = record.Repo
541
542 case strings.Contains(record.Repo, "/"):
543 // TODO: get rid of this PDS fetch once all repos have DIDs
544 repoAt, parseErr := syntax.ParseATURI(record.Repo)
545 if parseErr != nil {
546 return parseErr
547 }
548
549 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
550 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
551 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
552 }
553
554 xrpcc := xrpc.Client{
555 Host: owner.PDSEndpoint(),
556 }
557
558 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
559 if getErr != nil {
560 return getErr
561 }
562
563 if _, ok := resp.Value.Val.(*tangled.Repo); !ok {
564 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
565 }
566 rkey := repoAt.RecordKey().String()
567 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey)
568 if didErr != nil {
569 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr)
570 }
571 rbacResource = repoDid
572
573 default:
574 return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo)
575 }
576
577 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource)
578 if err != nil {
579 return fmt.Errorf("failed to check permissions: %w", err)
580 }
581 if !ok {
582 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource)
583 }
584
585 if err := db.AddDid(h.db, subjectId.DID.String()); err != nil {
586 return err
587 }
588 h.jc.AddDid(subjectId.DID.String())
589
590 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil {
591 return err
592 }
593
594 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
595}
596
597func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
598 l := log.FromContext(ctx)
599
600 id, err := h.resolver.Directory().LookupDID(ctx, syntax.DID(did))
601 if err != nil {
602 return fmt.Errorf("lookup did to fetch keys: %w", err)
603 }
604
605 serviceEndpoint, ok := id.Services["atproto_pds"]
606 if !ok {
607 l.Warn("did identity did not contain atproto_pds service while adding their keys", "did", did)
608 return nil
609 }
610
611 xrpcc := indigoxrpc.Client{Host: serviceEndpoint.URL}
612 resp, err := comatproto.RepoListRecords(context.Background(), &xrpcc, tangled.PublicKeyNSID, "", 50, did, false)
613 if err != nil {
614 return fmt.Errorf("fetching public keys for did: %w", err)
615 }
616
617 for _, record := range resp.Records {
618 if record == nil {
619 continue
620 }
621 key := record.Value.Val.(*tangled.PublicKey)
622 if key == nil {
623 continue
624 }
625 pk := db.PublicKey{
626 Did: did,
627 PublicKey: *key,
628 }
629 err = h.db.AddPublicKey(pk)
630 if err != nil {
631 return fmt.Errorf("adding public key to db: %w", err)
632 }
633 }
634 return nil
635}
636
637func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error {
638 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey)
639
640 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git")
641 if rkey == "" {
642 return nil
643 }
644
645 if event.Commit.Operation == jmodels.CommitOperationDelete {
646 return nil
647 }
648
649 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate {
650 return nil
651 }
652
653 raw := json.RawMessage(event.Commit.Record)
654 var record tangled.Repo
655 if err := json.Unmarshal(raw, &record); err != nil {
656 return fmt.Errorf("failed to unmarshal repo record: %w", err)
657 }
658
659 if record.Knot != h.c.Server.Hostname {
660 return nil
661 }
662 if record.RepoDid == nil || *record.RepoDid == "" {
663 l.Info("skipping repo event without repoDid")
664 return nil
665 }
666 repoDid := *record.RepoDid
667
668 if err := knotxrpc.ValidateRepoName(rkey); err != nil {
669 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err)
670 return nil
671 }
672
673 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid)
674 if lookupErr != nil {
675 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid)
676 return nil
677 }
678 if ownerDid != event.Did {
679 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did)
680 return nil
681 }
682
683 alias := db.RepoAlias{
684 OwnerDid: event.Did,
685 Rkey: rkey,
686 RepoDid: repoDid,
687 Rev: event.Commit.Rev,
688 }
689 if err := h.db.UpsertRepoAlias(alias); err != nil {
690 l.Warn("failed to upsert repo alias", "err", err)
691 return nil
692 }
693
694 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev)
695 return nil
696}
697
698func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
699 var err error
700 switch event.Kind {
701 case jmodels.EventKindIdentity:
702 err = h.resolver.InvalidateIdent(ctx, event.Did)
703 case jmodels.EventKindCommit:
704 switch event.Commit.Collection {
705 case tangled.PublicKeyNSID:
706 err = h.processPublicKey(ctx, event)
707 case tangled.KnotMemberNSID:
708 err = h.processKnotMember(ctx, event)
709 case tangled.RepoNSID:
710 err = h.processRepo(ctx, event)
711 case tangled.RepoPullNSID:
712 err = h.processPull(ctx, event)
713 case tangled.RepoCollaboratorNSID:
714 err = h.processCollaborator(ctx, event)
715 }
716 default:
717 return nil
718 }
719
720 if err != nil {
721 args := []any{"kind", event.Kind, "err", err}
722 if event.Kind == jmodels.EventKindCommit {
723 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey)
724 }
725 h.l.Warn("failed to process event, skipping", args...)
726 }
727
728 lastTimeUs := event.TimeUS + 1
729 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
730 h.l.Error("failed to save cursor", "err", saveErr)
731 }
732
733 return nil
734}