This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 5.5 kB View raw
1package notella 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 ll "github.com/gwennlbh/label-logger-go" 10 cmap "github.com/orcaman/concurrent-map/v2" 11 12 "github.com/redis/go-redis/v9" 13) 14 15var redisClient *redis.Client 16 17type Schedule struct { 18 cmap.ConcurrentMap[string, Message] 19} 20 21// schedules stores the scheduled messages in memory, as a mapping of job.Id -> job 22var schedules Schedule = Schedule{cmap.New[Message]()} 23 24func (job Message) Unschedule() { 25 ll.Debug("Unscheduling %s", job.Id) 26 schedules.Remove(job.Id) 27} 28 29// RestoreSchedule restores the scheduled messages from Redis to memory 30func RestoreSchedule(eager bool) error { 31 if eager { 32 ll.Log("Restoring", "blue", "schedule from Redis [red][bold]eagerly[reset]") 33 } else { 34 ll.Log("Restoring", "blue", "schedule from Redis") 35 36 } 37 keys, err := redisClient.Keys(context.Background(), "notella:message:*").Result() 38 if err != nil { 39 return fmt.Errorf("while getting notella:message:* keys from redis: %w", err) 40 } 41 42 keyCountBefore := schedules.Count() 43 44 for _, key := range keys { 45 value, err := redisClient.Get(context.Background(), key).Result() 46 if err != nil { 47 return fmt.Errorf("while restoring schedule: could not get value for Redis key %s: %w", key, err) 48 } 49 50 var job Message 51 err = json.Unmarshal([]byte(value), &job) 52 if err != nil { 53 return fmt.Errorf("while restoring schedule: could not unmarshal value for Redis key %s: %w", key, err) 54 } 55 56 if !eager && job.SendAt.Before(time.Now()) { 57 ll.Warn("skipping restoration of %s because it's in the past: %#v", job.Id, job) 58 continue 59 } 60 61 schedules.Set(job.Id, job) 62 } 63 64 ll.Log("Restored", "green", "%d scheduled jobs from Redis", schedules.Count()-keyCountBefore) 65 66 return nil 67} 68 69// SaveSchedule saves the in-memory scheduled messages to Redis 70func SaveSchedule() { 71 ll.Log("Saving", "blue", "%d scheduled jobs to Redis", schedules.Count()) 72 for key, job := range schedules.Items() { 73 go func(key string, job Message) { 74 status := redisClient.Set(context.Background(), fmt.Sprintf("notella:message:%s", key), job.JSONString(), 31*24*time.Hour) 75 if status.Err() != nil { 76 ll.ErrorDisplay("could not save %s to Redis", status.Err(), key) 77 } 78 }(key, job) 79 } 80} 81 82func ClearSavedSchedule() { 83 ll.Log("Clearing", "yellow", "all stored scheduled jobs in Redis") 84 redisClient.Del(context.Background(), redisClient.Keys(context.Background(), "notella:message:*").Val()...) 85} 86 87func ClearInMemorySchedule() { 88 ll.Log("Clearing", "yellow", "all scheduled jobs") 89 for _, job := range schedules.Items() { 90 job.Unschedule() 91 } 92} 93 94// UnscheduleAllForObject unschedules all jobs for a given object ID. If any ofType is provided, only events of the types given will be unscheduled 95func UnscheduleAllForObject(objectId string, ofType ...Event) { 96 var filter func(Message) bool 97 if len(ofType) > 0 { 98 ll.Log("Unscheduling", "yellow", "all jobs for %s of type %v", objectId, ofType) 99 100 filter = func(job Message) bool { 101 for _, t := range ofType { 102 if job.Event == t { 103 return true 104 } 105 } 106 return false 107 } 108 109 } else { 110 ll.Log("Unscheduling", "yellow", "all jobs for %s", objectId) 111 112 filter = func(Message) bool { return true } 113 } 114 115 for _, job := range schedules.Items() { 116 if job.ChurrosObjectId == objectId && filter(job) { 117 ll.Log("Unscheduling", "yellow", "%s | %s", job.Id, job.String()) 118 job.Unschedule() 119 redisClient.Del(context.Background(), fmt.Sprintf("notella:message:%s", job.Id)) 120 } 121 } 122} 123 124func DisplaySchedule() { 125 ll.Log("Showing", "magenta", "%d scheduled jobs", schedules.Count()) 126 ll.Log("", "reset", "[dim]%-15s | %-20s | %-20s | %s", "ID", "Event", "Object ID", "Fire at") 127 for _, job := range schedules.Items() { 128 ll.Log("", "reset", "%-15s | %-20s | %-20s | %s", job.Id, job.Event, job.ChurrosObjectId, job.SendAt) 129 } 130} 131 132func (job Message) Schedule() { 133 if !job.SendAt.IsZero() { 134 ll.Log("Scheduling", "magenta", "%s for %s", job.Id, job.SendAt) 135 } 136 schedules.Set(job.Id, job) 137} 138 139func (job Message) IsScheduled() bool { 140 return schedules.Has(job.Id) 141} 142 143// StartScheduler starts the scheduler loop, which runs forever 144// TODO instead of having a in-memory scheduler, use jetstream: 145// 1. Get the message 146// 2. job.ShouldRun? if yes, run it 147// 3. otherwise, put it back at then end of the stream 148// this means that we'll have to do a lot of json marshalling/unmarshalling though, since we'll have to decode the message to check if we need to run it... is there a better way? 149func StartScheduler() { 150 for { 151 for _, job := range schedules.Items() { 152 if job.ShouldRun() { 153 job.Unschedule() 154 go func() { 155 switch job.Event { 156 case EventShowScheduledJobs: 157 DisplaySchedule() 158 159 case EventRestoreSchedule: 160 err := RestoreSchedule(false) 161 if err != nil { 162 ll.ErrorDisplay("could not restore schedule", err) 163 } 164 165 case EventRestoreScheduleEager: 166 err := RestoreSchedule(true) 167 if err != nil { 168 ll.ErrorDisplay("could not restore schedule", err) 169 } 170 171 case EventSaveSchedule: 172 SaveSchedule() 173 174 case EventClearStoredSchedule: 175 ClearSavedSchedule() 176 177 case EventClearSchedule: 178 ClearInMemorySchedule() 179 180 default: 181 ll.Log("Running", "cyan", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 182 err := job.Run() 183 if err != nil { 184 ll.ErrorDisplay("could not run job %s", err, job.Id) 185 } 186 ll.Log("Ran", "green", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 187 } 188 }() 189 } 190 } 191 } 192}