forked from
willdot.net/cocoon
A fork of the Cocoon PDS but being made more distributed.
1package server
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "log/slog"
9 "sync"
10 "time"
11
12 "github.com/avast/retry-go/v4"
13 "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/events"
15 indigomodels "github.com/bluesky-social/indigo/models"
16 cbg "github.com/whyrusleeping/cbor-gen"
17 "gorm.io/gorm"
18
19 "github.com/haileyok/cocoon/models"
20)
21
22type DbPersister struct {
23 Db *gorm.DB
24
25 Lk sync.Mutex
26
27 Broadcast func(*events.XRPCStreamEvent)
28
29 // how long do we actually want to keep these things around
30 Retention time.Duration
31}
32
33func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) {
34 if err := db.AutoMigrate(&models.EventRecord{}); err != nil {
35 return nil, fmt.Errorf("failed to migrate EventRecord: %w", err)
36 }
37
38 if retention == 0 {
39 retention = 72 * time.Hour
40 }
41
42 p := &DbPersister{
43 Db: db,
44 Retention: retention,
45 }
46
47 go p.cleanupRoutine()
48
49 return p, nil
50}
51
52func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
53 p.Broadcast = brc
54}
55
56func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
57 p.Lk.Lock()
58 defer p.Lk.Unlock()
59
60 rec := &models.EventRecord{}
61 if err := p.Db.Order("seq desc").Limit(1).First(rec).Error; err != nil {
62 slog.Error("fetching most recent event record", "error", err)
63 rec.Seq = time.Now().Unix()
64 }
65
66 // if the error on inserting the event record is a constraint error, it means that
67 // another event has been stored since getting the last sequence number. In that case
68 // retry which will increase the sequence number again and hopefully work. Any others
69 // can error out.
70 retryIfFunc := func(err error) bool {
71 return errors.Is(err, gorm.ErrDuplicatedKey)
72 }
73 err := retry.Do(func() error {
74 rec.Seq++
75
76 var did string
77 var evtType string
78
79 switch {
80 case e.RepoCommit != nil:
81 e.RepoCommit.Seq = rec.Seq
82 did = e.RepoCommit.Repo
83 evtType = "commit"
84 case e.RepoSync != nil:
85 e.RepoSync.Seq = rec.Seq
86 did = e.RepoSync.Did
87 evtType = "sync"
88 case e.RepoIdentity != nil:
89 e.RepoIdentity.Seq = rec.Seq
90 did = e.RepoIdentity.Did
91 evtType = "identity"
92 case e.RepoAccount != nil:
93 e.RepoAccount.Seq = rec.Seq
94 did = e.RepoAccount.Did
95 evtType = "account"
96 default:
97 return fmt.Errorf("unknown event type")
98 }
99
100 data, err := serializeEvent(e)
101 if err != nil {
102 return fmt.Errorf("failed to serialize event: %w", err)
103 }
104
105 rec.CreatedAt = time.Now()
106 rec.Did = did
107 rec.Type = evtType
108 rec.Data = data
109
110 err = p.Db.Create(rec).Error
111 if err != nil {
112 return fmt.Errorf("failed to persist event: %w", err)
113 }
114
115 return nil
116 }, retry.RetryIf(retryIfFunc))
117
118 if err != nil {
119 return err
120 }
121
122 if p.Broadcast != nil {
123 p.Broadcast(e)
124 }
125
126 return nil
127}
128
129func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
130 const pageSize = 500
131
132 cursor := since
133 for {
134 var records []models.EventRecord
135 if err := p.Db.WithContext(ctx).
136 Where("seq > ?", cursor).
137 Order("seq asc").
138 Limit(pageSize).
139 Find(&records).Error; err != nil {
140 return fmt.Errorf("failed to query events: %w", err)
141 }
142
143 if len(records) == 0 {
144 return nil
145 }
146
147 for _, rec := range records {
148 evt, err := deserializeEvent(rec.Type, rec.Data)
149 if err != nil {
150 return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err)
151 }
152
153 if err := cb(evt); err != nil {
154 return err
155 }
156
157 cursor = rec.Seq
158 }
159
160 if len(records) < pageSize {
161 return nil
162 }
163 }
164}
165
166func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error {
167 return nil
168}
169
170func (p *DbPersister) Flush(ctx context.Context) error {
171 return nil
172}
173
174func (p *DbPersister) Shutdown(ctx context.Context) error {
175 return nil
176}
177
178func (p *DbPersister) cleanupRoutine() {
179 ticker := time.NewTicker(time.Hour)
180 defer ticker.Stop()
181
182 for range ticker.C {
183 cutoff := time.Now().Add(-p.Retention)
184 if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil {
185 continue
186 }
187 }
188}
189
190func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) {
191 buf := new(bytes.Buffer)
192 cw := cbg.NewCborWriter(buf)
193
194 switch {
195 case e.RepoCommit != nil:
196 if err := e.RepoCommit.MarshalCBOR(cw); err != nil {
197 return nil, err
198 }
199 case e.RepoSync != nil:
200 if err := e.RepoSync.MarshalCBOR(cw); err != nil {
201 return nil, err
202 }
203 case e.RepoIdentity != nil:
204 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil {
205 return nil, err
206 }
207 case e.RepoAccount != nil:
208 if err := e.RepoAccount.MarshalCBOR(cw); err != nil {
209 return nil, err
210 }
211 default:
212 return nil, fmt.Errorf("unknown event type")
213 }
214
215 return buf.Bytes(), nil
216}
217
218func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) {
219 r := bytes.NewReader(data)
220 cr := cbg.NewCborReader(r)
221
222 switch evtType {
223 case "commit":
224 evt := &atproto.SyncSubscribeRepos_Commit{}
225 if err := evt.UnmarshalCBOR(cr); err != nil {
226 return nil, err
227 }
228 return &events.XRPCStreamEvent{RepoCommit: evt}, nil
229 case "sync":
230 evt := &atproto.SyncSubscribeRepos_Sync{}
231 if err := evt.UnmarshalCBOR(cr); err != nil {
232 return nil, err
233 }
234 return &events.XRPCStreamEvent{RepoSync: evt}, nil
235 case "identity":
236 evt := &atproto.SyncSubscribeRepos_Identity{}
237 if err := evt.UnmarshalCBOR(cr); err != nil {
238 return nil, err
239 }
240 return &events.XRPCStreamEvent{RepoIdentity: evt}, nil
241 case "account":
242 evt := &atproto.SyncSubscribeRepos_Account{}
243 if err := evt.UnmarshalCBOR(cr); err != nil {
244 return nil, err
245 }
246 return &events.XRPCStreamEvent{RepoAccount: evt}, nil
247 default:
248 return nil, fmt.Errorf("unknown event type: %s", evtType)
249 }
250}