Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "strings" 8 9 jmodels "github.com/bluesky-social/jetstream/pkg/models" 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/knotserver/db" 12 knotxrpc "tangled.org/core/knotserver/xrpc" 13 "tangled.org/core/log" 14) 15 16func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 17 l := log.FromContext(ctx) 18 raw := json.RawMessage(event.Commit.Record) 19 did := event.Did 20 21 var record tangled.PublicKey 22 if err := json.Unmarshal(raw, &record); err != nil { 23 return fmt.Errorf("failed to unmarshal record: %w", err) 24 } 25 26 pk := db.PublicKey{ 27 Did: did, 28 PublicKey: record, 29 } 30 if err := h.db.AddPublicKey(pk); err != nil { 31 l.Error("failed to add public key", "error", err) 32 return fmt.Errorf("failed to add public key: %w", err) 33 } 34 l.Info("added public key from firehose", "did", did) 35 return nil 36} 37 38// returns a repo path on disk if present, and error if not 39type targetRepo struct { 40 RepoPath string 41 OwnerDid string 42 RepoName string 43 RepoDid string 44 DefaultBranch string // default branch 45} 46 47func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 48 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 49 50 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git") 51 if rkey == "" { 52 return nil 53 } 54 55 if event.Commit.Operation == jmodels.CommitOperationDelete { 56 return nil 57 } 58 59 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate { 60 return nil 61 } 62 63 raw := json.RawMessage(event.Commit.Record) 64 var record tangled.Repo 65 if err := json.Unmarshal(raw, &record); err != nil { 66 return fmt.Errorf("failed to unmarshal repo record: %w", err) 67 } 68 69 if record.Knot != h.c.Server.Hostname { 70 return nil 71 } 72 if record.RepoDid == nil || *record.RepoDid == "" { 73 l.Info("skipping repo event without repoDid") 74 return nil 75 } 76 repoDid := *record.RepoDid 77 78 if err := knotxrpc.ValidateRepoName(rkey); err != nil { 79 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err) 80 return nil 81 } 82 83 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid) 84 if lookupErr != nil { 85 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid) 86 return nil 87 } 88 if ownerDid != event.Did { 89 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did) 90 return nil 91 } 92 93 alias := db.RepoAlias{ 94 OwnerDid: event.Did, 95 Rkey: rkey, 96 RepoDid: repoDid, 97 Rev: event.Commit.Rev, 98 } 99 if err := h.db.UpsertRepoAlias(alias); err != nil { 100 l.Warn("failed to upsert repo alias", "err", err) 101 return nil 102 } 103 104 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev) 105 return nil 106} 107 108func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 109 var err error 110 switch event.Kind { 111 case jmodels.EventKindIdentity: 112 err = h.resolver.InvalidateIdent(ctx, event.Did) 113 case jmodels.EventKindCommit: 114 switch event.Commit.Collection { 115 case tangled.PublicKeyNSID: 116 err = h.processPublicKey(ctx, event) 117 case tangled.RepoNSID: 118 err = h.processRepo(ctx, event) 119 } 120 default: 121 return nil 122 } 123 124 if err != nil { 125 args := []any{"kind", event.Kind, "err", err} 126 if event.Kind == jmodels.EventKindCommit { 127 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey) 128 } 129 h.l.Warn("failed to process event, skipping", args...) 130 } 131 132 lastTimeUs := event.TimeUS + 1 133 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 134 h.l.Error("failed to save cursor", "err", saveErr) 135 } 136 137 return nil 138}