This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

1package notella 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 ll "github.com/gwennlbh/label-logger-go" 10 cmap "github.com/orcaman/concurrent-map/v2" 11 12 "github.com/redis/go-redis/v9" 13) 14 15var redisClient *redis.Client 16 17type Schedule struct { 18 cmap.ConcurrentMap[string, Message] 19} 20 21var schedules Schedule = Schedule{cmap.New[Message]()} 22 23func (job Message) Unschedule() { 24 ll.Debug("Unscheduling %s", job.Id) 25 schedules.Remove(job.Id) 26} 27 28// RestoreSchedule restores the scheduled messages from Redis to memory 29func RestoreSchedule(eager bool) error { 30 if eager { 31 ll.Log("Restoring", "blue", "schedule from Redis [red][bold]eagerly[reset]") 32 } else { 33 ll.Log("Restoring", "blue", "schedule from Redis") 34 35 } 36 keys, err := redisClient.Keys(context.Background(), "notella:message:*").Result() 37 if err != nil { 38 return fmt.Errorf("while getting notella:message:* keys from redis: %w", err) 39 } 40 41 keyCountBefore := schedules.Count() 42 43 for _, key := range keys { 44 value, err := redisClient.Get(context.Background(), key).Result() 45 if err != nil { 46 return fmt.Errorf("while restoring schedule: could not get value for Redis key %s: %w", key, err) 47 } 48 49 var job Message 50 err = json.Unmarshal([]byte(value), &job) 51 if err != nil { 52 return fmt.Errorf("while restoring schedule: could not unmarshal value for Redis key %s: %w", key, err) 53 } 54 55 if !eager && job.SendAt.Before(time.Now()) { 56 ll.Warn("skipping restoration of %s because it's in the past: %#v", job.Id, job) 57 continue 58 } 59 60 schedules.Set(job.Id, job) 61 } 62 63 ll.Log("Restored", "green", "%d scheduled jobs from Redis", schedules.Count()-keyCountBefore) 64 65 return nil 66} 67 68// SaveSchedule saves the in-memory scheduled messages to Redis 69func SaveSchedule() { 70 ll.Log("Saving", "blue", "%d scheduled jobs to Redis", schedules.Count()) 71 for key, job := range schedules.Items() { 72 go func(key string, job Message) { 73 status := redisClient.Set(context.Background(), fmt.Sprintf("notella:message:%s", key), job.JSONString(), 31*24*time.Hour) 74 if status.Err() != nil { 75 ll.ErrorDisplay("could not save %s to Redis", status.Err(), key) 76 } 77 }(key, job) 78 } 79} 80 81func ClearSavedSchedule() { 82 ll.Log("Clearing", "yellow", "all stored scheduled jobs in Redis") 83 redisClient.Del(context.Background(), redisClient.Keys(context.Background(), "notella:message:*").Val()...) 84} 85 86func ClearInMemorySchedule() { 87 ll.Log("Clearing", "yellow", "all scheduled jobs") 88 for _, job := range schedules.Items() { 89 job.Unschedule() 90 } 91} 92 93func UnscheduleAllForObject(objectId string) { 94 ll.Log("Unscheduling", "yellow", "all jobs for %s", objectId) 95 for _, job := range schedules.Items() { 96 if job.ChurrosObjectId == objectId { 97 job.Unschedule() 98 } 99 } 100} 101 102func DisplaySchedule() { 103 ll.Log("Showing", "magenta", "%d scheduled jobs", schedules.Count()) 104 ll.Log("", "reset", "[dim]%-15s | %-20s | %-20s | %s", "ID", "Event", "Object ID", "Fire at") 105 for _, job := range schedules.Items() { 106 ll.Log("", "reset", "%-15s | %-20s | %-20s | %s", job.Id, job.Event, job.ChurrosObjectId, job.SendAt) 107 } 108} 109 110func (job Message) Schedule() { 111 if !job.SendAt.IsZero() { 112 ll.Log("Scheduling", "magenta", "%s for %s", job.Id, job.SendAt) 113 } 114 schedules.Set(job.Id, job) 115} 116 117func (job Message) IsScheduled() bool { 118 return schedules.Has(job.Id) 119} 120 121// StartScheduler starts the scheduler loop, which runs forever 122// TODO instead of having a in-memory scheduler, use jetstream: 123// 1. Get the message 124// 2. job.ShouldRun? if yes, run it 125// 3. otherwise, put it back at then end of the stream 126// this means that we'll have to do a lot of json marshalling/unmarshalling though, since we'll have to decode the message to check if we need to run it... is there a better way? 127func StartScheduler() { 128 for { 129 for _, job := range schedules.Items() { 130 if job.ShouldRun() { 131 job.Unschedule() 132 go func() { 133 switch job.Event { 134 case EventShowScheduledJobs: 135 DisplaySchedule() 136 137 case EventRestoreSchedule: 138 err := RestoreSchedule(false) 139 if err != nil { 140 ll.ErrorDisplay("could not restore schedule", err) 141 } 142 143 case EventRestoreScheduleEager: 144 err := RestoreSchedule(true) 145 if err != nil { 146 ll.ErrorDisplay("could not restore schedule", err) 147 } 148 149 case EventSaveSchedule: 150 SaveSchedule() 151 152 case EventClearStoredSchedule: 153 ClearSavedSchedule() 154 155 case EventClearSchedule: 156 ClearInMemorySchedule() 157 158 default: 159 ll.Log("Running", "cyan", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 160 err := job.Run() 161 if err != nil { 162 ll.ErrorDisplay("could not run job %s", err, job.Id) 163 } 164 ll.Log("Ran", "green", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId) 165 } 166 }() 167 } 168 } 169 } 170}