This repository has no description
1package notella
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 ll "github.com/gwennlbh/label-logger-go"
10 cmap "github.com/orcaman/concurrent-map/v2"
11
12 "github.com/redis/go-redis/v9"
13)
14
15var redisClient *redis.Client
16
17type Schedule struct {
18 cmap.ConcurrentMap[string, Message]
19}
20
21// schedules stores the scheduled messages in memory, as a mapping of job.Id -> job
22var schedules Schedule = Schedule{cmap.New[Message]()}
23
24func (job Message) Unschedule() {
25 ll.Debug("Unscheduling %s", job.Id)
26 schedules.Remove(job.Id)
27}
28
29// RestoreSchedule restores the scheduled messages from Redis to memory
30func RestoreSchedule(eager bool) error {
31 if eager {
32 ll.Log("Restoring", "blue", "schedule from Redis [red][bold]eagerly[reset]")
33 } else {
34 ll.Log("Restoring", "blue", "schedule from Redis")
35
36 }
37 keys, err := redisClient.Keys(context.Background(), "notella:message:*").Result()
38 if err != nil {
39 return fmt.Errorf("while getting notella:message:* keys from redis: %w", err)
40 }
41
42 keyCountBefore := schedules.Count()
43
44 for _, key := range keys {
45 value, err := redisClient.Get(context.Background(), key).Result()
46 if err != nil {
47 return fmt.Errorf("while restoring schedule: could not get value for Redis key %s: %w", key, err)
48 }
49
50 var job Message
51 err = json.Unmarshal([]byte(value), &job)
52 if err != nil {
53 return fmt.Errorf("while restoring schedule: could not unmarshal value for Redis key %s: %w", key, err)
54 }
55
56 if !eager && job.SendAt.Before(time.Now()) {
57 ll.Warn("skipping restoration of %s because it's in the past: %#v", job.Id, job)
58 continue
59 }
60
61 schedules.Set(job.Id, job)
62 }
63
64 ll.Log("Restored", "green", "%d scheduled jobs from Redis", schedules.Count()-keyCountBefore)
65
66 return nil
67}
68
69// SaveSchedule saves the in-memory scheduled messages to Redis
70func SaveSchedule() {
71 ll.Log("Saving", "blue", "%d scheduled jobs to Redis", schedules.Count())
72 for key, job := range schedules.Items() {
73 go func(key string, job Message) {
74 status := redisClient.Set(context.Background(), fmt.Sprintf("notella:message:%s", key), job.JSONString(), 31*24*time.Hour)
75 if status.Err() != nil {
76 ll.ErrorDisplay("could not save %s to Redis", status.Err(), key)
77 }
78 }(key, job)
79 }
80}
81
82func ClearSavedSchedule() {
83 ll.Log("Clearing", "yellow", "all stored scheduled jobs in Redis")
84 redisClient.Del(context.Background(), redisClient.Keys(context.Background(), "notella:message:*").Val()...)
85}
86
87func ClearInMemorySchedule() {
88 ll.Log("Clearing", "yellow", "all scheduled jobs")
89 for _, job := range schedules.Items() {
90 job.Unschedule()
91 }
92}
93
94// UnscheduleAllForObject unschedules all jobs for a given object ID. If any ofType is provided, only events of the types given will be unscheduled
95func UnscheduleAllForObject(objectId string, ofType ...Event) {
96 var filter func(Message) bool
97 if len(ofType) > 0 {
98 ll.Log("Unscheduling", "yellow", "all jobs for %s of type %v", objectId, ofType)
99
100 filter = func(job Message) bool {
101 for _, t := range ofType {
102 if job.Event == t {
103 return true
104 }
105 }
106 return false
107 }
108
109 } else {
110 ll.Log("Unscheduling", "yellow", "all jobs for %s", objectId)
111
112 filter = func(Message) bool { return true }
113 }
114
115 for _, job := range schedules.Items() {
116 if job.ChurrosObjectId == objectId && filter(job) {
117 ll.Log("Unscheduling", "yellow", "%s | %s", job.Id, job.String())
118 job.Unschedule()
119 redisClient.Del(context.Background(), fmt.Sprintf("notella:message:%s", job.Id))
120 }
121 }
122}
123
124func DisplaySchedule() {
125 ll.Log("Showing", "magenta", "%d scheduled jobs", schedules.Count())
126 ll.Log("", "reset", "[dim]%-15s | %-20s | %-20s | %s", "ID", "Event", "Object ID", "Fire at")
127 for _, job := range schedules.Items() {
128 ll.Log("", "reset", "%-15s | %-20s | %-20s | %s", job.Id, job.Event, job.ChurrosObjectId, job.SendAt)
129 }
130}
131
132func (job Message) Schedule() {
133 if !job.SendAt.IsZero() {
134 ll.Log("Scheduling", "magenta", "%s for %s", job.Id, job.SendAt)
135 }
136 schedules.Set(job.Id, job)
137}
138
139func (job Message) IsScheduled() bool {
140 return schedules.Has(job.Id)
141}
142
143// StartScheduler starts the scheduler loop, which runs forever
144// TODO instead of having a in-memory scheduler, use jetstream:
145// 1. Get the message
146// 2. job.ShouldRun? if yes, run it
147// 3. otherwise, put it back at then end of the stream
148// this means that we'll have to do a lot of json marshalling/unmarshalling though, since we'll have to decode the message to check if we need to run it... is there a better way?
149func StartScheduler() {
150 for {
151 for _, job := range schedules.Items() {
152 if job.ShouldRun() {
153 job.Unschedule()
154 go func() {
155 switch job.Event {
156 case EventShowScheduledJobs:
157 DisplaySchedule()
158
159 case EventRestoreSchedule:
160 err := RestoreSchedule(false)
161 if err != nil {
162 ll.ErrorDisplay("could not restore schedule", err)
163 }
164
165 case EventRestoreScheduleEager:
166 err := RestoreSchedule(true)
167 if err != nil {
168 ll.ErrorDisplay("could not restore schedule", err)
169 }
170
171 case EventSaveSchedule:
172 SaveSchedule()
173
174 case EventClearStoredSchedule:
175 ClearSavedSchedule()
176
177 case EventClearSchedule:
178 ClearInMemorySchedule()
179
180 default:
181 ll.Log("Running", "cyan", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId)
182 err := job.Run()
183 if err != nil {
184 ll.ErrorDisplay("could not run job %s", err, job.Id)
185 }
186 ll.Log("Ran", "green", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId)
187 }
188 }()
189 }
190 }
191 }
192}