Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10
11 "tangled.org/core/api/tangled"
12 "tangled.org/core/eventconsumer"
13 "tangled.org/core/rbac"
14 "tangled.org/core/spindle/db"
15
16 comatproto "github.com/bluesky-social/indigo/api/atproto"
17 "github.com/bluesky-social/indigo/atproto/identity"
18 "github.com/bluesky-social/indigo/atproto/syntax"
19 "github.com/bluesky-social/indigo/xrpc"
20 "github.com/bluesky-social/jetstream/pkg/models"
21 securejoin "github.com/cyphar/filepath-securejoin"
22)
23
24type Ingester func(ctx context.Context, e *models.Event) error
25
26func (s *Spindle) ingest() Ingester {
27 return func(ctx context.Context, e *models.Event) error {
28 if e.Kind != models.EventKindCommit {
29 return nil
30 }
31
32 var err error
33 switch e.Commit.Collection {
34 case tangled.SpindleMemberNSID:
35 err = s.ingestMember(ctx, e)
36 case tangled.RepoNSID:
37 err = s.ingestRepo(ctx, e)
38 case tangled.RepoCollaboratorNSID:
39 err = s.ingestCollaborator(ctx, e)
40 }
41
42 if err != nil {
43 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err)
44 }
45
46 lastTimeUs := e.TimeUS + 1
47 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
48 s.l.Error("failed to save cursor", "err", saveErr)
49 }
50
51 return nil
52 }
53}
54
55func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
56 var err error
57 did := e.Did
58 rkey := e.Commit.RKey
59
60 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
61
62 switch e.Commit.Operation {
63 case models.CommitOperationCreate, models.CommitOperationUpdate:
64 raw := e.Commit.Record
65 record := tangled.SpindleMember{}
66 err = json.Unmarshal(raw, &record)
67 if err != nil {
68 l.Error("invalid record", "error", err)
69 return err
70 }
71
72 domain := s.cfg.Server.Hostname
73 recordInstance := record.Instance
74
75 if recordInstance != domain {
76 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
77 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
78 }
79
80 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
81 if err != nil || !ok {
82 l.Error("failed to add member", "did", did, "error", err)
83 return fmt.Errorf("failed to enforce permissions: %w", err)
84 }
85
86 if err := db.AddSpindleMember(s.db, db.SpindleMember{
87 Did: syntax.DID(did),
88 Rkey: rkey,
89 Instance: recordInstance,
90 Subject: syntax.DID(record.Subject),
91 Created: time.Now(),
92 }); err != nil {
93 l.Error("failed to add member", "error", err)
94 return fmt.Errorf("failed to add member: %w", err)
95 }
96
97 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
98 l.Error("failed to add member", "error", err)
99 return fmt.Errorf("failed to add member: %w", err)
100 }
101 l.Info("added member from firehose", "member", record.Subject)
102
103 if err := s.db.AddDid(record.Subject); err != nil {
104 l.Error("failed to add did", "error", err)
105 return fmt.Errorf("failed to add did: %w", err)
106 }
107 s.jc.AddDid(record.Subject)
108
109 return nil
110
111 case models.CommitOperationDelete:
112 record, err := db.GetSpindleMember(s.db, did, rkey)
113 if err != nil {
114 l.Error("failed to find member", "error", err)
115 return fmt.Errorf("failed to find member: %w", err)
116 }
117
118 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
119 l.Error("failed to remove member", "error", err)
120 return fmt.Errorf("failed to remove member: %w", err)
121 }
122
123 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
124 l.Error("failed to add member", "error", err)
125 return fmt.Errorf("failed to add member: %w", err)
126 }
127 l.Info("added member from firehose", "member", record.Subject)
128
129 if err := s.db.RemoveDid(record.Subject.String()); err != nil {
130 l.Error("failed to add did", "error", err)
131 return fmt.Errorf("failed to add did: %w", err)
132 }
133 s.jc.RemoveDid(record.Subject.String())
134
135 }
136 return nil
137}
138
139func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error {
140 var err error
141 did := e.Did
142
143 l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
144
145 l.Info("ingesting repo record", "did", did)
146
147 switch e.Commit.Operation {
148 case models.CommitOperationCreate, models.CommitOperationUpdate:
149 raw := e.Commit.Record
150 record := tangled.Repo{}
151 err = json.Unmarshal(raw, &record)
152 if err != nil {
153 l.Error("invalid record", "error", err)
154 return err
155 }
156
157 domain := s.cfg.Server.Hostname
158 rkey := e.Commit.RKey
159
160 // no spindle configured for this repo
161 if record.Spindle == nil {
162 l.Info("no spindle configured", "rkey", rkey)
163 return nil
164 }
165
166 // this repo did not want this spindle
167 if *record.Spindle != domain {
168 l.Info("different spindle configured", "rkey", rkey, "spindle", *record.Spindle, "domain", domain)
169 return nil
170 }
171
172 // add this repo to the watch list
173 if err := s.db.AddRepo(record.Knot, did, rkey); err != nil {
174 l.Error("failed to add repo", "error", err)
175 return fmt.Errorf("failed to add repo: %w", err)
176 }
177
178 didSlashRepo, err := securejoin.SecureJoin(did, rkey)
179 if err != nil {
180 return err
181 }
182
183 // add repo to rbac
184 if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil {
185 l.Error("failed to add repo to enforcer", "error", err)
186 return fmt.Errorf("failed to add repo: %w", err)
187 }
188
189 // add collaborators to rbac
190 owner, err := s.res.ResolveIdent(ctx, did)
191 if err != nil || owner.Handle.IsInvalidHandle() {
192 return err
193 }
194 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil {
195 return err
196 }
197
198 // add this knot to the event consumer
199 src := eventconsumer.NewKnotSource(record.Knot)
200 s.ks.AddSource(context.Background(), src)
201
202 return nil
203
204 }
205 return nil
206}
207
208func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
209 var err error
210
211 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
212
213 l.Info("ingesting collaborator record")
214
215 switch e.Commit.Operation {
216 case models.CommitOperationCreate, models.CommitOperationUpdate:
217 raw := e.Commit.Record
218 record := tangled.RepoCollaborator{}
219 err = json.Unmarshal(raw, &record)
220 if err != nil {
221 l.Error("invalid record", "error", err)
222 return err
223 }
224
225 subjectId, err := s.res.ResolveIdent(ctx, record.Subject)
226 if err != nil || subjectId.Handle.IsInvalidHandle() {
227 return err
228 }
229
230 var rbacResource string
231 var ownerDid string
232 switch {
233 case strings.HasPrefix(record.Repo, "did:"):
234 resolvedOwner, repoName, lookupErr := s.resolveRepoDid(ctx, e.Did, record.Repo)
235 if lookupErr != nil {
236 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr)
237 }
238 ownerDid = resolvedOwner
239 rbacResource, _ = securejoin.SecureJoin(ownerDid, repoName)
240
241 case strings.Contains(record.Repo, "/"):
242 repoAt, parseErr := syntax.ParseATURI(record.Repo)
243 if parseErr != nil {
244 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
245 return nil
246 }
247
248 owner, resolveErr := s.res.ResolveIdent(ctx, repoAt.Authority().String())
249 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
250 return fmt.Errorf("failed to resolve handle: %w", resolveErr)
251 }
252
253 xrpcc := xrpc.Client{
254 Host: owner.PDSEndpoint(),
255 }
256
257 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
258 if getErr != nil {
259 return getErr
260 }
261
262 if _, ok := resp.Value.Val.(*tangled.Repo); !ok {
263 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt)
264 }
265 rbacResource, _ = securejoin.SecureJoin(owner.DID.String(), repoAt.RecordKey().String())
266 ownerDid = owner.DID.String()
267
268 default:
269 l.Info("rejecting collaborator record with unrecognized repo format", "repo", record.Repo)
270 return nil
271 }
272
273 if ok, err := s.e.IsCollaboratorInviteAllowed(ownerDid, rbac.ThisServer, rbacResource); !ok || err != nil {
274 return fmt.Errorf("insufficient permissions: %w", err)
275 }
276
277 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, rbacResource); err != nil {
278 l.Error("failed to add collaborator to enforcer", "error", err)
279 return fmt.Errorf("failed to add collaborator: %w", err)
280 }
281
282 return nil
283 }
284 return nil
285}
286
287func (s *Spindle) resolveRepoDid(ctx context.Context, ownerDid string, repoDid string) (string, string, error) {
288 owner, resolveErr := s.res.ResolveIdent(ctx, ownerDid)
289 if resolveErr != nil || owner.Handle.IsInvalidHandle() {
290 return "", "", fmt.Errorf("failed to resolve owner %s: %w", ownerDid, resolveErr)
291 }
292
293 xrpcc := xrpc.Client{
294 Host: owner.PDSEndpoint(),
295 }
296
297 cursor := ""
298 for {
299 resp, listErr := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoNSID, cursor, 100, ownerDid, false)
300 if listErr != nil {
301 return "", "", fmt.Errorf("failed to list repo records for %s: %w", ownerDid, listErr)
302 }
303
304 for _, r := range resp.Records {
305 if r == nil {
306 continue
307 }
308 repo, ok := r.Value.Val.(*tangled.Repo)
309 if !ok {
310 continue
311 }
312 if repo.RepoDid != nil && *repo.RepoDid == repoDid {
313 rkey := r.Uri[strings.LastIndex(r.Uri, "/")+1:]
314 return ownerDid, rkey, nil
315 }
316 }
317
318 if resp.Cursor == nil || *resp.Cursor == "" {
319 break
320 }
321 cursor = *resp.Cursor
322 }
323
324 return "", "", fmt.Errorf("repo DID %s not found in records for %s", repoDid, ownerDid)
325}
326
327func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
328 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
329
330 l.Info("fetching and adding existing collaborators")
331
332 xrpcc := xrpc.Client{
333 Host: owner.PDSEndpoint(),
334 }
335
336 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
337 if err != nil {
338 return err
339 }
340
341 var errs error
342 for _, r := range resp.Records {
343 if r == nil {
344 continue
345 }
346 record := r.Value.Val.(*tangled.RepoCollaborator)
347
348 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
349 l.Error("failed to add repo to enforcer", "error", err)
350 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
351 }
352 }
353
354 return errs
355}