Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "strings"
8
9 jmodels "github.com/bluesky-social/jetstream/pkg/models"
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/knotserver/db"
12 knotxrpc "tangled.org/core/knotserver/xrpc"
13 "tangled.org/core/log"
14)
15
16func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
17 l := log.FromContext(ctx)
18 raw := json.RawMessage(event.Commit.Record)
19 did := event.Did
20
21 var record tangled.PublicKey
22 if err := json.Unmarshal(raw, &record); err != nil {
23 return fmt.Errorf("failed to unmarshal record: %w", err)
24 }
25
26 pk := db.PublicKey{
27 Did: did,
28 PublicKey: record,
29 }
30 if err := h.db.AddPublicKey(pk); err != nil {
31 l.Error("failed to add public key", "error", err)
32 return fmt.Errorf("failed to add public key: %w", err)
33 }
34 l.Info("added public key from firehose", "did", did)
35 return nil
36}
37
// targetRepo groups the identifying details of a single repository.
//
// NOTE(review): nothing in the visible part of this file constructs or reads
// this type. The comment previously attached here ("returns a repo path on
// disk if present, and error if not") described a function rather than this
// struct, so the field notes below are inferred from the field names only —
// confirm against the code that uses the type.
type targetRepo struct {
	RepoPath      string // presumably the repository's path on disk
	OwnerDid      string // presumably the DID of the repository owner
	RepoName      string // presumably the repository name
	RepoDid       string // presumably the DID assigned to the repository itself
	DefaultBranch string // default branch
}
46
47func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error {
48 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey)
49
50 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git")
51 if rkey == "" {
52 return nil
53 }
54
55 if event.Commit.Operation == jmodels.CommitOperationDelete {
56 return nil
57 }
58
59 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate {
60 return nil
61 }
62
63 raw := json.RawMessage(event.Commit.Record)
64 var record tangled.Repo
65 if err := json.Unmarshal(raw, &record); err != nil {
66 return fmt.Errorf("failed to unmarshal repo record: %w", err)
67 }
68
69 if record.Knot != h.c.Server.Hostname {
70 return nil
71 }
72 if record.RepoDid == nil || *record.RepoDid == "" {
73 l.Info("skipping repo event without repoDid")
74 return nil
75 }
76 repoDid := *record.RepoDid
77
78 if err := knotxrpc.ValidateRepoName(rkey); err != nil {
79 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err)
80 return nil
81 }
82
83 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid)
84 if lookupErr != nil {
85 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid)
86 return nil
87 }
88 if ownerDid != event.Did {
89 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did)
90 return nil
91 }
92
93 alias := db.RepoAlias{
94 OwnerDid: event.Did,
95 Rkey: rkey,
96 RepoDid: repoDid,
97 Rev: event.Commit.Rev,
98 }
99 if err := h.db.UpsertRepoAlias(alias); err != nil {
100 l.Warn("failed to upsert repo alias", "err", err)
101 return nil
102 }
103
104 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev)
105 return nil
106}
107
108func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
109 var err error
110 switch event.Kind {
111 case jmodels.EventKindIdentity:
112 err = h.resolver.InvalidateIdent(ctx, event.Did)
113 case jmodels.EventKindCommit:
114 switch event.Commit.Collection {
115 case tangled.PublicKeyNSID:
116 err = h.processPublicKey(ctx, event)
117 case tangled.RepoNSID:
118 err = h.processRepo(ctx, event)
119 }
120 default:
121 return nil
122 }
123
124 if err != nil {
125 args := []any{"kind", event.Kind, "err", err}
126 if event.Kind == jmodels.EventKindCommit {
127 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey)
128 }
129 h.l.Warn("failed to process event, skipping", args...)
130 }
131
132 lastTimeUs := event.TimeUS + 1
133 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
134 h.l.Error("failed to save cursor", "err", saveErr)
135 }
136
137 return nil
138}