A fork of the Cocoon PDS but being made more distributed.
0

Configure Feed

Select the types of activity you want to include in your feed.

event persisting using retryable sequencing increments on conflicts

Signed-off-by: Will <did:plc:dadhhalkfcq3gucaq25hjqon>

author willdot.net committer
Tangled
date (Jun 6, 2026, 2:41 PM UTC) commit cd251b6b parent 20ed5d21 change-id nzlnlqmu
+68 -55
+1
go.mod
··· 4 4 5 5 require ( 6 6 github.com/Azure/go-autorest/autorest/to v0.4.1 7 + github.com/avast/retry-go/v4 v4.7.0 7 8 github.com/aws/aws-sdk-go v1.55.7 8 9 github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 9 10 github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec
+2
go.sum
··· 9 9 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA= 10 10 github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= 11 11 github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= 12 + github.com/avast/retry-go/v4 v4.7.0 h1:yjDs35SlGvKwRNSykujfjdMxMhMQQM0TnIjJaHB+Zio= 13 + github.com/avast/retry-go/v4 v4.7.0/go.mod h1:ZMPDa3sY2bKgpLtap9JRUgk2yTAba7cgiFhqxY2Sg6Q= 12 14 github.com/aws/aws-sdk-go v1.55.7 h1:UJrkFq7es5CShfBwlWAC8DA077vp8PyVbQd3lqLiztE= 13 15 github.com/aws/aws-sdk-go v1.55.7/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= 14 16 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+59 -52
server/persist.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "errors" 6 7 "fmt" 8 + "log/slog" 7 9 "sync" 8 10 "time" 9 11 12 + "github.com/avast/retry-go/v4" 10 13 "github.com/bluesky-social/indigo/api/atproto" 11 14 "github.com/bluesky-social/indigo/events" 12 15 indigomodels "github.com/bluesky-social/indigo/models" ··· 19 22 type DbPersister struct { 20 23 Db *gorm.DB 21 24 22 - Lk sync.Mutex 23 - Seq int64 25 + Lk sync.Mutex 24 26 25 27 Broadcast func(*events.XRPCStreamEvent) 26 28 ··· 42 44 Retention: retention, 43 45 } 44 46 45 - // kind of hacky. we will try and get the latest one from the db, but if it doesn't exist...well we have a problem 46 - // because the relay will already have _some_ value > 0 set as a cursor, we'll want to just set this to some high value 47 - // we'll just grab a current unix timestamp and set that as the cursor 48 - var lastEvent models.EventRecord 49 - if err := db.Order("seq desc").Limit(1).First(&lastEvent).Error; err != nil { 50 - if err != gorm.ErrRecordNotFound { 51 - return nil, fmt.Errorf("failed to get last event seq: %w", err) 52 - } 53 - p.Seq = time.Now().Unix() 54 - } else { 55 - p.Seq = lastEvent.Seq 56 - } 57 - 58 47 go p.cleanupRoutine() 59 48 60 49 return p, nil ··· 68 57 p.Lk.Lock() 69 58 defer p.Lk.Unlock() 70 59 71 - p.Seq++ 72 - seq := p.Seq 60 + rec := &models.EventRecord{} 61 + if err := p.Db.Order("seq desc").Limit(1).First(rec).Error; err != nil { 62 + slog.Error("fetching most recent event record", "error", err) 63 + rec.Seq = time.Now().Unix() 64 + } 65 + 66 + // if the error on inserting the event record is a constraint error, it means that 67 + // another event has been stored since getting the last sequence number. In that case 68 + // retry which will increase the sequence number again and hopefully work. Any others 69 + // can error out. 70 + retryIfFunc := func(err error) bool { 71 + return errors.Is(err, gorm.ErrDuplicatedKey) 72 + } 73 + err := retry.Do(func() error { 74 + rec.Seq++ 75 + 76 + var did string 77 + var evtType string 78 + 79 + switch { 80 + case e.RepoCommit != nil: 81 + e.RepoCommit.Seq = rec.Seq 82 + did = e.RepoCommit.Repo 83 + evtType = "commit" 84 + case e.RepoSync != nil: 85 + e.RepoSync.Seq = rec.Seq 86 + did = e.RepoSync.Did 87 + evtType = "sync" 88 + case e.RepoIdentity != nil: 89 + e.RepoIdentity.Seq = rec.Seq 90 + did = e.RepoIdentity.Did 91 + evtType = "identity" 92 + case e.RepoAccount != nil: 93 + e.RepoAccount.Seq = rec.Seq 94 + did = e.RepoAccount.Did 95 + evtType = "account" 96 + default: 97 + return fmt.Errorf("unknown event type") 98 + } 73 99 74 - var did string 75 - var evtType string 100 + data, err := serializeEvent(e) 101 + if err != nil { 102 + return fmt.Errorf("failed to serialize event: %w", err) 103 + } 76 104 77 - switch { 78 - case e.RepoCommit != nil: 79 - e.RepoCommit.Seq = seq 80 - did = e.RepoCommit.Repo 81 - evtType = "commit" 82 - case e.RepoSync != nil: 83 - e.RepoSync.Seq = seq 84 - did = e.RepoSync.Did 85 - evtType = "sync" 86 - case e.RepoIdentity != nil: 87 - e.RepoIdentity.Seq = seq 88 - did = e.RepoIdentity.Did 89 - evtType = "identity" 90 - case e.RepoAccount != nil: 91 - e.RepoAccount.Seq = seq 92 - did = e.RepoAccount.Did 93 - evtType = "account" 94 - default: 95 - return fmt.Errorf("unknown event type") 96 - } 105 + rec.CreatedAt = time.Now() 106 + rec.Did = did 107 + rec.Type = evtType 108 + rec.Data = data 97 109 98 - data, err := serializeEvent(e) 99 - if err != nil { 100 - return fmt.Errorf("failed to serialize event: %w", err) 101 - } 110 + err = p.Db.Create(rec).Error 111 + if err != nil { 112 + return fmt.Errorf("failed to persist event: %w", err) 113 + } 102 114 103 - rec := &models.EventRecord{ 104 - Seq: seq, 105 - CreatedAt: time.Now(), 106 - Did: did, 107 - Type: evtType, 108 - Data: data, 109 - } 115 + return nil 116 + }, retry.RetryIf(retryIfFunc)) 110 117 111 - if err := p.Db.Create(rec).Error; err != nil { 112 - return fmt.Errorf("failed to persist event: %w", err) 118 + if err != nil { 119 + return err 113 120 } 114 121 115 122 if p.Broadcast != nil {
+6 -3
server/server.go
··· 348 348 } 349 349 350 350 var gdb *gorm.DB 351 + gormCfg := gorm.Config{ 352 + TranslateError: true, 353 + } 351 354 var err error 352 355 switch dbType { 353 356 case "postgres": 354 357 if args.DatabaseURL == "" { 355 358 return nil, fmt.Errorf("database-url must be set when using postgres") 356 359 } 357 - gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gorm.Config{}) 360 + gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gormCfg) 358 361 if err != nil { 359 362 return nil, fmt.Errorf("failed to connect to postgres: %w", err) 360 363 } ··· 366 369 db, err := sql.Open("libsql", fmt.Sprintf("%s?authToken=%s", primaryUrl, authToken)) 367 370 gdb, err = gorm.Open(sqlite.New(sqlite.Config{ 368 371 Conn: db, 369 - }), &gorm.Config{}) 372 + }), &gormCfg) 370 373 if err != nil { 371 374 return nil, fmt.Errorf("failed to connect to postgres: %w", err) 372 375 } 373 376 logger.Info("connected to PostgreSQL database") 374 377 375 378 default: 376 - gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{}) 379 + gdb, err = gorm.Open(sqlite.Open(args.DbName), &gormCfg) 377 380 if err != nil { 378 381 return nil, fmt.Errorf("failed to open sqlite database: %w", err) 379 382 }