Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 3package tapc 4 5import ( 6 "bytes" 7 "context" 8 "encoding/base64" 9 "encoding/json" 10 "fmt" 11 "net/http" 12 "net/url" 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/gorilla/websocket" 17 "tangled.org/core/log" 18) 19 20type Handler interface { 21 OnEvent(ctx context.Context, evt Event) error 22 OnError(ctx context.Context, err error) 23} 24 25type ConnectHandler interface { 26 OnConnect(ctx context.Context) 27} 28 29type Client struct { 30 Url string 31 AdminPassword string 32 HTTPClient *http.Client 33} 34 35func NewClient(url, adminPassword string) Client { 36 return Client{ 37 Url: url, 38 AdminPassword: adminPassword, 39 HTTPClient: &http.Client{}, 40 } 41} 42 43func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 44 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 45 if err != nil { 46 return err 47 } 48 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 49 if err != nil { 50 return err 51 } 52 req.SetBasicAuth("admin", c.AdminPassword) 53 req.Header.Set("Content-Type", "application/json") 54 55 resp, err := c.HTTPClient.Do(req) 56 if err != nil { 57 return err 58 } 59 defer resp.Body.Close() 60 if resp.StatusCode != http.StatusOK { 61 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 62 } 63 return nil 64} 65 66func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 67 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 68 if err != nil { 69 return err 70 } 71 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 72 if err != nil { 73 return err 74 } 75 req.SetBasicAuth("admin", c.AdminPassword) 76 req.Header.Set("Content-Type", "application/json") 77 78 resp, err := c.HTTPClient.Do(req) 79 if err != nil { 80 return err 81 } 82 defer resp.Body.Close() 83 if resp.StatusCode != http.StatusOK { 84 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 85 } 86 return nil 87} 88 89func (c *Client) Connect(ctx context.Context, handler Handler) error { 90 l := log.FromContext(ctx) 91 92 u, err := url.Parse(c.Url) 93 if err != nil { 94 return err 95 } 96 if u.Scheme == "https" { 97 u.Scheme = "wss" 98 } else { 99 u.Scheme = "ws" 100 } 101 u.Path = "/channel" 102 103 url := u.String() 104 basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte("admin:"+c.AdminPassword)) 105 106 var backoff int 107 for { 108 select { 109 case <-ctx.Done(): 110 return ctx.Err() 111 default: 112 } 113 114 header := http.Header{ 115 "Authorization": []string{basicAuth}, 116 } 117 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 118 if err != nil { 119 if backoff < 12 { 120 backoff++ 121 } 122 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 123 time.Sleep(time.Duration(5*backoff) * time.Second) 124 125 continue 126 } 127 backoff = 0 128 l.Info("connected to tap service", "subscription_code", res.StatusCode) 129 130 if ch, ok := handler.(ConnectHandler); ok { 131 ch.OnConnect(ctx) 132 } 133 134 if err = c.handleConnection(ctx, conn, handler); err != nil { 135 l.Warn("tap connection failed", "err", err) 136 } 137 } 138} 139 140func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error { 141 l := log.FromContext(ctx) 142 143 defer func() { 144 conn.Close() 145 l.Warn("closed tap connection") 146 }() 147 l.Info("established tap connection") 148 149 for { 150 select { 151 case <-ctx.Done(): 152 return ctx.Err() 153 default: 154 } 155 _, message, err := conn.ReadMessage() 156 if err != nil { 157 return err 158 } 159 160 var ev Event 161 if err := json.Unmarshal(message, &ev); err != nil { 162 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 163 continue 164 } 165 if err := handler.OnEvent(ctx, ev); err != nil { 166 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 167 continue 168 } 169 170 ack := map[string]any{ 171 "type": "ack", 172 "id": ev.ID, 173 } 174 if err := conn.WriteJSON(ack); err != nil { 175 l.Warn("failed to send ack", "err", err) 176 continue 177 } 178 } 179}