A fork of the Cocoon PDS, modified to be more distributed.
0

Configure Feed

Select the types of activity you want to include in your feed.

1package server 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "log/slog" 9 "sync" 10 "time" 11 12 "github.com/avast/retry-go/v4" 13 "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/events" 15 indigomodels "github.com/bluesky-social/indigo/models" 16 cbg "github.com/whyrusleeping/cbor-gen" 17 "gorm.io/gorm" 18 19 "github.com/haileyok/cocoon/models" 20) 21 22type DbPersister struct { 23 Db *gorm.DB 24 25 Lk sync.Mutex 26 27 Broadcast func(*events.XRPCStreamEvent) 28 29 // how long do we actually want to keep these things around 30 Retention time.Duration 31} 32 33func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) { 34 if err := db.AutoMigrate(&models.EventRecord{}); err != nil { 35 return nil, fmt.Errorf("failed to migrate EventRecord: %w", err) 36 } 37 38 if retention == 0 { 39 retention = 72 * time.Hour 40 } 41 42 p := &DbPersister{ 43 Db: db, 44 Retention: retention, 45 } 46 47 go p.cleanupRoutine() 48 49 return p, nil 50} 51 52func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 53 p.Broadcast = brc 54} 55 56func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 57 p.Lk.Lock() 58 defer p.Lk.Unlock() 59 60 rec := &models.EventRecord{} 61 if err := p.Db.Order("seq desc").Limit(1).First(rec).Error; err != nil { 62 slog.Error("fetching most recent event record", "error", err) 63 rec.Seq = time.Now().Unix() 64 } 65 66 // if the error on inserting the event record is a constraint error, it means that 67 // another event has been stored since getting the last sequence number. In that case 68 // retry which will increase the sequence number again and hopefully work. Any others 69 // can error out. 
70 retryIfFunc := func(err error) bool { 71 return errors.Is(err, gorm.ErrDuplicatedKey) 72 } 73 err := retry.Do(func() error { 74 rec.Seq++ 75 76 var did string 77 var evtType string 78 79 switch { 80 case e.RepoCommit != nil: 81 e.RepoCommit.Seq = rec.Seq 82 did = e.RepoCommit.Repo 83 evtType = "commit" 84 case e.RepoSync != nil: 85 e.RepoSync.Seq = rec.Seq 86 did = e.RepoSync.Did 87 evtType = "sync" 88 case e.RepoIdentity != nil: 89 e.RepoIdentity.Seq = rec.Seq 90 did = e.RepoIdentity.Did 91 evtType = "identity" 92 case e.RepoAccount != nil: 93 e.RepoAccount.Seq = rec.Seq 94 did = e.RepoAccount.Did 95 evtType = "account" 96 default: 97 return fmt.Errorf("unknown event type") 98 } 99 100 data, err := serializeEvent(e) 101 if err != nil { 102 return fmt.Errorf("failed to serialize event: %w", err) 103 } 104 105 rec.CreatedAt = time.Now() 106 rec.Did = did 107 rec.Type = evtType 108 rec.Data = data 109 110 err = p.Db.Create(rec).Error 111 if err != nil { 112 return fmt.Errorf("failed to persist event: %w", err) 113 } 114 115 return nil 116 }, retry.RetryIf(retryIfFunc)) 117 118 if err != nil { 119 return err 120 } 121 122 if p.Broadcast != nil { 123 p.Broadcast(e) 124 } 125 126 return nil 127} 128 129func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 130 const pageSize = 500 131 132 cursor := since 133 for { 134 var records []models.EventRecord 135 if err := p.Db.WithContext(ctx). 136 Where("seq > ?", cursor). 137 Order("seq asc"). 138 Limit(pageSize). 
139 Find(&records).Error; err != nil { 140 return fmt.Errorf("failed to query events: %w", err) 141 } 142 143 if len(records) == 0 { 144 return nil 145 } 146 147 for _, rec := range records { 148 evt, err := deserializeEvent(rec.Type, rec.Data) 149 if err != nil { 150 return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err) 151 } 152 153 if err := cb(evt); err != nil { 154 return err 155 } 156 157 cursor = rec.Seq 158 } 159 160 if len(records) < pageSize { 161 return nil 162 } 163 } 164} 165 166func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error { 167 return nil 168} 169 170func (p *DbPersister) Flush(ctx context.Context) error { 171 return nil 172} 173 174func (p *DbPersister) Shutdown(ctx context.Context) error { 175 return nil 176} 177 178func (p *DbPersister) cleanupRoutine() { 179 ticker := time.NewTicker(time.Hour) 180 defer ticker.Stop() 181 182 for range ticker.C { 183 cutoff := time.Now().Add(-p.Retention) 184 if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil { 185 continue 186 } 187 } 188} 189 190func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) { 191 buf := new(bytes.Buffer) 192 cw := cbg.NewCborWriter(buf) 193 194 switch { 195 case e.RepoCommit != nil: 196 if err := e.RepoCommit.MarshalCBOR(cw); err != nil { 197 return nil, err 198 } 199 case e.RepoSync != nil: 200 if err := e.RepoSync.MarshalCBOR(cw); err != nil { 201 return nil, err 202 } 203 case e.RepoIdentity != nil: 204 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil { 205 return nil, err 206 } 207 case e.RepoAccount != nil: 208 if err := e.RepoAccount.MarshalCBOR(cw); err != nil { 209 return nil, err 210 } 211 default: 212 return nil, fmt.Errorf("unknown event type") 213 } 214 215 return buf.Bytes(), nil 216} 217 218func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) { 219 r := bytes.NewReader(data) 220 cr := cbg.NewCborReader(r) 221 222 switch evtType { 223 
case "commit": 224 evt := &atproto.SyncSubscribeRepos_Commit{} 225 if err := evt.UnmarshalCBOR(cr); err != nil { 226 return nil, err 227 } 228 return &events.XRPCStreamEvent{RepoCommit: evt}, nil 229 case "sync": 230 evt := &atproto.SyncSubscribeRepos_Sync{} 231 if err := evt.UnmarshalCBOR(cr); err != nil { 232 return nil, err 233 } 234 return &events.XRPCStreamEvent{RepoSync: evt}, nil 235 case "identity": 236 evt := &atproto.SyncSubscribeRepos_Identity{} 237 if err := evt.UnmarshalCBOR(cr); err != nil { 238 return nil, err 239 } 240 return &events.XRPCStreamEvent{RepoIdentity: evt}, nil 241 case "account": 242 evt := &atproto.SyncSubscribeRepos_Account{} 243 if err := evt.UnmarshalCBOR(cr); err != nil { 244 return nil, err 245 } 246 return &events.XRPCStreamEvent{RepoAccount: evt}, nil 247 default: 248 return nil, fmt.Errorf("unknown event type: %s", evtType) 249 } 250}