A fork of the Cocoon PDS but being made more distributed.
0

Configure Feed

Select the types of activity you want to include in your feed.

at main 2.7 kB View raw
1package server 2 3import ( 4 "bytes" 5 "context" 6 "log/slog" 7 "net/http" 8 "time" 9 10 "github.com/bluesky-social/indigo/events" 11 "github.com/bluesky-social/indigo/lex/util" 12 "github.com/haileyok/cocoon/models" 13) 14 15func (s *Server) emmitEvents(ctx context.Context) error { 16 ctx, cancel := context.WithCancel(ctx) 17 defer cancel() 18 19 logger := s.logger.With("component", "event-emmiter") 20 ident := "self" 21 22 // get the most recent event 23 rec := &models.EventRecord{} 24 err := s.db.Raw(ctx, "SELECT * from event_records ORDER BY seq DESC LIMIT 1", nil).Scan(&rec).Error 25 if err != nil { 26 slog.Error("fetching most recent event record", "error", err) 27 rec.Seq = time.Now().UnixMilli() 28 } 29 30 since := rec.Seq 31 32 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 33 return true 34 }, &since) 35 if err != nil { 36 return err 37 } 38 defer evtManCancel() 39 40 header := events.EventHeader{Op: events.EvtKindMessage} 41 for evt := range evts { 42 func() { 43 if ctx.Err() != nil { 44 logger.Error("context error", "err", err) 45 return 46 } 47 48 var obj util.CBOR 49 switch { 50 case evt.Error != nil: 51 header.Op = events.EvtKindErrorFrame 52 obj = evt.Error 53 case evt.RepoCommit != nil: 54 header.MsgType = "#commit" 55 obj = evt.RepoCommit 56 case evt.RepoIdentity != nil: 57 header.MsgType = "#identity" 58 obj = evt.RepoIdentity 59 case evt.RepoAccount != nil: 60 header.MsgType = "#account" 61 obj = evt.RepoAccount 62 case evt.RepoInfo != nil: 63 header.MsgType = "#info" 64 obj = evt.RepoInfo 65 default: 66 logger.Warn("unrecognized event kind") 67 return 68 } 69 70 buf := new(bytes.Buffer) 71 72 if err := header.MarshalCBOR(buf); err != nil { 73 logger.Error("failed to marshal header to buffer", "err", err) 74 return 75 } 76 77 if err := obj.MarshalCBOR(buf); err != nil { 78 logger.Error("failed to marshal event to buffer", "err", err) 79 return 80 } 81 82 // TODO: use a HTTP client here not the default 83 resp, err := http.Post(s.config.SubscribeReposServiceURL, "", buf) 84 if err != nil { 85 logger.Error("posting to web server", "error", err) 86 return 87 } 88 if resp.StatusCode == http.StatusAccepted { 89 logger.Info("posted event to subscribe repos") 90 return 91 } 92 93 logger.Error("posting event to subscribe repos", "status", resp.StatusCode) 94 }() 95 } 96 97 // we should tell the relay to request a new crawl at this point if we got disconnected 98 // use a new context since the old one might be cancelled at this point 99 go func() { 100 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 101 defer retryCancel() 102 if err := s.requestCrawl(retryCtx); err != nil { 103 logger.Error("error requesting crawls", "err", err) 104 } 105 }() 106 107 return nil 108}