This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

1package notella 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 ll "github.com/gwennlbh/label-logger-go" 10 cmap "github.com/orcaman/concurrent-map/v2" 11 12 "github.com/redis/go-redis/v9" 13) 14 15var redisClient *redis.Client 16 17type Schedule struct { 18 cmap.ConcurrentMap[string, Message] 19} 20 21// schedules stores the scheduled messages in memory, as a mapping of job.Id -> job 22var schedules Schedule = Schedule{cmap.New[Message]()} 23 24func (job Message) Unschedule() { 25 ll.Debug("Unscheduling %s", job.Id) 26 schedules.Remove(job.Id) 27} 28 29// RestoreSchedule restores the scheduled messages from Redis to memory 30func RestoreSchedule(eager bool) error { 31 if eager { 32 ll.Log("Restoring", "blue", "schedule from Redis [red][bold]eagerly[reset]") 33 } else { 34 ll.Log("Restoring", "blue", "schedule from Redis") 35 36 } 37 keys, err := redisClient.Keys(context.Background(), "notella:message:*").Result() 38 if err != nil { 39 return fmt.Errorf("while getting notella:message:* keys from redis: %w", err) 40 } 41 42 keyCountBefore := schedules.Count() 43 44 for _, key := range keys { 45 value, err := redisClient.Get(context.Background(), key).Result() 46 if err != nil { 47 return fmt.Errorf("while restoring schedule: could not get value for Redis key %s: %w", key, err) 48 } 49 50 var job Message 51 err = json.Unmarshal([]byte(value), &job) 52 if err != nil { 53 return fmt.Errorf("while restoring schedule: could not unmarshal value for Redis key %s: %w", key, err) 54 } 55 56 if !eager && job.SendAt.Before(time.Now()) { 57 ll.Warn("skipping restoration of %s because it's in the past: %#v", job.Id, job) 58 continue 59 } 60 61 schedules.Set(job.Id, job) 62 } 63 64 ll.Log("Restored", "green", "%d scheduled jobs from Redis", schedules.Count()-keyCountBefore) 65 66 return nil 67} 68 69// SaveSchedule saves the in-memory scheduled messages to Redis 70func SaveSchedule() { 71 ll.Log("Saving", "blue", "%d scheduled jobs to Redis", schedules.Count()) 72 for key, job := range schedules.Items() { 73 go func(key string, job Message) { 74 status := redisClient.Set(context.Background(), fmt.Sprintf("notella:message:%s", key), job.JSONString(), 31*24*time.Hour) 75 if status.Err() != nil { 76 ll.ErrorDisplay("could not save %s to Redis", status.Err(), key) 77 } 78 }(key, job) 79 } 80} 81 82func ClearSavedSchedule() { 83 ll.Log("Clearing", "yellow", "all stored scheduled jobs in Redis") 84 redisClient.Del(context.Background(), redisClient.Keys(context.Background(), "notella:message:*").Val()...) 85} 86 87func ClearInMemorySchedule() { 88 ll.Log("Clearing", "yellow", "all scheduled jobs") 89 for _, job := range schedules.Items() { 90 job.Unschedule() 91 } 92} 93 94// UnscheduleAllForObject unschedules all jobs for a given object ID. If any ofType is provided, only events of the types given will be unscheduled 95func UnscheduleAllForObject(objectId string, ofType ...Event) { 96 var filter func(Message) bool 97 if len(ofType) > 0 { 98 ll.Log("Unscheduling", "yellow", "all jobs for %s of type %v", objectId, ofType) 99 100 filter = func(job Message) bool { 101 for _, t := range ofType { 102 if job.Event == t { 103 return true 104 } 105 } 106 return false 107 } 108 109 } else { 110 ll.Log("Unscheduling", "yellow", "all jobs for %s", objectId) 111 112 filter = func(Message) bool { return true } 113 } 114 115 for _, job := range schedules.Items() { 116 if job.ChurrosObjectId == objectId && filter(job) { 117 ll.Log("Unscheduling", "yellow", "%s | %s", job.Id, job.String()) 118 job.Unschedule() 119 } 120 } 121} 122 123func DisplaySchedule() { 124 ll.Log("Showing", "magenta", "%d scheduled jobs", schedules.Count()) 125 ll.Log("", "reset", "[dim]%-15s | %-20s | %-20s | %s", "ID", "Event", "Object ID", "Fire at") 126 for _, job := range schedules.Items() { 127 ll.Log("", "reset", "%-15s | %-20s | %-20s | %s", job.Id, job.Event, job.ChurrosObjectId, job.SendAt) 128 } 129} 130 131func (job Message) Schedule() { 132 if !job.SendAt.IsZero() { 133 ll.Log("Scheduling", "magenta", "%s for %s", job.Id, job.SendAt) 134 } 135 schedules.Set(job.Id, job) 136} 137 138func (job Message) IsScheduled() bool { 139 return schedules.Has(job.Id) 140} 141 142// StartScheduler starts the scheduler loop, which runs forever 143// TODO instead of having a in-memory scheduler, use jetstream: 144// 1. Get the message 145// 2. job.ShouldRun? if yes, run it 146// 3. otherwise, put it back at then end of the stream 147// this means that we'll have to do a lot of json marshalling/unmarshalling though, since we'll have to decode the message to check if we need to run it... is there a better way? 148func StartScheduler() { 149 for { 150 for _, job := range schedules.Items() { 151 if job.ShouldRun() { 152 job.Unschedule() 153 go func() { 154 switch job.Event { 155 case EventShowScheduledJobs: 156 DisplaySchedule() 157 158 case EventRestoreSchedule: 159 err := RestoreSchedule(false) 160 if err != nil { 161 ll.ErrorDisplay("could not restore schedule", err) 162 } 163 164 case EventRestoreScheduleEager: 165 err := RestoreSchedule(true) 166 if err != nil { 167 ll.ErrorDisplay("could not restore schedule", err) 168 } 169 170 case EventSaveSchedule: 171 SaveSchedule() 172 173 case EventClearStoredSchedule: 174 ClearSavedSchedule() 175 176 case EventClearSchedule: 177 ClearInMemorySchedule() 178 179 default: 180 ll.Log("Running", "cyan", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 181 err := job.Run() 182 if err != nil { 183 ll.ErrorDisplay("could not run job %s", err, job.Id) 184 } 185 ll.Log("Ran", "green", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 186 } 187 }() 188 } 189 } 190 } 191}