Monorepo for Tangled
tangled.org
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
3package tapc
4
5import (
6 "bytes"
7 "context"
8 "encoding/base64"
9 "encoding/json"
10 "fmt"
11 "net/http"
12 "net/url"
13 "time"
14
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/gorilla/websocket"
17 "tangled.org/core/log"
18)
19
20type Handler interface {
21 OnEvent(ctx context.Context, evt Event) error
22 OnError(ctx context.Context, err error)
23}
24
// ConnectHandler is an optional extension of Handler. If the handler passed
// to Client.Connect also implements it, OnConnect is called after every
// successful dial, before any event from that connection is delivered.
type ConnectHandler interface {
	OnConnect(ctx context.Context)
}
28
// Client talks to a tap service over HTTP (repository management) and
// over a websocket (event channel).
type Client struct {
	// Url is the base URL of the tap service. HTTP endpoints are formed by
	// appending a path to it; the websocket endpoint is derived from it by
	// switching the scheme to ws/wss.
	Url string
	// AdminPassword is sent as the basic-auth password for the user "admin".
	AdminPassword string
	// HTTPClient performs the repository management requests.
	HTTPClient *http.Client
}
34
35func NewClient(url, adminPassword string) Client {
36 return Client{
37 Url: url,
38 AdminPassword: adminPassword,
39 HTTPClient: &http.Client{},
40 }
41}
42
43func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
44 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
45 if err != nil {
46 return err
47 }
48 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
49 if err != nil {
50 return err
51 }
52 req.SetBasicAuth("admin", c.AdminPassword)
53 req.Header.Set("Content-Type", "application/json")
54
55 resp, err := c.HTTPClient.Do(req)
56 if err != nil {
57 return err
58 }
59 defer resp.Body.Close()
60 if resp.StatusCode != http.StatusOK {
61 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
62 }
63 return nil
64}
65
66func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
67 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
68 if err != nil {
69 return err
70 }
71 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
72 if err != nil {
73 return err
74 }
75 req.SetBasicAuth("admin", c.AdminPassword)
76 req.Header.Set("Content-Type", "application/json")
77
78 resp, err := c.HTTPClient.Do(req)
79 if err != nil {
80 return err
81 }
82 defer resp.Body.Close()
83 if resp.StatusCode != http.StatusOK {
84 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
85 }
86 return nil
87}
88
89func (c *Client) Connect(ctx context.Context, handler Handler) error {
90 l := log.FromContext(ctx)
91
92 u, err := url.Parse(c.Url)
93 if err != nil {
94 return err
95 }
96 if u.Scheme == "https" {
97 u.Scheme = "wss"
98 } else {
99 u.Scheme = "ws"
100 }
101 u.Path = "/channel"
102
103 url := u.String()
104 basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte("admin:"+c.AdminPassword))
105
106 var backoff int
107 for {
108 select {
109 case <-ctx.Done():
110 return ctx.Err()
111 default:
112 }
113
114 header := http.Header{
115 "Authorization": []string{basicAuth},
116 }
117 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
118 if err != nil {
119 if backoff < 12 {
120 backoff++
121 }
122 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
123 time.Sleep(time.Duration(5*backoff) * time.Second)
124
125 continue
126 }
127 backoff = 0
128 l.Info("connected to tap service", "subscription_code", res.StatusCode)
129
130 if ch, ok := handler.(ConnectHandler); ok {
131 ch.OnConnect(ctx)
132 }
133
134 if err = c.handleConnection(ctx, conn, handler); err != nil {
135 l.Warn("tap connection failed", "err", err)
136 }
137 }
138}
139
140func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error {
141 l := log.FromContext(ctx)
142
143 defer func() {
144 conn.Close()
145 l.Warn("closed tap connection")
146 }()
147 l.Info("established tap connection")
148
149 for {
150 select {
151 case <-ctx.Done():
152 return ctx.Err()
153 default:
154 }
155 _, message, err := conn.ReadMessage()
156 if err != nil {
157 return err
158 }
159
160 var ev Event
161 if err := json.Unmarshal(message, &ev); err != nil {
162 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
163 continue
164 }
165 if err := handler.OnEvent(ctx, ev); err != nil {
166 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
167 continue
168 }
169
170 ack := map[string]any{
171 "type": "ack",
172 "id": ev.ID,
173 }
174 if err := conn.WriteJSON(ack); err != nil {
175 l.Warn("failed to send ack", "err", err)
176 continue
177 }
178 }
179}