This repository has no description
1//go:generate go run github.com/steebchen/prisma-client-go generate
2
3package main
4
5import (
6 "context"
7 "fmt"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 "git.inpt.fr/churros/notella"
14 "github.com/common-nighthawk/go-figure"
15 ll "github.com/gwennlbh/label-logger-go"
16 "github.com/nats-io/nats.go"
17)
18
19var Version = "DEV"
20
21func main() {
22 figure.NewColorFigure("Notella", "", "yellow", true).Print()
23 fmt.Printf("%38s\n", fmt.Sprintf("美味しそう〜 v%s", Version))
24 fmt.Println()
25
26 config, _ := notella.LoadConfiguration()
27
28 ll.Info("Server time is %s", time.Now().Format("2006-01-02 15:04:05 -07:00:00"))
29 if config.DryRunMode {
30 ll.Info("Running [bold]in dry run mode[reset] with")
31 } else {
32 ll.Info("Running with config ")
33 }
34 ll.Log("", "reset", "Schedule recovery: [bold][dim]at startup [reset][bold]%s[reset]", config.StartupScheduleRestoration)
35 ll.Log("", "reset", "contact email: [bold]%s[reset]", config.ContactEmail)
36 ll.Log("", "reset", "NATS URL: [bold]%s[reset]", redactURL(config.NatsURL))
37 ll.Log("", "reset", "Churros DB URL: [bold]%s[reset]", redactURL(config.ChurrosDatabaseURL))
38 ll.Log("", "reset", "Redis URL: [bold]%s[reset]", redactURL(config.RedisURL))
39 ll.Log("", "reset", "Health check on: [bold]:%d/health[reset]", config.HealthCheckPort)
40 ll.Log("", "reset", "App Package ID: [bold]%s[reset]", config.AppPackageId)
41 if config.VapidPublicKey != "" && config.VapidPrivateKey != "" {
42 ll.Log("", "reset", "VAPID keys: [bold][green]set[reset]")
43 } else {
44 ll.Log("", "reset", "VAPID keys: [bold][red]not set[reset]")
45 }
46 if config.HasValidFirebaseServiceAccount() {
47 ll.Log("", "reset", "Firebase: [bold][green]configured[reset]")
48 } else {
49 ll.Log("", "reset", "Firebase: [bold][red]unconfigured[reset]")
50 }
51 fmt.Println()
52
53 if config.StartupScheduleRestoration != "disabled" {
54 notella.RestoreSchedule(config.StartupScheduleRestoration == "eager")
55 }
56 notella.DisplaySchedule()
57
58 ll.Info("starting scheduler")
59 go notella.StartScheduler()
60
61 ll.Log("Connecting", "cyan", "to Churros database at [bold]%s[reset]", config.ChurrosDatabaseURL)
62 err := notella.ConnectToDababase()
63 if err != nil {
64 ll.ErrorDisplay("could not connect to database", err)
65 }
66
67 ll.Log("Connecting", "cyan", "to NATS server at [bold]%s[reset]", config.NatsURL)
68 nc, err := nats.Connect(config.NatsURL)
69 if err != nil {
70 ll.ErrorDisplay("could not connect to NATS at %s", err, config.NatsURL)
71 return
72 }
73
74 js, err := nc.JetStream()
75 if err != nil {
76 ll.ErrorDisplay("could not connect to Jetstream", err)
77 return
78 }
79
80 ll.Log("Initializing", "cyan", "a Jetstream stream [bold]%s[reset], listening for subject [bold]%s[reset]", notella.StreamName, notella.SubjectName)
81
82 _, err = js.AddStream(&nats.StreamConfig{
83 Name: notella.StreamName,
84 Subjects: []string{notella.SubjectName},
85 })
86 if err != nil {
87 ll.ErrorDisplay("could not create stream", err)
88 return
89 }
90
91 ll.Log("Initializing", "cyan", "Jetstream consumer [bold]NotellaConsumer[reset] with [bold]AckExplicitPolicy[reset]")
92
93 _, err = js.AddConsumer(notella.StreamName, &nats.ConsumerConfig{
94 Durable: "NotellaConsumer",
95 AckPolicy: nats.AckExplicitPolicy,
96 })
97 if err != nil {
98 ll.ErrorDisplay("could not create consumer", err)
99 return
100 }
101
102 ll.Log("Starting", "cyan", "consumer [bold]NotellaConsumer[reset]")
103
104 sub, err := js.PullSubscribe(notella.SubjectName, "NotellaConsumer")
105 if err != nil {
106 ll.ErrorDisplay("could not start consumer", err)
107 return
108 }
109
110 // Setup a context to handle graceful shutdowns
111 ctx, cancel := context.WithCancel(context.Background())
112 defer cancel()
113
114 // Capture OS signals for graceful shutdown
115 go func() {
116 sigChan := make(chan os.Signal, 1)
117 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
118 <-sigChan
119 ll.Log("Shuting down", "magenta", "because of signal received")
120 cancel()
121 }()
122
123 // Start healthcheck endpoint
124 go notella.StartHealthCheckEndpoint(config.HealthCheckPort)
125
126 // Send EventShowScheduledJobs to the stream every 5 minutes and save schedule to redis
127 go func() {
128 for {
129 select {
130 case <-ctx.Done():
131 return
132 default:
133 time.Sleep(5 * time.Minute)
134 notella.DisplaySchedule()
135 notella.SaveSchedule()
136 }
137 }
138 }()
139
140 // Continuously fetch and process messages
141 go func() {
142 for {
143 select {
144 case <-ctx.Done():
145 return
146 default:
147 // Fetch messages in batches
148 msgs, err := sub.Fetch(10, nats.MaxWait(5*time.Second))
149 if err != nil && err != nats.ErrTimeout {
150 ll.ErrorDisplay("Could not fetch messages", err)
151 time.Sleep(2 * time.Second) // Wait before retrying
152 continue
153 }
154
155 // Process each message
156 for _, msg := range msgs {
157 err = notella.NatsReceiver(msg)
158 if err != nil {
159 ll.ErrorDisplay("Could not process message", err)
160 }
161 msg.Ack() // Acknowledge the message
162 }
163 }
164 }
165 }()
166
167 // Block until the context is canceled (i.e., server shutdown signal received)
168 <-ctx.Done()
169 ll.Log("Stopped", "red", "server")
170}