forked from
willdot.net/cocoon
A fork of the Cocoon PDS that is being made more distributed.
1package server
2
3import (
4 "bytes"
5 "context"
6 "log/slog"
7 "net/http"
8 "time"
9
10 "github.com/bluesky-social/indigo/events"
11 "github.com/bluesky-social/indigo/lex/util"
12 "github.com/haileyok/cocoon/models"
13)
14
15func (s *Server) emmitEvents(ctx context.Context) error {
16 ctx, cancel := context.WithCancel(ctx)
17 defer cancel()
18
19 logger := s.logger.With("component", "event-emmiter")
20 ident := "self"
21
22 // get the most recent event
23 rec := &models.EventRecord{}
24 err := s.db.Raw(ctx, "SELECT * from event_records ORDER BY seq DESC LIMIT 1", nil).Scan(&rec).Error
25 if err != nil {
26 slog.Error("fetching most recent event record", "error", err)
27 rec.Seq = time.Now().UnixMilli()
28 }
29
30 since := rec.Seq
31
32 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
33 return true
34 }, &since)
35 if err != nil {
36 return err
37 }
38 defer evtManCancel()
39
40 header := events.EventHeader{Op: events.EvtKindMessage}
41 for evt := range evts {
42 func() {
43 if ctx.Err() != nil {
44 logger.Error("context error", "err", err)
45 return
46 }
47
48 var obj util.CBOR
49 switch {
50 case evt.Error != nil:
51 header.Op = events.EvtKindErrorFrame
52 obj = evt.Error
53 case evt.RepoCommit != nil:
54 header.MsgType = "#commit"
55 obj = evt.RepoCommit
56 case evt.RepoIdentity != nil:
57 header.MsgType = "#identity"
58 obj = evt.RepoIdentity
59 case evt.RepoAccount != nil:
60 header.MsgType = "#account"
61 obj = evt.RepoAccount
62 case evt.RepoInfo != nil:
63 header.MsgType = "#info"
64 obj = evt.RepoInfo
65 default:
66 logger.Warn("unrecognized event kind")
67 return
68 }
69
70 buf := new(bytes.Buffer)
71
72 if err := header.MarshalCBOR(buf); err != nil {
73 logger.Error("failed to marshal header to buffer", "err", err)
74 return
75 }
76
77 if err := obj.MarshalCBOR(buf); err != nil {
78 logger.Error("failed to marshal event to buffer", "err", err)
79 return
80 }
81
82 // TODO: use a HTTP client here not the default
83 resp, err := http.Post(s.config.SubscribeReposServiceURL, "", buf)
84 if err != nil {
85 logger.Error("posting to web server", "error", err)
86 return
87 }
88 if resp.StatusCode == http.StatusAccepted {
89 logger.Info("posted event to subscribe repos")
90 return
91 }
92
93 logger.Error("posting event to subscribe repos", "status", resp.StatusCode)
94 }()
95 }
96
97 // we should tell the relay to request a new crawl at this point if we got disconnected
98 // use a new context since the old one might be cancelled at this point
99 go func() {
100 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
101 defer retryCancel()
102 if err := s.requestCrawl(retryCtx); err != nil {
103 logger.Error("error requesting crawls", "err", err)
104 }
105 }()
106
107 return nil
108}