A fork of the Cocoon PDS but being made more distributed.
0

Configure Feed

Select the types of activity you want to include in your feed.

1package server 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "log/slog" 9 "sync" 10 "time" 11 12 "github.com/avast/retry-go/v4" 13 "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/events" 15 indigomodels "github.com/bluesky-social/indigo/models" 16 cbg "github.com/whyrusleeping/cbor-gen" 17 "gorm.io/gorm" 18 19 "github.com/haileyok/cocoon/models" 20) 21 22type DbPersister struct { 23 Db *gorm.DB 24 25 Lk sync.Mutex 26 27 Broadcast func(*events.XRPCStreamEvent) 28 29 // how long do we actually want to keep these things around 30 Retention time.Duration 31} 32 33func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) { 34 if err := db.AutoMigrate(&models.EventRecord{}); err != nil { 35 return nil, fmt.Errorf("failed to migrate EventRecord: %w", err) 36 } 37 38 if retention == 0 { 39 retention = 72 * time.Hour 40 } 41 42 p := &DbPersister{ 43 Db: db, 44 Retention: retention, 45 } 46 47 go p.cleanupRoutine() 48 49 return p, nil 50} 51 52func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 53 p.Broadcast = brc 54} 55 56func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 57 p.Lk.Lock() 58 defer p.Lk.Unlock() 59 60 rec := &models.EventRecord{} 61 if err := p.Db.Order("seq desc").Limit(1).First(rec).Error; err != nil { 62 slog.Error("fetching most recent event record", "error", err) 63 rec.Seq = time.Now().UnixMilli() 64 } 65 66 // if the error on inserting the event record is a constraint error, it means that 67 // another event has been stored since getting the last sequence number. In that case 68 // retry which will increase the sequence number again and hopefully work. Any others 69 // can error out. 70 retryIfFunc := func(err error) bool { 71 return errors.Is(err, gorm.ErrDuplicatedKey) 72 } 73 err := retry.Do(func() error { 74 rec.Seq++ 75 76 var did string 77 var evtType string 78 79 switch { 80 case e.RepoCommit != nil: 81 e.RepoCommit.Seq = rec.Seq 82 did = e.RepoCommit.Repo 83 evtType = "commit" 84 case e.RepoSync != nil: 85 e.RepoSync.Seq = rec.Seq 86 did = e.RepoSync.Did 87 evtType = "sync" 88 case e.RepoIdentity != nil: 89 e.RepoIdentity.Seq = rec.Seq 90 did = e.RepoIdentity.Did 91 evtType = "identity" 92 case e.RepoAccount != nil: 93 e.RepoAccount.Seq = rec.Seq 94 did = e.RepoAccount.Did 95 evtType = "account" 96 default: 97 return fmt.Errorf("unknown event type") 98 } 99 100 data, err := serializeEvent(e) 101 if err != nil { 102 return fmt.Errorf("failed to serialize event: %w", err) 103 } 104 105 rec.CreatedAt = time.Now() 106 rec.Did = did 107 rec.Type = evtType 108 rec.Data = data 109 110 err = p.Db.Create(rec).Error 111 if err != nil { 112 return fmt.Errorf("failed to persist event: %w", err) 113 } 114 115 return nil 116 }, retry.RetryIf(retryIfFunc)) 117 118 if err != nil { 119 slog.Error(err.Error()) 120 return err 121 } 122 123 if p.Broadcast != nil { 124 p.Broadcast(e) 125 } 126 127 return nil 128} 129 130func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 131 const pageSize = 500 132 133 cursor := since 134 for { 135 var records []models.EventRecord 136 if err := p.Db.WithContext(ctx). 137 Where("seq > ?", cursor). 138 Order("seq asc"). 139 Limit(pageSize). 140 Find(&records).Error; err != nil { 141 return fmt.Errorf("failed to query events: %w", err) 142 } 143 144 slog.Info("playback", "len of records", len(records), "cursor", cursor) 145 146 if len(records) == 0 { 147 return nil 148 } 149 150 for _, rec := range records { 151 evt, err := deserializeEvent(rec.Type, rec.Data) 152 if err != nil { 153 return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err) 154 } 155 156 if err := cb(evt); err != nil { 157 return err 158 } 159 160 cursor = rec.Seq 161 } 162 163 if len(records) < pageSize { 164 return nil 165 } 166 } 167} 168 169func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error { 170 return nil 171} 172 173func (p *DbPersister) Flush(ctx context.Context) error { 174 return nil 175} 176 177func (p *DbPersister) Shutdown(ctx context.Context) error { 178 return nil 179} 180 181func (p *DbPersister) cleanupRoutine() { 182 ticker := time.NewTicker(time.Hour) 183 defer ticker.Stop() 184 185 for range ticker.C { 186 cutoff := time.Now().Add(-p.Retention) 187 if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil { 188 continue 189 } 190 } 191} 192 193func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) { 194 buf := new(bytes.Buffer) 195 cw := cbg.NewCborWriter(buf) 196 197 switch { 198 case e.RepoCommit != nil: 199 if err := e.RepoCommit.MarshalCBOR(cw); err != nil { 200 return nil, err 201 } 202 case e.RepoSync != nil: 203 if err := e.RepoSync.MarshalCBOR(cw); err != nil { 204 return nil, err 205 } 206 case e.RepoIdentity != nil: 207 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil { 208 return nil, err 209 } 210 case e.RepoAccount != nil: 211 if err := e.RepoAccount.MarshalCBOR(cw); err != nil { 212 return nil, err 213 } 214 default: 215 return nil, fmt.Errorf("unknown event type") 216 } 217 218 return buf.Bytes(), nil 219} 220 221func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) { 222 r := bytes.NewReader(data) 223 cr := cbg.NewCborReader(r) 224 225 switch evtType { 226 case "commit": 227 evt := &atproto.SyncSubscribeRepos_Commit{} 228 if err := evt.UnmarshalCBOR(cr); err != nil { 229 return nil, err 230 } 231 return &events.XRPCStreamEvent{RepoCommit: evt}, nil 232 case "sync": 233 evt := &atproto.SyncSubscribeRepos_Sync{} 234 if err := evt.UnmarshalCBOR(cr); err != nil { 235 return nil, err 236 } 237 return &events.XRPCStreamEvent{RepoSync: evt}, nil 238 case "identity": 239 evt := &atproto.SyncSubscribeRepos_Identity{} 240 if err := evt.UnmarshalCBOR(cr); err != nil { 241 return nil, err 242 } 243 return &events.XRPCStreamEvent{RepoIdentity: evt}, nil 244 case "account": 245 evt := &atproto.SyncSubscribeRepos_Account{} 246 if err := evt.UnmarshalCBOR(cr); err != nil { 247 return nil, err 248 } 249 return &events.XRPCStreamEvent{RepoAccount: evt}, nil 250 default: 251 return nil, fmt.Errorf("unknown event type: %s", evtType) 252 } 253}