A fork of the Cocoon PDS that is being made more distributed.
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 5.8 kB View raw
1package server 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "log/slog" 9 "sync" 10 "time" 11 12 "github.com/avast/retry-go/v4" 13 "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/events" 15 indigomodels "github.com/bluesky-social/indigo/models" 16 cbg "github.com/whyrusleeping/cbor-gen" 17 "gorm.io/gorm" 18 19 "github.com/haileyok/cocoon/models" 20) 21 22type DbPersister struct { 23 Db *gorm.DB 24 25 Lk sync.Mutex 26 27 Broadcast func(*events.XRPCStreamEvent) 28 29 // how long do we actually want to keep these things around 30 Retention time.Duration 31} 32 33func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) { 34 if err := db.AutoMigrate(&models.EventRecord{}); err != nil { 35 return nil, fmt.Errorf("failed to migrate EventRecord: %w", err) 36 } 37 38 if retention == 0 { 39 retention = 72 * time.Hour 40 } 41 42 p := &DbPersister{ 43 Db: db, 44 Retention: retention, 45 } 46 47 go p.cleanupRoutine() 48 49 return p, nil 50} 51 52func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 53 p.Broadcast = brc 54} 55 56func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 57 p.Lk.Lock() 58 defer p.Lk.Unlock() 59 60 rec := &models.EventRecord{} 61 if err := p.Db.Order("seq desc").Limit(1).First(rec).Error; err != nil { 62 slog.Error("fetching most recent event record", "error", err) 63 rec.Seq = time.Now().UnixMilli() 64 } 65 66 // if the error on inserting the event record is a constraint error, it means that 67 // another event has been stored since getting the last sequence number. In that case 68 // retry which will increase the sequence number again and hopefully work. Any others 69 // can error out. 
70 retryIfFunc := func(err error) bool { 71 return errors.Is(err, gorm.ErrDuplicatedKey) 72 } 73 err := retry.Do(func() error { 74 rec.Seq++ 75 76 var did string 77 var evtType string 78 79 switch { 80 case e.RepoCommit != nil: 81 e.RepoCommit.Seq = rec.Seq 82 did = e.RepoCommit.Repo 83 evtType = "commit" 84 case e.RepoSync != nil: 85 e.RepoSync.Seq = rec.Seq 86 did = e.RepoSync.Did 87 evtType = "sync" 88 case e.RepoIdentity != nil: 89 e.RepoIdentity.Seq = rec.Seq 90 did = e.RepoIdentity.Did 91 evtType = "identity" 92 case e.RepoAccount != nil: 93 e.RepoAccount.Seq = rec.Seq 94 did = e.RepoAccount.Did 95 evtType = "account" 96 default: 97 return fmt.Errorf("unknown event type") 98 } 99 100 data, err := serializeEvent(e) 101 if err != nil { 102 return fmt.Errorf("failed to serialize event: %w", err) 103 } 104 105 rec.CreatedAt = time.Now() 106 rec.Did = did 107 rec.Type = evtType 108 rec.Data = data 109 110 err = p.Db.Create(rec).Error 111 if err != nil { 112 return fmt.Errorf("failed to persist event: %w", err) 113 } 114 115 return nil 116 }, retry.RetryIf(retryIfFunc)) 117 118 if err != nil { 119 return err 120 } 121 122 if p.Broadcast != nil { 123 p.Broadcast(e) 124 } 125 126 return nil 127} 128 129func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 130 const pageSize = 500 131 132 cursor := since 133 for { 134 var records []models.EventRecord 135 if err := p.Db.WithContext(ctx). 136 Where("seq > ?", cursor). 137 Order("seq asc"). 138 Limit(pageSize). 
139 Find(&records).Error; err != nil { 140 return fmt.Errorf("failed to query events: %w", err) 141 } 142 143 slog.Info("playback", "len of records", len(records), "cursor", cursor) 144 145 if len(records) == 0 { 146 return nil 147 } 148 149 for _, rec := range records { 150 evt, err := deserializeEvent(rec.Type, rec.Data) 151 if err != nil { 152 return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err) 153 } 154 155 if err := cb(evt); err != nil { 156 return err 157 } 158 159 cursor = rec.Seq 160 } 161 162 if len(records) < pageSize { 163 return nil 164 } 165 } 166} 167 168func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error { 169 return nil 170} 171 172func (p *DbPersister) Flush(ctx context.Context) error { 173 return nil 174} 175 176func (p *DbPersister) Shutdown(ctx context.Context) error { 177 return nil 178} 179 180func (p *DbPersister) cleanupRoutine() { 181 ticker := time.NewTicker(time.Hour) 182 defer ticker.Stop() 183 184 for range ticker.C { 185 cutoff := time.Now().Add(-p.Retention) 186 if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil { 187 continue 188 } 189 } 190} 191 192func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) { 193 buf := new(bytes.Buffer) 194 cw := cbg.NewCborWriter(buf) 195 196 switch { 197 case e.RepoCommit != nil: 198 if err := e.RepoCommit.MarshalCBOR(cw); err != nil { 199 return nil, err 200 } 201 case e.RepoSync != nil: 202 if err := e.RepoSync.MarshalCBOR(cw); err != nil { 203 return nil, err 204 } 205 case e.RepoIdentity != nil: 206 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil { 207 return nil, err 208 } 209 case e.RepoAccount != nil: 210 if err := e.RepoAccount.MarshalCBOR(cw); err != nil { 211 return nil, err 212 } 213 default: 214 return nil, fmt.Errorf("unknown event type") 215 } 216 217 return buf.Bytes(), nil 218} 219 220func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) { 221 r := 
bytes.NewReader(data) 222 cr := cbg.NewCborReader(r) 223 224 switch evtType { 225 case "commit": 226 evt := &atproto.SyncSubscribeRepos_Commit{} 227 if err := evt.UnmarshalCBOR(cr); err != nil { 228 return nil, err 229 } 230 return &events.XRPCStreamEvent{RepoCommit: evt}, nil 231 case "sync": 232 evt := &atproto.SyncSubscribeRepos_Sync{} 233 if err := evt.UnmarshalCBOR(cr); err != nil { 234 return nil, err 235 } 236 return &events.XRPCStreamEvent{RepoSync: evt}, nil 237 case "identity": 238 evt := &atproto.SyncSubscribeRepos_Identity{} 239 if err := evt.UnmarshalCBOR(cr); err != nil { 240 return nil, err 241 } 242 return &events.XRPCStreamEvent{RepoIdentity: evt}, nil 243 case "account": 244 evt := &atproto.SyncSubscribeRepos_Account{} 245 if err := evt.UnmarshalCBOR(cr); err != nil { 246 return nil, err 247 } 248 return &events.XRPCStreamEvent{RepoAccount: evt}, nil 249 default: 250 return nil, fmt.Errorf("unknown event type: %s", evtType) 251 } 252}