forked from
willdot.net/cocoon
A fork of the Cocoon PDS but being made more distributed.
1package server
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "log/slog"
9 "sync"
10 "time"
11
12 "github.com/avast/retry-go/v4"
13 "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/events"
15 indigomodels "github.com/bluesky-social/indigo/models"
16 cbg "github.com/whyrusleeping/cbor-gen"
17 "gorm.io/gorm"
18
19 "github.com/haileyok/cocoon/models"
20)
21
22type DbPersister struct {
23 Db *gorm.DB
24
25 Lk sync.Mutex
26
27 Broadcast func(*events.XRPCStreamEvent)
28
29 // how long do we actually want to keep these things around
30 Retention time.Duration
31}
32
33func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) {
34 if err := db.AutoMigrate(&models.EventRecord{}); err != nil {
35 return nil, fmt.Errorf("failed to migrate EventRecord: %w", err)
36 }
37
38 if retention == 0 {
39 retention = 72 * time.Hour
40 }
41
42 p := &DbPersister{
43 Db: db,
44 Retention: retention,
45 }
46
47 go p.cleanupRoutine()
48
49 return p, nil
50}
51
52func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
53 p.Broadcast = brc
54}
55
56func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
57 p.Lk.Lock()
58 defer p.Lk.Unlock()
59
60 rec := &models.EventRecord{}
61 if err := p.Db.Order("seq desc").Limit(1).First(rec).Error; err != nil {
62 slog.Error("fetching most recent event record", "error", err)
63 rec.Seq = time.Now().UnixMilli()
64 }
65
66 // if the error on inserting the event record is a constraint error, it means that
67 // another event has been stored since getting the last sequence number. In that case
68 // retry which will increase the sequence number again and hopefully work. Any others
69 // can error out.
70 retryIfFunc := func(err error) bool {
71 return errors.Is(err, gorm.ErrDuplicatedKey)
72 }
73 err := retry.Do(func() error {
74 rec.Seq++
75
76 var did string
77 var evtType string
78
79 switch {
80 case e.RepoCommit != nil:
81 e.RepoCommit.Seq = rec.Seq
82 did = e.RepoCommit.Repo
83 evtType = "commit"
84 case e.RepoSync != nil:
85 e.RepoSync.Seq = rec.Seq
86 did = e.RepoSync.Did
87 evtType = "sync"
88 case e.RepoIdentity != nil:
89 e.RepoIdentity.Seq = rec.Seq
90 did = e.RepoIdentity.Did
91 evtType = "identity"
92 case e.RepoAccount != nil:
93 e.RepoAccount.Seq = rec.Seq
94 did = e.RepoAccount.Did
95 evtType = "account"
96 default:
97 return fmt.Errorf("unknown event type")
98 }
99
100 data, err := serializeEvent(e)
101 if err != nil {
102 return fmt.Errorf("failed to serialize event: %w", err)
103 }
104
105 rec.CreatedAt = time.Now()
106 rec.Did = did
107 rec.Type = evtType
108 rec.Data = data
109
110 err = p.Db.Create(rec).Error
111 if err != nil {
112 return fmt.Errorf("failed to persist event: %w", err)
113 }
114
115 return nil
116 }, retry.RetryIf(retryIfFunc))
117
118 if err != nil {
119 slog.Error(err.Error())
120 return err
121 }
122
123 if p.Broadcast != nil {
124 p.Broadcast(e)
125 }
126
127 return nil
128}
129
130func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
131 const pageSize = 500
132
133 cursor := since
134 for {
135 var records []models.EventRecord
136 if err := p.Db.WithContext(ctx).
137 Where("seq > ?", cursor).
138 Order("seq asc").
139 Limit(pageSize).
140 Find(&records).Error; err != nil {
141 return fmt.Errorf("failed to query events: %w", err)
142 }
143
144 slog.Info("playback", "len of records", len(records), "cursor", cursor)
145
146 if len(records) == 0 {
147 return nil
148 }
149
150 for _, rec := range records {
151 evt, err := deserializeEvent(rec.Type, rec.Data)
152 if err != nil {
153 return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err)
154 }
155
156 if err := cb(evt); err != nil {
157 return err
158 }
159
160 cursor = rec.Seq
161 }
162
163 if len(records) < pageSize {
164 return nil
165 }
166 }
167}
168
169func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error {
170 return nil
171}
172
173func (p *DbPersister) Flush(ctx context.Context) error {
174 return nil
175}
176
177func (p *DbPersister) Shutdown(ctx context.Context) error {
178 return nil
179}
180
181func (p *DbPersister) cleanupRoutine() {
182 ticker := time.NewTicker(time.Hour)
183 defer ticker.Stop()
184
185 for range ticker.C {
186 cutoff := time.Now().Add(-p.Retention)
187 if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil {
188 continue
189 }
190 }
191}
192
193func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) {
194 buf := new(bytes.Buffer)
195 cw := cbg.NewCborWriter(buf)
196
197 switch {
198 case e.RepoCommit != nil:
199 if err := e.RepoCommit.MarshalCBOR(cw); err != nil {
200 return nil, err
201 }
202 case e.RepoSync != nil:
203 if err := e.RepoSync.MarshalCBOR(cw); err != nil {
204 return nil, err
205 }
206 case e.RepoIdentity != nil:
207 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil {
208 return nil, err
209 }
210 case e.RepoAccount != nil:
211 if err := e.RepoAccount.MarshalCBOR(cw); err != nil {
212 return nil, err
213 }
214 default:
215 return nil, fmt.Errorf("unknown event type")
216 }
217
218 return buf.Bytes(), nil
219}
220
221func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) {
222 r := bytes.NewReader(data)
223 cr := cbg.NewCborReader(r)
224
225 switch evtType {
226 case "commit":
227 evt := &atproto.SyncSubscribeRepos_Commit{}
228 if err := evt.UnmarshalCBOR(cr); err != nil {
229 return nil, err
230 }
231 return &events.XRPCStreamEvent{RepoCommit: evt}, nil
232 case "sync":
233 evt := &atproto.SyncSubscribeRepos_Sync{}
234 if err := evt.UnmarshalCBOR(cr); err != nil {
235 return nil, err
236 }
237 return &events.XRPCStreamEvent{RepoSync: evt}, nil
238 case "identity":
239 evt := &atproto.SyncSubscribeRepos_Identity{}
240 if err := evt.UnmarshalCBOR(cr); err != nil {
241 return nil, err
242 }
243 return &events.XRPCStreamEvent{RepoIdentity: evt}, nil
244 case "account":
245 evt := &atproto.SyncSubscribeRepos_Account{}
246 if err := evt.UnmarshalCBOR(cr); err != nil {
247 return nil, err
248 }
249 return &events.XRPCStreamEvent{RepoAccount: evt}, nil
250 default:
251 return nil, fmt.Errorf("unknown event type: %s", evtType)
252 }
253}