forked from
willdot.net/cocoon
A fork of the Cocoon PDS that is being made more distributed.
1package server
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "log/slog"
9 "sync"
10 "time"
11
12 "github.com/avast/retry-go/v4"
13 "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/events"
15 indigomodels "github.com/bluesky-social/indigo/models"
16 cbg "github.com/whyrusleeping/cbor-gen"
17 "gorm.io/gorm"
18
19 "github.com/haileyok/cocoon/models"
20)
21
22type DbPersister struct {
23 Db *gorm.DB
24
25 Lk sync.Mutex
26
27 Broadcast func(*events.XRPCStreamEvent)
28
29 // how long do we actually want to keep these things around
30 Retention time.Duration
31}
32
33func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) {
34 if err := db.AutoMigrate(&models.EventRecord{}); err != nil {
35 return nil, fmt.Errorf("failed to migrate EventRecord: %w", err)
36 }
37
38 if retention == 0 {
39 retention = 72 * time.Hour
40 }
41
42 p := &DbPersister{
43 Db: db,
44 Retention: retention,
45 }
46
47 go p.cleanupRoutine()
48
49 return p, nil
50}
51
52func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
53 p.Broadcast = brc
54}
55
56func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
57 p.Lk.Lock()
58 defer p.Lk.Unlock()
59
60 rec := &models.EventRecord{}
61 if err := p.Db.Order("seq desc").Limit(1).First(rec).Error; err != nil {
62 slog.Error("fetching most recent event record", "error", err)
63 rec.Seq = time.Now().UnixMilli()
64 }
65
66 // if the error on inserting the event record is a constraint error, it means that
67 // another event has been stored since getting the last sequence number. In that case
68 // retry which will increase the sequence number again and hopefully work. Any others
69 // can error out.
70 retryIfFunc := func(err error) bool {
71 return errors.Is(err, gorm.ErrDuplicatedKey)
72 }
73 err := retry.Do(func() error {
74 rec.Seq++
75
76 var did string
77 var evtType string
78
79 switch {
80 case e.RepoCommit != nil:
81 e.RepoCommit.Seq = rec.Seq
82 did = e.RepoCommit.Repo
83 evtType = "commit"
84 case e.RepoSync != nil:
85 e.RepoSync.Seq = rec.Seq
86 did = e.RepoSync.Did
87 evtType = "sync"
88 case e.RepoIdentity != nil:
89 e.RepoIdentity.Seq = rec.Seq
90 did = e.RepoIdentity.Did
91 evtType = "identity"
92 case e.RepoAccount != nil:
93 e.RepoAccount.Seq = rec.Seq
94 did = e.RepoAccount.Did
95 evtType = "account"
96 default:
97 return fmt.Errorf("unknown event type")
98 }
99
100 data, err := serializeEvent(e)
101 if err != nil {
102 return fmt.Errorf("failed to serialize event: %w", err)
103 }
104
105 rec.CreatedAt = time.Now()
106 rec.Did = did
107 rec.Type = evtType
108 rec.Data = data
109
110 err = p.Db.Create(rec).Error
111 if err != nil {
112 return fmt.Errorf("failed to persist event: %w", err)
113 }
114
115 return nil
116 }, retry.RetryIf(retryIfFunc))
117
118 if err != nil {
119 return err
120 }
121
122 if p.Broadcast != nil {
123 p.Broadcast(e)
124 }
125
126 return nil
127}
128
129func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
130 const pageSize = 500
131
132 cursor := since
133 for {
134 var records []models.EventRecord
135 if err := p.Db.WithContext(ctx).
136 Where("seq > ?", cursor).
137 Order("seq asc").
138 Limit(pageSize).
139 Find(&records).Error; err != nil {
140 return fmt.Errorf("failed to query events: %w", err)
141 }
142
143 slog.Info("playback", "len of records", len(records), "cursor", cursor)
144
145 if len(records) == 0 {
146 return nil
147 }
148
149 for _, rec := range records {
150 evt, err := deserializeEvent(rec.Type, rec.Data)
151 if err != nil {
152 return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err)
153 }
154
155 if err := cb(evt); err != nil {
156 return err
157 }
158
159 cursor = rec.Seq
160 }
161
162 if len(records) < pageSize {
163 return nil
164 }
165 }
166}
167
168func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error {
169 return nil
170}
171
172func (p *DbPersister) Flush(ctx context.Context) error {
173 return nil
174}
175
176func (p *DbPersister) Shutdown(ctx context.Context) error {
177 return nil
178}
179
180func (p *DbPersister) cleanupRoutine() {
181 ticker := time.NewTicker(time.Hour)
182 defer ticker.Stop()
183
184 for range ticker.C {
185 cutoff := time.Now().Add(-p.Retention)
186 if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil {
187 continue
188 }
189 }
190}
191
192func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) {
193 buf := new(bytes.Buffer)
194 cw := cbg.NewCborWriter(buf)
195
196 switch {
197 case e.RepoCommit != nil:
198 if err := e.RepoCommit.MarshalCBOR(cw); err != nil {
199 return nil, err
200 }
201 case e.RepoSync != nil:
202 if err := e.RepoSync.MarshalCBOR(cw); err != nil {
203 return nil, err
204 }
205 case e.RepoIdentity != nil:
206 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil {
207 return nil, err
208 }
209 case e.RepoAccount != nil:
210 if err := e.RepoAccount.MarshalCBOR(cw); err != nil {
211 return nil, err
212 }
213 default:
214 return nil, fmt.Errorf("unknown event type")
215 }
216
217 return buf.Bytes(), nil
218}
219
220func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) {
221 r := bytes.NewReader(data)
222 cr := cbg.NewCborReader(r)
223
224 switch evtType {
225 case "commit":
226 evt := &atproto.SyncSubscribeRepos_Commit{}
227 if err := evt.UnmarshalCBOR(cr); err != nil {
228 return nil, err
229 }
230 return &events.XRPCStreamEvent{RepoCommit: evt}, nil
231 case "sync":
232 evt := &atproto.SyncSubscribeRepos_Sync{}
233 if err := evt.UnmarshalCBOR(cr); err != nil {
234 return nil, err
235 }
236 return &events.XRPCStreamEvent{RepoSync: evt}, nil
237 case "identity":
238 evt := &atproto.SyncSubscribeRepos_Identity{}
239 if err := evt.UnmarshalCBOR(cr); err != nil {
240 return nil, err
241 }
242 return &events.XRPCStreamEvent{RepoIdentity: evt}, nil
243 case "account":
244 evt := &atproto.SyncSubscribeRepos_Account{}
245 if err := evt.UnmarshalCBOR(cr); err != nil {
246 return nil, err
247 }
248 return &events.XRPCStreamEvent{RepoAccount: evt}, nil
249 default:
250 return nil, fmt.Errorf("unknown event type: %s", evtType)
251 }
252}