Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "net/http"
11 "net/url"
12 "path/filepath"
13 "strings"
14
15 comatproto "github.com/bluesky-social/indigo/api/atproto"
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "github.com/bluesky-social/indigo/xrpc"
18 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
19 jmodels "github.com/bluesky-social/jetstream/pkg/models"
20 "tangled.org/core/api/tangled"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/eventstream"
23 "tangled.org/core/knotserver/db"
24 "tangled.org/core/knotserver/git"
25 knotxrpc "tangled.org/core/knotserver/xrpc"
26 "tangled.org/core/log"
27 "tangled.org/core/rbac"
28 "tangled.org/core/tid"
29 "tangled.org/core/workflow"
30)
31
32func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
33 l := log.FromContext(ctx)
34 raw := json.RawMessage(event.Commit.Record)
35 did := event.Did
36
37 var record tangled.PublicKey
38 if err := json.Unmarshal(raw, &record); err != nil {
39 return fmt.Errorf("failed to unmarshal record: %w", err)
40 }
41
42 pk := db.PublicKey{
43 Did: did,
44 PublicKey: record,
45 }
46 if err := h.db.AddPublicKey(pk); err != nil {
47 l.Error("failed to add public key", "error", err)
48 return fmt.Errorf("failed to add public key: %w", err)
49 }
50 l.Info("added public key from firehose", "did", did)
51 return nil
52}
53
54func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error {
55 did := event.Did
56 rkey := event.Commit.RKey
57 l := log.FromContext(ctx).With("handler", "processKnotMember", "did", did, "rkey", rkey)
58
59 switch event.Commit.Operation {
60 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
61 raw := json.RawMessage(event.Commit.Record)
62 var record tangled.KnotMember
63 if err := json.Unmarshal(raw, &record); err != nil {
64 return fmt.Errorf("failed to unmarshal record: %w", err)
65 }
66
67 if record.Domain != h.c.Server.Hostname {
68 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
69 }
70
71 subject, err := syntax.ParseDID(record.Subject)
72 if err != nil {
73 return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err)
74 }
75
76 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
77 if err != nil {
78 return fmt.Errorf("failed to enforce permissions: %w", err)
79 }
80 if !ok {
81 return fmt.Errorf("permission denied for %s", did)
82 }
83
84 sqlTx, err := h.db.BeginTx(ctx, nil)
85 if err != nil {
86 return fmt.Errorf("failed to start txn: %w", err)
87 }
88 committed := false
89 defer func() {
90 if !committed {
91 sqlTx.Rollback()
92 }
93 }()
94
95 existing, err := db.GetKnotMember(sqlTx, did, rkey)
96 if err != nil && !errors.Is(err, sql.ErrNoRows) {
97 return fmt.Errorf("failed to look up existing member: %w", err)
98 }
99
100 var staleSubject string
101 if existing != nil && existing.Subject != subject {
102 staleSubject = existing.Subject.String()
103 if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil {
104 return fmt.Errorf("failed to remove stale member row: %w", err)
105 }
106 }
107
108 if err := db.AddKnotMember(sqlTx, db.KnotMember{
109 Did: syntax.DID(did),
110 Rkey: rkey,
111 Subject: subject,
112 }); err != nil {
113 return fmt.Errorf("failed to persist member row: %w", err)
114 }
115
116 if err := db.AddDid(sqlTx, subject.String()); err != nil {
117 return fmt.Errorf("failed to add did: %w", err)
118 }
119
120 dropStaleAcl := false
121 var staleDidDropped bool
122 if staleSubject != "" {
123 remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject)
124 if err != nil {
125 return fmt.Errorf("failed to count stale subject rows: %w", err)
126 }
127 if remaining == 0 {
128 dropStaleAcl = true
129 stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer)
130 if err != nil {
131 return fmt.Errorf("failed to check residual policies for stale subject: %w", err)
132 }
133 if !stillNeeded {
134 if err := db.RemoveDid(sqlTx, staleSubject); err != nil {
135 return fmt.Errorf("failed to remove stale did: %w", err)
136 }
137 staleDidDropped = true
138 }
139 }
140 l.Info("replaced stale knot member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped)
141 }
142
143 if err := sqlTx.Commit(); err != nil {
144 return fmt.Errorf("failed to commit txn: %w", err)
145 }
146 committed = true
147
148 if dropStaleAcl {
149 if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil {
150 l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err)
151 }
152 }
153 if _, err := h.e.TryAddKnotMember(rbac.ThisServer, subject.String()); err != nil {
154 l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err)
155 }
156
157 if staleDidDropped {
158 h.jc.RemoveDid(staleSubject)
159 }
160 h.jc.AddDid(subject.String())
161 l.Info("added member from firehose", "member", subject)
162
163 if err := h.fetchAndAddKeys(ctx, subject.String()); err != nil {
164 return fmt.Errorf("failed to fetch and add keys: %w", err)
165 }
166 return nil
167
168 case jmodels.CommitOperationDelete:
169 sqlTx, err := h.db.BeginTx(ctx, nil)
170 if err != nil {
171 return fmt.Errorf("failed to start txn: %w", err)
172 }
173 committed := false
174 defer func() {
175 if !committed {
176 sqlTx.Rollback()
177 }
178 }()
179
180 member, err := db.GetKnotMember(sqlTx, did, rkey)
181 if errors.Is(err, sql.ErrNoRows) {
182 l.Info("knot member already removed")
183 return nil
184 }
185 if err != nil {
186 return fmt.Errorf("failed to look up knot member: %w", err)
187 }
188
189 staleSubject := member.Subject.String()
190
191 if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil {
192 return fmt.Errorf("failed to remove member row: %w", err)
193 }
194
195 remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject)
196 if err != nil {
197 return fmt.Errorf("failed to count remaining member rows: %w", err)
198 }
199
200 dropAcl := false
201 var staleDidDropped bool
202 if remaining == 0 {
203 dropAcl = true
204 stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer)
205 if err != nil {
206 return fmt.Errorf("failed to check residual policies: %w", err)
207 }
208 if !stillNeeded {
209 if err := db.RemoveDid(sqlTx, staleSubject); err != nil {
210 return fmt.Errorf("failed to remove did: %w", err)
211 }
212 staleDidDropped = true
213 }
214 }
215
216 if err := sqlTx.Commit(); err != nil {
217 return fmt.Errorf("failed to commit txn: %w", err)
218 }
219 committed = true
220
221 if dropAcl {
222 if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil {
223 l.Error("post-commit: failed to remove ACL", "subject", staleSubject, "err", err)
224 }
225 }
226
227 if staleDidDropped {
228 h.jc.RemoveDid(staleSubject)
229 }
230 l.Info("removed knot member from firehose", "member", member.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped)
231 return nil
232 }
233
234 return nil
235}
236
// targetRepo describes the repository a pull record targets, as resolved by
// validatePullRecord.
type targetRepo struct {
	RepoPath      string // path of the repository on disk
	OwnerDid      string // DID of the repository owner
	RepoName      string // repository name (the record key when the target is an AT-URI)
	RepoDid       string // DID of the repository itself
	DefaultBranch string // default branch
}
245
246func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) {
247 if record.Target == nil {
248 return nil, fmt.Errorf("ignoring pull record: target repo is nil")
249 }
250
251 l := log.FromContext(ctx).With("handler", "validatePullRecord")
252 l = l.With("target_repo", record.Target.Repo)
253 l = l.With("target_branch", record.Target.Branch)
254
255 if record.Source == nil {
256 return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request")
257 }
258
259 if record.Source.Repo != nil {
260 return nil, fmt.Errorf("ignoring pull record: fork based pull")
261 }
262
263 var repoPath, ownerDid, repoName, repoDid string
264 switch {
265 case strings.HasPrefix(record.Target.Repo, "did:"):
266 repoDid = record.Target.Repo
267 var lookupErr error
268 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
269 if lookupErr != nil {
270 return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr)
271 }
272
273 case strings.Contains(record.Target.Repo, "/"):
274 // TODO: get rid of this PDS fetch once all repos have DIDs
275 repoAt, parseErr := syntax.ParseATURI(record.Target.Repo)
276 if parseErr != nil {
277 return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr)
278 }
279
280 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
281 if resolveErr != nil || ident.Handle.IsInvalidHandle() {
282 return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr)
283 }
284
285 xrpcc := xrpc.Client{
286 Host: ident.PDSEndpoint(),
287 }
288
289 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
290 if getErr != nil {
291 return nil, fmt.Errorf("failed to resolve repo: %w", getErr)
292 }
293
294 repo, ok := resp.Value.Val.(*tangled.Repo)
295 if !ok {
296 return nil, fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
297 }
298
299 if repo.Knot != h.c.Server.Hostname {
300 return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
301 }
302
303 ownerDid = ident.DID.String()
304 repoName = repoAt.RecordKey().String()
305
306 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
307 if didErr != nil {
308 return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr)
309 }
310
311 var lookupErr error
312 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
313 if lookupErr != nil {
314 return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr)
315 }
316
317 default:
318 return nil, fmt.Errorf("ignoring pull record: target repo has unrecognized format: %s", record.Target.Repo)
319 }
320
321 gr, err := git.Open(repoPath, record.Source.Branch)
322 if err != nil {
323 return nil, fmt.Errorf("failed to open git repository: %w", err)
324 }
325
326 defaultBranch, _ := gr.FindMainBranch()
327
328 return &targetRepo{
329 RepoPath: repoPath,
330 OwnerDid: ownerDid,
331 RepoName: repoName,
332 RepoDid: repoDid,
333 DefaultBranch: defaultBranch,
334 }, nil
335}
336
337func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
338 // resolve the PR owner's identity to fetch the blob from their PDS
339 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
340 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
341 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
342 }
343
344 if len(record.Rounds) == 0 {
345 return nil, fmt.Errorf("failed to fetch latest submission, no rounds in record")
346 }
347
348 roundNumber := len(record.Rounds) - 1
349 round := record.Rounds[roundNumber]
350
351 // fetch the blob from the PR owner's PDS
352 prOwnerPds := prOwnerIdent.PDSEndpoint()
353 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
354 if err != nil {
355 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
356 }
357 q := blobUrl.Query()
358 q.Set("cid", round.PatchBlob.Ref.String())
359 q.Set("did", did)
360 blobUrl.RawQuery = q.Encode()
361
362 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
363 if err != nil {
364 return nil, fmt.Errorf("failed to create blob request: %w", err)
365 }
366 req.Header.Set("Content-Type", "application/json")
367
368 blobResp, err := http.DefaultClient.Do(req)
369 if err != nil {
370 return nil, fmt.Errorf("failed to fetch blob: %w", err)
371 }
372 defer blobResp.Body.Close()
373
374 blob := io.ReadCloser(blobResp.Body)
375 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
376 if err != nil {
377 return nil, fmt.Errorf("failed to parse submission: %w", err)
378 }
379
380 return latestSubmission, nil
381}
382
383func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
384 gr, err := git.Open(repoPath, sha)
385 if err != nil {
386 return nil, fmt.Errorf("failed to open git repository: %w", err)
387 }
388
389 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
390 if err != nil {
391 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
392 }
393
394 var pipeline workflow.RawPipeline
395 for _, e := range workflowDir {
396 if !e.IsFile() {
397 continue
398 }
399
400 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
401 contents, err := gr.RawContent(fpath)
402 if err != nil {
403 continue
404 }
405
406 pipeline = append(pipeline, workflow.RawWorkflow{
407 Name: e.Name,
408 Contents: contents,
409 })
410 }
411
412 return pipeline, nil
413}
414
415func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
416 l := log.FromContext(ctx)
417
418 trigger := tangled.Pipeline_PullRequestTriggerData{
419 Action: "create",
420 SourceBranch: sourceBranch,
421 SourceSha: sourceSha,
422 TargetBranch: targetBranch,
423 }
424
425 compiler := workflow.Compiler{
426 Trigger: tangled.Pipeline_TriggerMetadata{
427 Kind: string(workflow.TriggerKindPullRequest),
428 PullRequest: &trigger,
429 Repo: &tangled.Pipeline_TriggerRepo{
430 Knot: h.c.Server.Hostname,
431 RepoDid: &targetRepo.RepoDid,
432 Did: targetRepo.OwnerDid,
433 Repo: &targetRepo.RepoName,
434 DefaultBranch: targetRepo.DefaultBranch,
435 },
436 },
437 }
438
439 l.Info("raw", "raw", rawPipeline)
440 parsed := compiler.Parse(rawPipeline)
441 l.Info("parsed", "parsed", parsed)
442 compiled := compiler.Compile(parsed)
443
444 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
445
446 return compiled
447}
448
449func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
450 raw := json.RawMessage(event.Commit.Record)
451 rkey := event.Commit.RKey
452 did := event.Did
453
454 var record tangled.RepoPull
455 if err := json.Unmarshal(raw, &record); err != nil {
456 return fmt.Errorf("failed to unmarshal record: %w", err)
457 }
458
459 l := log.FromContext(ctx)
460 l = l.With("handler", "processPull")
461 l = l.With("did", did)
462
463 l.Info("validating pull record")
464 targetRepo, err := h.validatePullRecord(ctx, &record)
465 if err != nil {
466 l.Warn("pull record did not validate, skipping...")
467 return err
468 }
469
470 l = l.With("target_repo", record.Target.Repo)
471 l = l.With("target_branch", record.Target.Branch)
472
473 l.Info("fetching latest submission")
474 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
475 if err != nil {
476 return err
477 }
478
479 sha := latestSubmission.SourceRev
480 if sha == "" {
481 return fmt.Errorf("failed to extract source SHA from pull submission")
482 }
483 l = l.With("sha", sha)
484
485 l.Info("discovering workflows", "repo_path", targetRepo.RepoPath)
486 pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha)
487 if err != nil {
488 return err
489 }
490
491 l.Info("compiling pipeline", "workflow_count", len(pipeline))
492 cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline)
493
494 // do not run empty pipelines
495 if cp.Workflows == nil {
496 l.Info("skipping empty pipeline")
497 return nil
498 }
499
500 l.Info("marshaling pipeline event")
501 eventJson, err := json.Marshal(cp)
502 if err != nil {
503 return fmt.Errorf("failed to marshal pipeline event: %w", err)
504 }
505
506 ev := eventstream.Event{
507 Rkey: tid.TID(),
508 Nsid: tangled.PipelineNSID,
509 EventJson: eventJson,
510 }
511
512 l.Info("inserting pipeline event")
513 return h.db.InsertEvent(ev, h.n)
514}
515
516// duplicated from add collaborator
517func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error {
518 raw := json.RawMessage(event.Commit.Record)
519 did := event.Did
520
521 var record tangled.RepoCollaborator
522 if err := json.Unmarshal(raw, &record); err != nil {
523 return fmt.Errorf("failed to unmarshal record: %w", err)
524 }
525
526 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
527 if err != nil || subjectId.Handle.IsInvalidHandle() {
528 return err
529 }
530
531 var rbacResource string
532 switch {
533 case strings.HasPrefix(record.Repo, "did:"):
534 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo)
535 if lookupErr != nil {
536 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr)
537 }
538 if ownerDid != did {
539 return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo)
540 }
541 rbacResource = record.Repo
542
543 case strings.Contains(record.Repo, "/"):
544 // TODO: get rid of this PDS fetch once all repos have DIDs
545 repoAt, parseErr := syntax.ParseATURI(record.Repo)
546 if parseErr != nil {
547 return parseErr
548 }
549
550 owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
551 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
552 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
553 }
554
555 xrpcc := xrpc.Client{
556 Host: owner.PDSEndpoint(),
557 }
558
559 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
560 if getErr != nil {
561 return getErr
562 }
563
564 if _, ok := resp.Value.Val.(*tangled.Repo); !ok {
565 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
566 }
567 rkey := repoAt.RecordKey().String()
568 repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey)
569 if didErr != nil {
570 return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr)
571 }
572 rbacResource = repoDid
573
574 default:
575 return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo)
576 }
577
578 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource)
579 if err != nil {
580 return fmt.Errorf("failed to check permissions: %w", err)
581 }
582 if !ok {
583 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource)
584 }
585
586 if err := db.AddDid(h.db, subjectId.DID.String()); err != nil {
587 return err
588 }
589 h.jc.AddDid(subjectId.DID.String())
590
591 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil {
592 return err
593 }
594
595 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
596}
597
598func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
599 l := log.FromContext(ctx)
600
601 id, err := h.resolver.Directory().LookupDID(ctx, syntax.DID(did))
602 if err != nil {
603 return fmt.Errorf("lookup did to fetch keys: %w", err)
604 }
605
606 serviceEndpoint, ok := id.Services["atproto_pds"]
607 if !ok {
608 l.Warn("did identity did not contain atproto_pds service while adding their keys", "did", did)
609 return nil
610 }
611
612 xrpcc := indigoxrpc.Client{Host: serviceEndpoint.URL}
613 resp, err := comatproto.RepoListRecords(context.Background(), &xrpcc, tangled.PublicKeyNSID, "", 50, did, false)
614 if err != nil {
615 return fmt.Errorf("fetching public keys for did: %w", err)
616 }
617
618 for _, record := range resp.Records {
619 if record == nil {
620 continue
621 }
622 key := record.Value.Val.(*tangled.PublicKey)
623 if key == nil {
624 continue
625 }
626 pk := db.PublicKey{
627 Did: did,
628 PublicKey: *key,
629 }
630 err = h.db.AddPublicKey(pk)
631 if err != nil {
632 return fmt.Errorf("adding public key to db: %w", err)
633 }
634 }
635 return nil
636}
637
638func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error {
639 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey)
640
641 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git")
642 if rkey == "" {
643 return nil
644 }
645
646 if event.Commit.Operation == jmodels.CommitOperationDelete {
647 return nil
648 }
649
650 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate {
651 return nil
652 }
653
654 raw := json.RawMessage(event.Commit.Record)
655 var record tangled.Repo
656 if err := json.Unmarshal(raw, &record); err != nil {
657 return fmt.Errorf("failed to unmarshal repo record: %w", err)
658 }
659
660 if record.Knot != h.c.Server.Hostname {
661 return nil
662 }
663 if record.RepoDid == nil || *record.RepoDid == "" {
664 l.Info("skipping repo event without repoDid")
665 return nil
666 }
667 repoDid := *record.RepoDid
668
669 if err := knotxrpc.ValidateRepoName(rkey); err != nil {
670 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err)
671 return nil
672 }
673
674 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid)
675 if lookupErr != nil {
676 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid)
677 return nil
678 }
679 if ownerDid != event.Did {
680 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did)
681 return nil
682 }
683
684 alias := db.RepoAlias{
685 OwnerDid: event.Did,
686 Rkey: rkey,
687 RepoDid: repoDid,
688 Rev: event.Commit.Rev,
689 }
690 if err := h.db.UpsertRepoAlias(alias); err != nil {
691 l.Warn("failed to upsert repo alias", "err", err)
692 return nil
693 }
694
695 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev)
696 return nil
697}
698
699func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
700 var err error
701 switch event.Kind {
702 case jmodels.EventKindIdentity:
703 err = h.resolver.InvalidateIdent(ctx, event.Did)
704 case jmodels.EventKindCommit:
705 switch event.Commit.Collection {
706 case tangled.PublicKeyNSID:
707 err = h.processPublicKey(ctx, event)
708 case tangled.KnotMemberNSID:
709 err = h.processKnotMember(ctx, event)
710 case tangled.RepoNSID:
711 err = h.processRepo(ctx, event)
712 case tangled.RepoPullNSID:
713 err = h.processPull(ctx, event)
714 case tangled.RepoCollaboratorNSID:
715 err = h.processCollaborator(ctx, event)
716 }
717 default:
718 return nil
719 }
720
721 if err != nil {
722 args := []any{"kind", event.Kind, "err", err}
723 if event.Kind == jmodels.EventKindCommit {
724 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey)
725 }
726 h.l.Warn("failed to process event, skipping", args...)
727 }
728
729 lastTimeUs := event.TimeUS + 1
730 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
731 h.l.Error("failed to save cursor", "err", saveErr)
732 }
733
734 return nil
735}