Monorepo for Tangled
tangled.org
1package webhook
2
3import (
4 "bytes"
5 "context"
6 "crypto/hmac"
7 "crypto/sha256"
8 "encoding/hex"
9 "encoding/json"
10 "fmt"
11 "io"
12 "log/slog"
13 "net/http"
14 "time"
15
16 "github.com/avast/retry-go/v4"
17 "github.com/bluesky-social/indigo/atproto/syntax"
18 "github.com/google/uuid"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 "tangled.org/core/log"
23)
24
25type Notifier struct {
26 notify.BaseNotifier
27 db *db.DB
28 logger *slog.Logger
29 client *http.Client
30}
31
32func NewNotifier(database *db.DB) *Notifier {
33 return &Notifier{
34 db: database,
35 logger: log.New("webhook-notifier"),
36 client: &http.Client{
37 Timeout: 30 * time.Second,
38 },
39 }
40}
41
42var _ notify.Notifier = &Notifier{}
43
44func (w *Notifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) {
45 webhooks, err := w.activeWebhooksForEvent(repo.RepoDid, models.WebhookEventPush)
46 if err != nil {
47 w.logger.Error("failed to get webhooks for repo", "repo_did", repo.RepoDid, "err", err)
48 return
49 }
50 if len(webhooks) == 0 {
51 return
52 }
53
54 payload := w.buildPushPayload(repo, ref, oldSha, newSha, committerDid)
55 payloadBytes, err := json.Marshal(payload)
56 if err != nil {
57 w.logger.Error("failed to marshal push payload", "repo_did", repo.RepoDid, "err", err)
58 return
59 }
60
61 userAgent := "Tangled-Hook/" + newSha[:7]
62 for _, webhook := range webhooks {
63 go w.sendWebhook(ctx, webhook, string(models.WebhookEventPush), payload.Repository.FullName, userAgent, payloadBytes)
64 }
65}
66
67func (w *Notifier) RenameRepo(ctx context.Context, actor syntax.DID, oldRepo, newRepo *models.Repo) {
68 webhooks, err := w.activeWebhooksForEvent(newRepo.RepoDid, models.WebhookEventRepoRenamed)
69 if err != nil {
70 w.logger.Error("failed to get webhooks for repo", "repo_did", newRepo.RepoDid, "err", err)
71 return
72 }
73 if len(webhooks) == 0 {
74 return
75 }
76
77 payload := &models.WebhookRenamePayload{
78 OldName: oldRepo.Name,
79 NewName: newRepo.Name,
80 Repository: buildWebhookRepository(newRepo),
81 Sender: models.WebhookUser{Did: actor.String()},
82 }
83 payloadBytes, err := json.Marshal(payload)
84 if err != nil {
85 w.logger.Error("failed to marshal rename payload", "repo_did", newRepo.RepoDid, "err", err)
86 return
87 }
88
89 userAgent := "Tangled-Hook/rename"
90 for _, webhook := range webhooks {
91 go w.sendWebhook(ctx, webhook, string(models.WebhookEventRepoRenamed), payload.Repository.FullName, userAgent, payloadBytes)
92 }
93}
94
95func (w *Notifier) activeWebhooksForEvent(repoDid string, event models.WebhookEvent) ([]models.Webhook, error) {
96 webhooks, err := db.GetActiveWebhooksForRepo(w.db, repoDid)
97 if err != nil {
98 return nil, err
99 }
100 var matching []models.Webhook
101 for _, webhook := range webhooks {
102 if webhook.HasEvent(event) {
103 matching = append(matching, webhook)
104 }
105 }
106 return matching, nil
107}
108
109func buildWebhookRepository(repo *models.Repo) models.WebhookRepository {
110 repository := models.WebhookRepository{
111 Name: repo.Name,
112 FullName: fmt.Sprintf("%s/%s", repo.Did, repo.Rkey),
113 Description: repo.Description,
114 Fork: repo.Source != "",
115 HtmlUrl: fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Rkey),
116 CloneUrl: fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Rkey),
117 SshUrl: fmt.Sprintf("ssh://git@%s/%s/%s", repo.Knot, repo.Did, repo.Rkey),
118 CreatedAt: repo.Created.Format(time.RFC3339),
119 UpdatedAt: repo.Created.Format(time.RFC3339),
120 Owner: models.WebhookUser{
121 Did: repo.Did,
122 },
123 }
124 if repo.Website != "" {
125 repository.Website = repo.Website
126 }
127 if repo.RepoStats != nil {
128 repository.StarsCount = repo.RepoStats.StarCount
129 repository.OpenIssues = repo.RepoStats.IssueCount.Open
130 }
131 return repository
132}
133
134func (w *Notifier) buildPushPayload(repo *models.Repo, ref, oldSha, newSha, committerDid string) *models.WebhookPayload {
135 pusher := committerDid
136 if committerDid == "" {
137 pusher = repo.Did
138 }
139 return &models.WebhookPayload{
140 Ref: ref,
141 Before: oldSha,
142 After: newSha,
143 Repository: buildWebhookRepository(repo),
144 Pusher: models.WebhookUser{
145 Did: pusher,
146 },
147 }
148}
149
150func (w *Notifier) sendWebhook(ctx context.Context, webhook models.Webhook, event, repoFullName, userAgent string, payloadBytes []byte) {
151 deliveryId := uuid.New().String()
152
153 req, err := http.NewRequestWithContext(ctx, "POST", webhook.Url, bytes.NewReader(payloadBytes))
154 if err != nil {
155 w.logger.Error("failed to create webhook request", "webhook_id", webhook.Id, "err", err)
156 return
157 }
158
159 req.Header.Set("Content-Type", "application/json")
160 req.Header.Set("User-Agent", userAgent)
161 req.Header.Set("X-Tangled-Event", event)
162 req.Header.Set("X-Tangled-Hook-ID", fmt.Sprintf("%d", webhook.Id))
163 req.Header.Set("X-Tangled-Delivery", deliveryId)
164 req.Header.Set("X-Tangled-Repo", repoFullName)
165
166 if webhook.Secret != "" {
167 signature := w.computeSignature(payloadBytes, webhook.Secret)
168 req.Header.Set("X-Tangled-Signature-256", "sha256="+signature)
169 }
170
171 delivery := &models.WebhookDelivery{
172 WebhookId: webhook.Id,
173 Event: event,
174 DeliveryId: deliveryId,
175 Url: webhook.Url,
176 RequestBody: string(payloadBytes),
177 }
178
179 retryOpts := []retry.Option{
180 retry.Attempts(3),
181 retry.Delay(1 * time.Second),
182 retry.MaxDelay(10 * time.Second),
183 retry.DelayType(retry.BackOffDelay),
184 retry.LastErrorOnly(true),
185 retry.OnRetry(func(n uint, err error) {
186 w.logger.Info("retrying webhook delivery",
187 "webhook_id", webhook.Id,
188 "attempt", n+1,
189 "err", err)
190 }),
191 retry.Context(ctx),
192 retry.RetryIf(func(err error) bool {
193 return err != nil
194 }),
195 }
196
197 var resp *http.Response
198 err = retry.Do(func() error {
199 var err error
200 resp, err = w.client.Do(req)
201 if err != nil {
202 return err
203 }
204 if resp.StatusCode >= 500 {
205 defer resp.Body.Close()
206 return fmt.Errorf("server error: %d", resp.StatusCode)
207 }
208 return nil
209 }, retryOpts...)
210
211 if err != nil {
212 w.logger.Error("webhook request failed after retries", "webhook_id", webhook.Id, "err", err)
213 delivery.Success = false
214 delivery.ResponseBody = err.Error()
215 } else {
216 defer resp.Body.Close()
217
218 delivery.ResponseCode = resp.StatusCode
219 delivery.Success = resp.StatusCode >= 200 && resp.StatusCode < 300
220
221 bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 10*1024))
222 if err != nil {
223 w.logger.Warn("failed to read webhook response body", "webhook_id", webhook.Id, "err", err)
224 } else {
225 delivery.ResponseBody = string(bodyBytes)
226 }
227
228 if !delivery.Success {
229 w.logger.Warn("webhook delivery failed",
230 "webhook_id", webhook.Id,
231 "status", resp.StatusCode,
232 "url", webhook.Url)
233 } else {
234 w.logger.Info("webhook delivered successfully",
235 "webhook_id", webhook.Id,
236 "url", webhook.Url,
237 "delivery_id", deliveryId)
238 }
239 }
240
241 if err := db.AddWebhookDelivery(w.db, delivery); err != nil {
242 w.logger.Error("failed to record webhook delivery", "webhook_id", webhook.Id, "err", err)
243 }
244}
245
246func (w *Notifier) computeSignature(payload []byte, secret string) string {
247 mac := hmac.New(sha256.New, []byte(secret))
248 mac.Write(payload)
249 return hex.EncodeToString(mac.Sum(nil))
250}