Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1// extending code generated from sh.tangled.ci.pipeline.subscribeLogs 2 3package tangled 4 5import ( 6 "context" 7 "fmt" 8 "io" 9 10 "github.com/bluesky-social/indigo/events" 11 lexutil "github.com/bluesky-social/indigo/lex/util" 12 cbg "github.com/whyrusleeping/cbor-gen" 13 extlexutil "tangled.org/core/lexutil" 14) 15 16// TODO: generate codes below from lexicon 17type CiPipelineSubscribeLogs_Event struct { 18 Error *events.ErrorFrame 19 Control *CiPipelineSubscribeLogs_Control 20 Data *CiPipelineSubscribeLogs_Data 21 22 // some private fields for internal routing perf 23 Preserialized []byte `json:"-" cborgen:"-"` 24} 25 26func (xevt *CiPipelineSubscribeLogs_Event) Serialize(wc io.Writer) error { 27 header := events.EventHeader{Op: events.EvtKindMessage} 28 var obj lexutil.CBOR 29 30 switch { 31 case xevt.Error != nil: 32 header.Op = events.EvtKindErrorFrame 33 obj = xevt.Error 34 case xevt.Control != nil: 35 header.MsgType = "#control" 36 obj = xevt.Control 37 case xevt.Data != nil: 38 header.MsgType = "#data" 39 obj = xevt.Data 40 default: 41 return fmt.Errorf("unrecognized event kind") 42 } 43 44 cborWriter := cbg.NewCborWriter(wc) 45 if err := header.MarshalCBOR(cborWriter); err != nil { 46 return fmt.Errorf("failed to write header: %w", err) 47 } 48 return obj.MarshalCBOR(cborWriter) 49} 50 51func (xevt *CiPipelineSubscribeLogs_Event) Deserialize(r io.Reader) error { 52 var header events.EventHeader 53 if err := header.UnmarshalCBOR(r); err != nil { 54 return fmt.Errorf("reading header: %w", err) 55 } 56 switch header.Op { 57 case events.EvtKindMessage: 58 switch header.MsgType { 59 case "#control": 60 var evt CiPipelineSubscribeLogs_Control 61 if err := evt.UnmarshalCBOR(r); err != nil { 62 return fmt.Errorf("reading repoCommit event: %w", err) 63 } 64 xevt.Control = &evt 65 case "#data": 66 var evt CiPipelineSubscribeLogs_Data 67 if err := evt.UnmarshalCBOR(r); err != nil { 68 return fmt.Errorf("reading repoSync event: %w", err) 69 } 70 xevt.Data = &evt 71 } 72 case events.EvtKindErrorFrame: 73 var errframe events.ErrorFrame 74 if err := errframe.UnmarshalCBOR(r); err != nil { 75 return err 76 } 77 xevt.Error = &errframe 78 default: 79 return fmt.Errorf("unrecognized event stream type: %d", header.Op) 80 } 81 return nil 82} 83 84func CiPipelineSubscribeLogs(ctx context.Context, c extlexutil.LexClient, pipeline string, workflows []string, sched extlexutil.Scheduler[CiPipelineSubscribeLogs_Event]) error { 85 defer sched.Shutdown() 86 87 params := map[string]any{} 88 params["pipeline"] = pipeline 89 params["workflows"] = workflows 90 91 return c.LexDo(ctx, extlexutil.Subscription, "", CiPipelineSubscribeLogsNSID, params, nil, func(ctx context.Context, cr *cbg.CborReader) error { 92 var evt CiPipelineSubscribeLogs_Event 93 if err := evt.Deserialize(cr); err != nil { 94 return err 95 } 96 return sched.AddWork(ctx, "", &evt) 97 }) 98}