Monorepo for Tangled
tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "strings"
8
9 jmodels "github.com/bluesky-social/jetstream/pkg/models"
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/knotserver/db"
12 knotxrpc "tangled.org/core/knotserver/xrpc"
13 "tangled.org/core/log"
14)
15
16func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
17 l := log.FromContext(ctx).With("handler", "processPublicKey", "did", event.Did, "rkey", event.Commit.RKey)
18 did := syntax.DID(event.Did)
19 rkey := syntax.RecordKey(event.Commit.RKey)
20
21 switch event.Commit.Operation {
22 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
23 var record tangled.PublicKey
24 if err := json.Unmarshal(json.RawMessage(event.Commit.Record), &record); err != nil {
25 return fmt.Errorf("failed to unmarshal record: %w", err)
26 }
27
28 pk := db.PublicKey{
29 Did: did,
30 Rkey: rkey,
31 PublicKey: record,
32 }
33 if err := h.db.UpsertPublicKey(pk); err != nil {
34 return fmt.Errorf("failed to upsert public key: %w", err)
35 }
36 l.Info("upserted public key from firehose")
37 case jmodels.CommitOperationDelete:
38 if err := h.db.DeletePublicKeyByRkey(did, rkey); err != nil {
39 return fmt.Errorf("failed to delete public key: %w", err)
40 }
41 l.Info("deleted public key from firehose")
42 }
43
44 return nil
45}
46
// targetRepo groups the string identifiers describing one repository: a path,
// the owner's DID, the repo name, the repo's own DID and its default branch.
//
// NOTE(review): the previous comment here read "returns a repo path on disk if
// present, and error if not", which describes a lookup function rather than
// this type. Presumably a resolver elsewhere fills this struct in — confirm.
// Nothing in the visible part of this file references targetRepo.
type targetRepo struct {
	RepoPath      string // presumably the repo's location on disk (per the earlier comment) — confirm
	OwnerDid      string
	RepoName      string
	RepoDid       string
	DefaultBranch string // default branch
}
55
56func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error {
57 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey)
58
59 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git")
60 if rkey == "" {
61 return nil
62 }
63
64 if event.Commit.Operation == jmodels.CommitOperationDelete {
65 return nil
66 }
67
68 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate {
69 return nil
70 }
71
72 raw := json.RawMessage(event.Commit.Record)
73 var record tangled.Repo
74 if err := json.Unmarshal(raw, &record); err != nil {
75 return fmt.Errorf("failed to unmarshal repo record: %w", err)
76 }
77
78 if record.Knot != h.c.Server.Hostname {
79 return nil
80 }
81 if record.RepoDid == nil || *record.RepoDid == "" {
82 l.Info("skipping repo event without repoDid")
83 return nil
84 }
85 repoDid := *record.RepoDid
86
87 if err := knotxrpc.ValidateRepoName(rkey); err != nil {
88 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err)
89 return nil
90 }
91
92 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid)
93 if lookupErr != nil {
94 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid)
95 return nil
96 }
97 if ownerDid != event.Did {
98 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did)
99 return nil
100 }
101
102 alias := db.RepoAlias{
103 OwnerDid: event.Did,
104 Rkey: rkey,
105 RepoDid: repoDid,
106 Rev: event.Commit.Rev,
107 }
108 if err := h.db.UpsertRepoAlias(alias); err != nil {
109 l.Warn("failed to upsert repo alias", "err", err)
110 return nil
111 }
112
113 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev)
114 return nil
115}
116
117func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
118 var err error
119 switch event.Kind {
120 case jmodels.EventKindIdentity:
121 err = h.resolver.InvalidateIdent(ctx, event.Did)
122 case jmodels.EventKindCommit:
123 switch event.Commit.Collection {
124 case tangled.PublicKeyNSID:
125 err = h.processPublicKey(ctx, event)
126 case tangled.RepoNSID:
127 err = h.processRepo(ctx, event)
128 }
129 default:
130 return nil
131 }
132
133 if err != nil {
134 args := []any{"kind", event.Kind, "err", err}
135 if event.Kind == jmodels.EventKindCommit {
136 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey)
137 }
138 h.l.Warn("failed to process event, skipping", args...)
139 }
140
141 lastTimeUs := event.TimeUS + 1
142 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
143 h.l.Error("failed to save cursor", "err", saveErr)
144 }
145
146 return nil
147}