This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

feat: handle schedule persistence (saving & restoring)

+158 -49
+7
config.go
··· 3 3 import ( 4 4 "fmt" 5 5 "os" 6 + "strings" 6 7 7 8 "github.com/caarlos0/env/v11" 8 9 ll "github.com/ewen-lbh/label-logger-go" 9 10 "github.com/joho/godotenv" 11 + "github.com/redis/go-redis/v9" 10 12 ) 11 13 12 14 type Configuration struct { 13 15 Port int `env:"PORT" envDefault:"8080"` 14 16 ChurrosDatabaseURL string `env:"DATABASE_URL"` 17 + RedisURL string `env:"REDIS_URL"` 15 18 VapidPublicKey string `env:"PUBLIC_VAPID_KEY"` 16 19 VapidPrivateKey string `env:"VAPID_PRIVATE_KEY"` 17 20 ContactEmail string `env:"CONTACT_EMAIL"` ··· 54 57 } 55 58 ll.Log("Initialized", "cyan", "firebase client") 56 59 60 + redisClient = redis.NewClient(&redis.Options{ 61 + Addr: strings.TrimPrefix(config.RedisURL, "redis://"), 62 + }) 63 + ll.Log("Initialized", "cyan", "redis client") 57 64 }
+21 -17
events.go
··· 7 7 const ( 8 8 // EventClearScheduledJobs is used to clear all future scheduled jobs for a given churros object 9 9 // For example, when adding a new ticket to an event, we want to unschedule all future notifications for the event since the shotgun date may have changed 10 - EventClearScheduledJobs Event = "clear_scheduled_jobs" 11 - EventShowScheduledJobs Event = "show_scheduled_jobs" 12 - EventNewPost Event = "new_post" 13 - EventGodchildRequest Event = "godchild_request" 14 - EventNewComment Event = "new_comment" 15 - EventCommentReply Event = "comment_reply" 16 - EventCustom Event = "custom" 17 - EventTest Event = "test" 18 - EventGodchildAccepted Event = "godchild_accepted" 19 - EventGodchildRejected Event = "godchild_rejected" 20 - EventPendingSignup Event = "pending_signup" 21 - EventLoginStuck Event = "login_stuck" 22 - EventBookingPaid Event = "booking_paid" 23 - EventContributionPaid Event = "contribution_paid" 24 - EventShotgunOpensSoon Event = "shotgun_opens_soon" 25 - EventShotgunClosesSoon Event = "shotgun_closes_soon" 10 + EventClearScheduledJobs Event = "clear_scheduled_jobs" 11 + EventClearStoredSchedule Event = "clear_stored_schedule" 12 + EventShowScheduledJobs Event = "show_scheduled_jobs" 13 + EventSaveSchedule Event = "save_schedule" 14 + EventRestoreSchedule Event = "restore_schedule" 15 + EventClearSchedule Event = "clear_schedule" 16 + EventNewPost Event = "new_post" 17 + EventGodchildRequest Event = "godchild_request" 18 + EventNewComment Event = "new_comment" 19 + EventCommentReply Event = "comment_reply" 20 + EventCustom Event = "custom" 21 + EventTest Event = "test" 22 + EventGodchildAccepted Event = "godchild_accepted" 23 + EventGodchildRejected Event = "godchild_rejected" 24 + EventPendingSignup Event = "pending_signup" 25 + EventLoginStuck Event = "login_stuck" 26 + EventBookingPaid Event = "booking_paid" 27 + EventContributionPaid Event = "contribution_paid" 28 + EventShotgunOpensSoon Event = "shotgun_opens_soon" 29 + EventShotgunClosesSoon Event = "shotgun_closes_soon" 26 30 ) 27 31 28 32 type Message struct { ··· 31 35 // When to push the notification 32 36 SendAt time.Time `json:"send_at"` 33 37 // Type of event that triggered the notification 34 - Event Event `json:"event" jsonschema:"enum=clear_scheduled_jobs,enum=show_scheduled_jobs,enum=new_post,enum=godchild_request,enum=new_comment,enum=comment_reply,enum=custom,enum=test,enum=godchild_accepted,enum=godchild_rejected,enum=pending_signup,enum=login_stuck,enum=booking_paid,enum=contribution_paid,enum=shotgun_opens_soon,enum=shotgun_closes_soon"` 38 + Event Event `json:"event" jsonschema:"enum=save_schedule,enum=clear_schedule,enum=clear_stored_schedule,enum=restore_schedule,enum=clear_scheduled_jobs,enum=show_scheduled_jobs,enum=new_post,enum=godchild_request,enum=new_comment,enum=comment_reply,enum=custom,enum=test,enum=godchild_accepted,enum=godchild_rejected,enum=pending_signup,enum=login_stuck,enum=booking_paid,enum=contribution_paid,enum=shotgun_opens_soon,enum=shotgun_closes_soon"` 35 39 // Churros ID of the ressource (the ticket, the post, the comment, etc) 36 40 // Used to determine to whom the notification should be sent 37 41 // For godchild_request, this is not a user id, but a godparent request id.
+3
go.mod
··· 27 27 github.com/MicahParks/keyfunc v1.9.0 // indirect 28 28 github.com/bahlo/generic-list-go v0.2.0 // indirect 29 29 github.com/buger/jsonparser v1.1.1 // indirect 30 + github.com/cespare/xxhash/v2 v2.3.0 // indirect 31 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect 30 32 github.com/felixge/httpsnoop v1.0.4 // indirect 31 33 github.com/go-logr/logr v1.4.2 // indirect 32 34 github.com/go-logr/stdr v1.2.2 // indirect ··· 71 73 github.com/invopop/jsonschema v0.12.0 72 74 github.com/mattn/go-isatty v0.0.20 // indirect 73 75 github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect 76 + github.com/redis/go-redis/v9 v9.7.0 74 77 golang.org/x/sys v0.26.0 // indirect 75 78 )
+10
go.sum
··· 24 24 github.com/SherClockHolmes/webpush-go v1.3.0/go.mod h1:AxRHmJuYwKGG1PVgYzToik1lphQvDnqFYDqimHvwhIw= 25 25 github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= 26 26 github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= 27 + github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= 28 + github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= 29 + github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= 30 + github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= 27 31 github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= 28 32 github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= 29 33 github.com/caarlos0/env/v11 v11.2.2 h1:95fApNrUyueipoZN/EhA8mMxiNxrBwDa+oAZrMWl3Kg= 30 34 github.com/caarlos0/env/v11 v11.2.2/go.mod h1:JBfcdeQiBoI3Zh1QRAWfe+tpiNTmDtcCj/hHHHMx0vc= 31 35 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= 36 + github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 37 + github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 32 38 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= 33 39 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= 34 40 github.com/common-nighthawk/go-figure v0.0.0-20210622060536-734e95fb86be h1:J5BL2kskAlV9ckgEsNQXscjIaLiOYiZ75d4e94E6dcQ= ··· 36 42 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 37 43 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= 38 44 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 45 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= 46 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= 39 47 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= 40 48 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= 41 49 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= ··· 119 127 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 120 128 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 121 129 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= 130 + github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= 131 + github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= 122 132 github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= 123 133 github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= 124 134 github.com/steebchen/prisma-client-go v0.42.0 h1:83keN+4jGvoTccCKCk74UU5JQj6pOwPcg3/zkoqxKJE=
+109 -21
scheduler.go
··· 1 1 package notella 2 2 3 3 import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "time" 8 + 4 9 ll "github.com/ewen-lbh/label-logger-go" 5 10 cmap "github.com/orcaman/concurrent-map/v2" 11 + 12 + "github.com/redis/go-redis/v9" 6 13 ) 7 14 8 - var schedules = cmap.New[Message]() 15 + var redisClient *redis.Client 16 + 17 + type Schedule struct { 18 + cmap.ConcurrentMap[string, Message] 19 + } 20 + 21 + var schedules Schedule = Schedule{cmap.New[Message]()} 9 22 10 23 func (job Message) Unschedule() { 11 24 ll.Debug("Unscheduling %s", job.Id) 12 25 schedules.Remove(job.Id) 13 26 } 14 27 28 + // RestoreSchedule restores the scheduled messages from Redis to memory 29 + func RestoreSchedule() error { 30 + ll.Log("Restoring", "blue", "schedule from Redis") 31 + keys, err := redisClient.Keys(context.Background(), "notella:message:*").Result() 32 + if err != nil { 33 + return fmt.Errorf("while getting notella:message:* keys from redis: %w", err) 34 + } 35 + 36 + keyCountBefore := schedules.Count() 37 + 38 + for _, key := range keys { 39 + value, err := redisClient.Get(context.Background(), key).Result() 40 + if err != nil { 41 + return fmt.Errorf("while restoring schedule: could not get value for Redis key %s: %w", key, err) 42 + } 43 + 44 + var job Message 45 + err = json.Unmarshal([]byte(value), &job) 46 + if err != nil { 47 + return fmt.Errorf("while restoring schedule: could not unmarshal value for Redis key %s: %w", key, err) 48 + } 49 + 50 + if job.SendAt.Before(time.Now()) { 51 + ll.Warn("skipping restoration of %s because it's in the past: %#v", job.Id, job) 52 + continue 53 + } 54 + 55 + schedules.Set(job.Id, job) 56 + } 57 + 58 + ll.Log("Restored", "green", "%d scheduled jobs from Redis", schedules.Count()-keyCountBefore) 59 + 60 + return nil 61 + } 62 + 63 + // SaveSchedule saves the in-memory scheduled messages to Redis 64 + func SaveSchedule() { 65 + ll.Log("Saving", "blue", "%d scheduled jobs to Redis", schedules.Count()) 66 + for key, job := range schedules.Items() { 67 + go func(key string, job Message) { 68 + status := redisClient.Set(context.Background(), fmt.Sprintf("notella:message:%s", key), job.JSONString(), 31*24*time.Hour) 69 + if status.Err() != nil { 70 + ll.ErrorDisplay("could not save %s to Redis", status.Err(), key) 71 + } 72 + }(key, job) 73 + } 74 + } 75 + 76 + func ClearSavedSchedule() { 77 + ll.Log("Clearing", "yellow", "all stored scheduled jobs in Redis") 78 + redisClient.Del(context.Background(), redisClient.Keys(context.Background(), "notella:message:*").Val()...) 79 + } 80 + 81 + func ClearInMemorySchedule() { 82 + ll.Log("Clearing", "yellow", "all scheduled jobs") 83 + for _, job := range schedules.Items() { 84 + job.Unschedule() 85 + } 86 + } 87 + 15 88 func UnscheduleAllForObject(objectId string) { 16 89 ll.Log("Unscheduling", "yellow", "all jobs for %s", objectId) 17 90 for _, job := range schedules.Items() { ··· 21 94 } 22 95 } 23 96 97 + func DisplaySchedule() { 98 + ll.Log("Showing", "magenta", "%d scheduled jobs", schedules.Count()) 99 + ll.Log("", "reset", "[dim]%-15s | %-20s | %-20s", "ID", "Event", "Object ID") 100 + for _, job := range schedules.Items() { 101 + ll.Log("", "reset", "%-15s | %-20s | %-20s", job.Id, job.Event, job.ChurrosObjectId) 102 + } 103 + } 104 + 24 105 func (job Message) Schedule() { 25 - if job.Event != EventShowScheduledJobs { 106 + if !job.SendAt.IsZero() { 26 107 ll.Log("Scheduling", "magenta", "%s for %s", job.Id, job.SendAt) 27 108 } 28 109 schedules.Set(job.Id, job) ··· 42 123 for { 43 124 for _, job := range schedules.Items() { 44 125 if job.ShouldRun() { 45 - if job.Event != EventShowScheduledJobs { 46 - ll.Log("Running", "cyan", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 47 - } 48 126 job.Unschedule() 49 127 go func() { 50 - if job.Event == EventShowScheduledJobs { 51 - ShowScheduledJobs() 52 - return 53 - } 54 - err := job.Run() 55 - if err != nil { 56 - ll.ErrorDisplay("could not run job %s", err, job.Id) 128 + switch job.Event { 129 + case EventShowScheduledJobs: 130 + DisplaySchedule() 131 + 132 + case EventRestoreSchedule: 133 + err := RestoreSchedule() 134 + if err != nil { 135 + ll.ErrorDisplay("could not restore schedule", err) 136 + } 137 + 138 + case EventSaveSchedule: 139 + SaveSchedule() 140 + 141 + case EventClearStoredSchedule: 142 + ClearSavedSchedule() 143 + 144 + case EventClearSchedule: 145 + ClearInMemorySchedule() 146 + 147 + default: 148 + ll.Log("Running", "cyan", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 149 + err := job.Run() 150 + if err != nil { 151 + ll.ErrorDisplay("could not run job %s", err, job.Id) 152 + } 153 + ll.Log("Ran", "green", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 57 154 } 58 - ll.Log("Ran", "green", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 59 155 }() 60 156 } 61 157 } 62 158 } 63 159 } 64 - 65 - func ShowScheduledJobs() { 66 - ll.Log("Showing", "magenta", "%d scheduled jobs", schedules.Count()) 67 - ll.Log("", "reset", "[dim]%-15s | %-20s | %-20s", "ID", "Event", "Object ID") 68 - for _, job := range schedules.Items() { 69 - ll.Log("", "reset", "%-15s | %-20s | %-20s", job.Id, job.Event, job.ChurrosObjectId) 70 - } 71 - }
+8 -11
server/main.go
··· 25 25 26 26 config, _ := notella.LoadConfiguration() 27 27 28 + ll.Info("Server time is %s", time.Now().Format("2006-01-02 15:04:05 -07:00:00")) 28 29 ll.Info("Running with config ") 29 30 ll.Log("", "reset", "port: [bold]%d[reset]", config.Port) 30 31 ll.Log("", "reset", "contact email: [bold]%s[reset]", config.ContactEmail) 31 32 ll.Log("", "reset", "Churros DB URL: [bold]%s[reset]", redactURL(config.ChurrosDatabaseURL)) 33 + ll.Log("", "reset", "Redis URL: [bold]%s[reset]", redactURL(config.RedisURL)) 32 34 ll.Log("", "reset", "App Package ID: [bold]%s[reset]", config.AppPackageId) 33 35 if config.VapidPublicKey != "" && config.VapidPrivateKey != "" { 34 36 ll.Log("", "reset", "VAPID keys: [bold][green]set[reset]") ··· 41 43 ll.Log("", "reset", "Firebase: [bold][red]unconfigured[reset]") 42 44 } 43 45 fmt.Println() 46 + 47 + notella.RestoreSchedule() 48 + notella.DisplaySchedule() 44 49 45 50 ll.Info("starting scheduler") 46 51 go notella.StartScheduler() ··· 107 112 cancel() 108 113 }() 109 114 110 - // Send EventShowScheduledJobs to the stream every 5 minutes 115 + // Send EventShowScheduledJobs to the stream every 5 minutes and save schedule to redis 111 116 go func() { 112 117 for { 113 118 select { 114 119 case <-ctx.Done(): 115 120 return 116 121 default: 117 - msg := notella.Message{ 118 - Id: fmt.Sprintf("show-jobs-%d:%d", time.Now().Hour(), time.Now().Minute()), 119 - Event: notella.EventShowScheduledJobs, 120 - } 121 - 122 - _, err := js.Publish(notella.SubjectName, msg.JSONBytes()) 123 - if err != nil { 124 - ll.ErrorDisplay("could not send scheduled jobs message", err) 125 - } 126 - 127 122 time.Sleep(5 * time.Minute) 123 + notella.DisplaySchedule() 124 + notella.SaveSchedule() 128 125 } 129 126 } 130 127 }()