Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 "tangled.org/core/api/tangled"
10 "tangled.org/core/spindle/db"
11 "tangled.org/core/tapc"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/jetstream/pkg/models"
15)
16
17type Ingester func(ctx context.Context, e *models.Event) error
18
19func (s *Spindle) ingest() Ingester {
20 return func(ctx context.Context, e *models.Event) error {
21 if e.Kind != models.EventKindCommit {
22 return nil
23 }
24
25 var err error
26 switch e.Commit.Collection {
27 case tangled.SpindleMemberNSID:
28 err = s.ingestMember(ctx, e)
29 case tangled.RepoNSID, tangled.RepoCollaboratorNSID:
30 if evt, ok := jetstreamToTapEvent(e); ok {
31 err = s.tap.processEvent(ctx, evt)
32 }
33 }
34
35 if err != nil {
36 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err)
37 }
38
39 lastTimeUs := e.TimeUS + 1
40 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
41 s.l.Error("failed to save cursor", "err", saveErr)
42 }
43
44 return nil
45 }
46}
47
48func jetstreamToTapEvent(e *models.Event) (tapc.Event, bool) {
49 if e.Commit == nil {
50 return tapc.Event{}, false
51 }
52 did, err := syntax.ParseDID(e.Did)
53 if err != nil {
54 return tapc.Event{}, false
55 }
56 var action tapc.RecordAction
57 switch e.Commit.Operation {
58 case models.CommitOperationCreate:
59 action = tapc.RecordCreateAction
60 case models.CommitOperationUpdate:
61 action = tapc.RecordUpdateAction
62 case models.CommitOperationDelete:
63 action = tapc.RecordDeleteAction
64 default:
65 return tapc.Event{}, false
66 }
67 return tapc.Event{
68 Type: tapc.EvtRecord,
69 Record: &tapc.RecordEventData{
70 Did: did,
71 Rkey: syntax.RecordKey(e.Commit.RKey),
72 Collection: syntax.NSID(e.Commit.Collection),
73 Action: action,
74 Record: e.Commit.Record,
75 },
76 }, true
77}
78
79func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
80 var err error
81 did := e.Did
82 rkey := e.Commit.RKey
83
84 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
85
86 switch e.Commit.Operation {
87 case models.CommitOperationCreate, models.CommitOperationUpdate:
88 raw := e.Commit.Record
89 record := tangled.SpindleMember{}
90 err = json.Unmarshal(raw, &record)
91 if err != nil {
92 l.Error("invalid record", "error", err)
93 return err
94 }
95
96 domain := s.cfg.Server.Hostname
97 recordInstance := record.Instance
98
99 if recordInstance != domain {
100 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
101 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
102 }
103
104 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
105 if err != nil || !ok {
106 l.Error("failed to add member", "did", did, "error", err)
107 return fmt.Errorf("failed to enforce permissions: %w", err)
108 }
109
110 if err := db.AddSpindleMember(s.db, db.SpindleMember{
111 Did: syntax.DID(did),
112 Rkey: rkey,
113 Instance: recordInstance,
114 Subject: syntax.DID(record.Subject),
115 Created: time.Now(),
116 }); err != nil {
117 l.Error("failed to add member", "error", err)
118 return fmt.Errorf("failed to add member: %w", err)
119 }
120
121 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
122 l.Error("failed to add member", "error", err)
123 return fmt.Errorf("failed to add member: %w", err)
124 }
125 l.Info("added member from firehose", "member", record.Subject)
126
127 if err := s.db.AddDid(record.Subject); err != nil {
128 l.Error("failed to add did", "error", err)
129 return fmt.Errorf("failed to add did: %w", err)
130 }
131 s.jc.AddDid(record.Subject)
132
133 return nil
134
135 case models.CommitOperationDelete:
136 record, err := db.GetSpindleMember(s.db, did, rkey)
137 if err != nil {
138 l.Error("failed to find member", "error", err)
139 return fmt.Errorf("failed to find member: %w", err)
140 }
141
142 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
143 l.Error("failed to remove member", "error", err)
144 return fmt.Errorf("failed to remove member: %w", err)
145 }
146
147 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
148 l.Error("failed to add member", "error", err)
149 return fmt.Errorf("failed to add member: %w", err)
150 }
151 l.Info("added member from firehose", "member", record.Subject)
152
153 if err := s.db.RemoveDid(record.Subject.String()); err != nil {
154 l.Error("failed to add did", "error", err)
155 return fmt.Errorf("failed to add did: %w", err)
156 }
157 s.jc.RemoveDid(record.Subject.String())
158
159 }
160 return nil
161}