Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "strings" 8 9 jmodels "github.com/bluesky-social/jetstream/pkg/models" 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/knotserver/db" 12 knotxrpc "tangled.org/core/knotserver/xrpc" 13 "tangled.org/core/log" 14) 15 16func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 17 l := log.FromContext(ctx).With("handler", "processPublicKey", "did", event.Did, "rkey", event.Commit.RKey) 18 did := syntax.DID(event.Did) 19 rkey := syntax.RecordKey(event.Commit.RKey) 20 21 switch event.Commit.Operation { 22 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 23 var record tangled.PublicKey 24 if err := json.Unmarshal(json.RawMessage(event.Commit.Record), &record); err != nil { 25 return fmt.Errorf("failed to unmarshal record: %w", err) 26 } 27 28 pk := db.PublicKey{ 29 Did: did, 30 Rkey: rkey, 31 PublicKey: record, 32 } 33 if err := h.db.UpsertPublicKey(pk); err != nil { 34 return fmt.Errorf("failed to upsert public key: %w", err) 35 } 36 l.Info("upserted public key from firehose") 37 case jmodels.CommitOperationDelete: 38 if err := h.db.DeletePublicKeyByRkey(did, rkey); err != nil { 39 return fmt.Errorf("failed to delete public key: %w", err) 40 } 41 l.Info("deleted public key from firehose") 42 } 43 44 return nil 45} 46 47// returns a repo path on disk if present, and error if not 48type targetRepo struct { 49 RepoPath string 50 OwnerDid string 51 RepoName string 52 RepoDid string 53 DefaultBranch string // default branch 54} 55 56func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 57 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 58 59 rkey := strings.TrimSuffix(strings.TrimSpace(event.Commit.RKey), ".git") 60 if rkey == "" { 61 return nil 62 } 63 64 if event.Commit.Operation == jmodels.CommitOperationDelete { 65 return nil 66 } 67 68 if event.Commit.Operation != jmodels.CommitOperationCreate && event.Commit.Operation != jmodels.CommitOperationUpdate { 69 return nil 70 } 71 72 raw := json.RawMessage(event.Commit.Record) 73 var record tangled.Repo 74 if err := json.Unmarshal(raw, &record); err != nil { 75 return fmt.Errorf("failed to unmarshal repo record: %w", err) 76 } 77 78 if record.Knot != h.c.Server.Hostname { 79 return nil 80 } 81 if record.RepoDid == nil || *record.RepoDid == "" { 82 l.Info("skipping repo event without repoDid") 83 return nil 84 } 85 repoDid := *record.RepoDid 86 87 if err := knotxrpc.ValidateRepoName(rkey); err != nil { 88 l.Warn("skipping repo event with invalid rkey", "repoDid", repoDid, "rkey", rkey, "err", err) 89 return nil 90 } 91 92 ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(repoDid) 93 if lookupErr != nil { 94 l.Info("skipping repo event for unknown repoDid", "repoDid", repoDid) 95 return nil 96 } 97 if ownerDid != event.Did { 98 l.Warn("repo event author does not own repoDid", "repoDid", repoDid, "author", event.Did) 99 return nil 100 } 101 102 alias := db.RepoAlias{ 103 OwnerDid: event.Did, 104 Rkey: rkey, 105 RepoDid: repoDid, 106 Rev: event.Commit.Rev, 107 } 108 if err := h.db.UpsertRepoAlias(alias); err != nil { 109 l.Warn("failed to upsert repo alias", "err", err) 110 return nil 111 } 112 113 l.Info("recorded repo alias", "repoDid", repoDid, "rkey", rkey, "rev", event.Commit.Rev) 114 return nil 115} 116 117func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 118 var err error 119 switch event.Kind { 120 case jmodels.EventKindIdentity: 121 err = h.resolver.InvalidateIdent(ctx, event.Did) 122 case jmodels.EventKindCommit: 123 switch event.Commit.Collection { 124 case tangled.PublicKeyNSID: 125 err = h.processPublicKey(ctx, event) 126 case tangled.RepoNSID: 127 err = h.processRepo(ctx, event) 128 } 129 default: 130 return nil 131 } 132 133 if err != nil { 134 args := []any{"kind", event.Kind, "err", err} 135 if event.Kind == jmodels.EventKindCommit { 136 args = append(args, "nsid", event.Commit.Collection, "did", event.Did, "rkey", event.Commit.RKey) 137 } 138 h.l.Warn("failed to process event, skipping", args...) 139 } 140 141 lastTimeUs := event.TimeUS + 1 142 if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 143 h.l.Error("failed to save cursor", "err", saveErr) 144 } 145 146 return nil 147}