This repository has no description
0

Configure Feed

Select the types of activity you want to include in your feed.

1//go:generate go run github.com/steebchen/prisma-client-go generate 2 3package main 4 5import ( 6 "context" 7 "fmt" 8 "os" 9 "os/signal" 10 "syscall" 11 "time" 12 13 "git.inpt.fr/churros/notella" 14 "github.com/common-nighthawk/go-figure" 15 ll "github.com/gwennlbh/label-logger-go" 16 "github.com/nats-io/nats.go" 17) 18 19var Version = "DEV" 20 21func main() { 22 figure.NewColorFigure("Notella", "", "yellow", true).Print() 23 fmt.Printf("%38s\n", fmt.Sprintf("美味しそう〜 v%s", Version)) 24 fmt.Println() 25 26 config, _ := notella.LoadConfiguration() 27 28 ll.Info("Server time is %s", time.Now().Format("2006-01-02 15:04:05 -07:00:00")) 29 if config.DryRunMode { 30 ll.Info("Running [bold]in dry run mode[reset] with") 31 } else { 32 ll.Info("Running with config ") 33 } 34 ll.Log("", "reset", "Schedule recovery: [bold][dim]at startup [reset][bold]%s[reset]", config.StartupScheduleRestoration) 35 ll.Log("", "reset", "contact email: [bold]%s[reset]", config.ContactEmail) 36 ll.Log("", "reset", "NATS URL: [bold]%s[reset]", redactURL(config.NatsURL)) 37 ll.Log("", "reset", "Churros DB URL: [bold]%s[reset]", redactURL(config.ChurrosDatabaseURL)) 38 ll.Log("", "reset", "Redis URL: [bold]%s[reset]", redactURL(config.RedisURL)) 39 ll.Log("", "reset", "Health check on: [bold]:%d/health[reset]", config.HealthCheckPort) 40 ll.Log("", "reset", "App Package ID: [bold]%s[reset]", config.AppPackageId) 41 if config.VapidPublicKey != "" && config.VapidPrivateKey != "" { 42 ll.Log("", "reset", "VAPID keys: [bold][green]set[reset]") 43 } else { 44 ll.Log("", "reset", "VAPID keys: [bold][red]not set[reset]") 45 } 46 if config.HasValidFirebaseServiceAccount() { 47 ll.Log("", "reset", "Firebase: [bold][green]configured[reset]") 48 } else { 49 ll.Log("", "reset", "Firebase: [bold][red]unconfigured[reset]") 50 } 51 fmt.Println() 52 53 if config.StartupScheduleRestoration != "disabled" { 54 notella.RestoreSchedule(config.StartupScheduleRestoration == "eager") 55 } 56 notella.DisplaySchedule() 57 58 ll.Info("starting scheduler") 59 go notella.StartScheduler() 60 61 ll.Log("Connecting", "cyan", "to Churros database at [bold]%s[reset]", config.ChurrosDatabaseURL) 62 err := notella.ConnectToDababase() 63 if err != nil { 64 ll.ErrorDisplay("could not connect to database", err) 65 } 66 67 ll.Log("Connecting", "cyan", "to NATS server at [bold]%s[reset]", config.NatsURL) 68 nc, err := nats.Connect(config.NatsURL) 69 if err != nil { 70 ll.ErrorDisplay("could not connect to NATS at %s", err, config.NatsURL) 71 return 72 } 73 74 js, err := nc.JetStream() 75 if err != nil { 76 ll.ErrorDisplay("could not connect to Jetstream", err) 77 return 78 } 79 80 ll.Log("Initializing", "cyan", "a Jetstream stream [bold]%s[reset], listening for subject [bold]%s[reset]", notella.StreamName, notella.SubjectName) 81 82 _, err = js.AddStream(&nats.StreamConfig{ 83 Name: notella.StreamName, 84 Subjects: []string{notella.SubjectName}, 85 }) 86 if err != nil { 87 ll.ErrorDisplay("could not create stream", err) 88 return 89 } 90 91 ll.Log("Initializing", "cyan", "Jetstream consumer [bold]NotellaConsumer[reset] with [bold]AckExplicitPolicy[reset]") 92 93 _, err = js.AddConsumer(notella.StreamName, &nats.ConsumerConfig{ 94 Durable: "NotellaConsumer", 95 AckPolicy: nats.AckExplicitPolicy, 96 }) 97 if err != nil { 98 ll.ErrorDisplay("could not create consumer", err) 99 return 100 } 101 102 ll.Log("Starting", "cyan", "consumer [bold]NotellaConsumer[reset]") 103 104 sub, err := js.PullSubscribe(notella.SubjectName, "NotellaConsumer") 105 if err != nil { 106 ll.ErrorDisplay("could not start consumer", err) 107 return 108 } 109 110 // Setup a context to handle graceful shutdowns 111 ctx, cancel := context.WithCancel(context.Background()) 112 defer cancel() 113 114 // Capture OS signals for graceful shutdown 115 go func() { 116 sigChan := make(chan os.Signal, 1) 117 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) 118 <-sigChan 119 ll.Log("Shuting down", "magenta", "because of signal received") 120 cancel() 121 }() 122 123 // Start healthcheck endpoint 124 go notella.StartHealthCheckEndpoint(config.HealthCheckPort) 125 126 // Send EventShowScheduledJobs to the stream every 5 minutes and save schedule to redis 127 go func() { 128 for { 129 select { 130 case <-ctx.Done(): 131 return 132 default: 133 time.Sleep(5 * time.Minute) 134 notella.DisplaySchedule() 135 notella.SaveSchedule() 136 } 137 } 138 }() 139 140 // Continuously fetch and process messages 141 go func() { 142 for { 143 select { 144 case <-ctx.Done(): 145 return 146 default: 147 // Fetch messages in batches 148 msgs, err := sub.Fetch(10, nats.MaxWait(5*time.Second)) 149 if err != nil && err != nats.ErrTimeout { 150 ll.ErrorDisplay("Could not fetch messages", err) 151 time.Sleep(2 * time.Second) // Wait before retrying 152 continue 153 } 154 155 // Process each message 156 for _, msg := range msgs { 157 err = notella.NatsReceiver(msg) 158 if err != nil { 159 ll.ErrorDisplay("Could not process message", err) 160 } 161 msg.Ack() // Acknowledge the message 162 } 163 } 164 } 165 }() 166 167 // Block until the context is canceled (i.e., server shutdown signal received) 168 <-ctx.Done() 169 ll.Log("Stopped", "red", "server") 170}