Monorepo for Tangled
tangled.org
1// extending code generated from sh.tangled.ci.pipeline.subscribeLogs
2
3package tangled
4
5import (
6 "context"
7 "fmt"
8 "io"
9
10 "github.com/bluesky-social/indigo/events"
11 lexutil "github.com/bluesky-social/indigo/lex/util"
12 cbg "github.com/whyrusleeping/cbor-gen"
13 extlexutil "tangled.org/core/lexutil"
14)
15
16// TODO: generate codes below from lexicon
17type CiPipelineSubscribeLogs_Event struct {
18 Error *events.ErrorFrame
19 Control *CiPipelineSubscribeLogs_Control
20 Data *CiPipelineSubscribeLogs_Data
21
22 // some private fields for internal routing perf
23 Preserialized []byte `json:"-" cborgen:"-"`
24}
25
26func (xevt *CiPipelineSubscribeLogs_Event) Serialize(wc io.Writer) error {
27 header := events.EventHeader{Op: events.EvtKindMessage}
28 var obj lexutil.CBOR
29
30 switch {
31 case xevt.Error != nil:
32 header.Op = events.EvtKindErrorFrame
33 obj = xevt.Error
34 case xevt.Control != nil:
35 header.MsgType = "#control"
36 obj = xevt.Control
37 case xevt.Data != nil:
38 header.MsgType = "#data"
39 obj = xevt.Data
40 default:
41 return fmt.Errorf("unrecognized event kind")
42 }
43
44 cborWriter := cbg.NewCborWriter(wc)
45 if err := header.MarshalCBOR(cborWriter); err != nil {
46 return fmt.Errorf("failed to write header: %w", err)
47 }
48 return obj.MarshalCBOR(cborWriter)
49}
50
51func (xevt *CiPipelineSubscribeLogs_Event) Deserialize(r io.Reader) error {
52 var header events.EventHeader
53 if err := header.UnmarshalCBOR(r); err != nil {
54 return fmt.Errorf("reading header: %w", err)
55 }
56 switch header.Op {
57 case events.EvtKindMessage:
58 switch header.MsgType {
59 case "#control":
60 var evt CiPipelineSubscribeLogs_Control
61 if err := evt.UnmarshalCBOR(r); err != nil {
62 return fmt.Errorf("reading repoCommit event: %w", err)
63 }
64 xevt.Control = &evt
65 case "#data":
66 var evt CiPipelineSubscribeLogs_Data
67 if err := evt.UnmarshalCBOR(r); err != nil {
68 return fmt.Errorf("reading repoSync event: %w", err)
69 }
70 xevt.Data = &evt
71 }
72 case events.EvtKindErrorFrame:
73 var errframe events.ErrorFrame
74 if err := errframe.UnmarshalCBOR(r); err != nil {
75 return err
76 }
77 xevt.Error = &errframe
78 default:
79 return fmt.Errorf("unrecognized event stream type: %d", header.Op)
80 }
81 return nil
82}
83
84func CiPipelineSubscribeLogs(ctx context.Context, c extlexutil.LexClient, pipeline string, workflows []string, sched extlexutil.Scheduler[CiPipelineSubscribeLogs_Event]) error {
85 defer sched.Shutdown()
86
87 params := map[string]any{}
88 params["pipeline"] = pipeline
89 params["workflows"] = workflows
90
91 return c.LexDo(ctx, extlexutil.Subscription, "", CiPipelineSubscribeLogsNSID, params, nil, func(ctx context.Context, cr *cbg.CborReader) error {
92 var evt CiPipelineSubscribeLogs_Event
93 if err := evt.Deserialize(cr); err != nil {
94 return err
95 }
96 return sched.AddWork(ctx, "", &evt)
97 })
98}