This repository has no description
1package notella
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 ll "github.com/gwennlbh/label-logger-go"
10 cmap "github.com/orcaman/concurrent-map/v2"
11
12 "github.com/redis/go-redis/v9"
13)
14
15var redisClient *redis.Client
16
17type Schedule struct {
18 cmap.ConcurrentMap[string, Message]
19}
20
21var schedules Schedule = Schedule{cmap.New[Message]()}
22
23func (job Message) Unschedule() {
24 ll.Debug("Unscheduling %s", job.Id)
25 schedules.Remove(job.Id)
26}
27
28// RestoreSchedule restores the scheduled messages from Redis to memory
29func RestoreSchedule(eager bool) error {
30 if eager {
31 ll.Log("Restoring", "blue", "schedule from Redis [red][bold]eagerly[reset]")
32 } else {
33 ll.Log("Restoring", "blue", "schedule from Redis")
34
35 }
36 keys, err := redisClient.Keys(context.Background(), "notella:message:*").Result()
37 if err != nil {
38 return fmt.Errorf("while getting notella:message:* keys from redis: %w", err)
39 }
40
41 keyCountBefore := schedules.Count()
42
43 for _, key := range keys {
44 value, err := redisClient.Get(context.Background(), key).Result()
45 if err != nil {
46 return fmt.Errorf("while restoring schedule: could not get value for Redis key %s: %w", key, err)
47 }
48
49 var job Message
50 err = json.Unmarshal([]byte(value), &job)
51 if err != nil {
52 return fmt.Errorf("while restoring schedule: could not unmarshal value for Redis key %s: %w", key, err)
53 }
54
55 if !eager && job.SendAt.Before(time.Now()) {
56 ll.Warn("skipping restoration of %s because it's in the past: %#v", job.Id, job)
57 continue
58 }
59
60 schedules.Set(job.Id, job)
61 }
62
63 ll.Log("Restored", "green", "%d scheduled jobs from Redis", schedules.Count()-keyCountBefore)
64
65 return nil
66}
67
68// SaveSchedule saves the in-memory scheduled messages to Redis
69func SaveSchedule() {
70 ll.Log("Saving", "blue", "%d scheduled jobs to Redis", schedules.Count())
71 for key, job := range schedules.Items() {
72 go func(key string, job Message) {
73 status := redisClient.Set(context.Background(), fmt.Sprintf("notella:message:%s", key), job.JSONString(), 31*24*time.Hour)
74 if status.Err() != nil {
75 ll.ErrorDisplay("could not save %s to Redis", status.Err(), key)
76 }
77 }(key, job)
78 }
79}
80
81func ClearSavedSchedule() {
82 ll.Log("Clearing", "yellow", "all stored scheduled jobs in Redis")
83 redisClient.Del(context.Background(), redisClient.Keys(context.Background(), "notella:message:*").Val()...)
84}
85
86func ClearInMemorySchedule() {
87 ll.Log("Clearing", "yellow", "all scheduled jobs")
88 for _, job := range schedules.Items() {
89 job.Unschedule()
90 }
91}
92
93func UnscheduleAllForObject(objectId string) {
94 ll.Log("Unscheduling", "yellow", "all jobs for %s", objectId)
95 for _, job := range schedules.Items() {
96 if job.ChurrosObjectId == objectId {
97 job.Unschedule()
98 }
99 }
100}
101
102func DisplaySchedule() {
103 ll.Log("Showing", "magenta", "%d scheduled jobs", schedules.Count())
104 ll.Log("", "reset", "[dim]%-15s | %-20s | %-20s | %s", "ID", "Event", "Object ID", "Fire at")
105 for _, job := range schedules.Items() {
106 ll.Log("", "reset", "%-15s | %-20s | %-20s | %s", job.Id, job.Event, job.ChurrosObjectId, job.SendAt)
107 }
108}
109
110func (job Message) Schedule() {
111 if !job.SendAt.IsZero() {
112 ll.Log("Scheduling", "magenta", "%s for %s", job.Id, job.SendAt)
113 }
114 schedules.Set(job.Id, job)
115}
116
117func (job Message) IsScheduled() bool {
118 return schedules.Has(job.Id)
119}
120
121// StartScheduler starts the scheduler loop, which runs forever
122// TODO instead of having a in-memory scheduler, use jetstream:
123// 1. Get the message
124// 2. job.ShouldRun? if yes, run it
125// 3. otherwise, put it back at then end of the stream
126// this means that we'll have to do a lot of json marshalling/unmarshalling though, since we'll have to decode the message to check if we need to run it... is there a better way?
127func StartScheduler() {
128 for {
129 for _, job := range schedules.Items() {
130 if job.ShouldRun() {
131 job.Unschedule()
132 go func() {
133 switch job.Event {
134 case EventShowScheduledJobs:
135 DisplaySchedule()
136
137 case EventRestoreSchedule:
138 err := RestoreSchedule(false)
139 if err != nil {
140 ll.ErrorDisplay("could not restore schedule", err)
141 }
142
143 case EventRestoreScheduleEager:
144 err := RestoreSchedule(true)
145 if err != nil {
146 ll.ErrorDisplay("could not restore schedule", err)
147 }
148
149 case EventSaveSchedule:
150 SaveSchedule()
151
152 case EventClearStoredSchedule:
153 ClearSavedSchedule()
154
155 case EventClearSchedule:
156 ClearInMemorySchedule()
157
158 default:
159 ll.Log("Running", "cyan", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId)
160 err := job.Run()
161 if err != nil {
162 ll.ErrorDisplay("could not run job %s", err, job.Id)
163 }
164 ll.Log("Ran", "green", "[dim]%s[reset] job for %s on %s", job.Id, job.Event, job.ChurrosObjectId)
165 }
166 }()
167 }
168 }
169 }
170}