Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

at icy/ytnwlw 7.1 kB View raw
1package webhook 2 3import ( 4 "bytes" 5 "context" 6 "crypto/hmac" 7 "crypto/sha256" 8 "encoding/hex" 9 "encoding/json" 10 "fmt" 11 "io" 12 "log/slog" 13 "net/http" 14 "time" 15 16 "github.com/avast/retry-go/v4" 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 "github.com/google/uuid" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 "tangled.org/core/log" 23) 24 25type Notifier struct { 26 notify.BaseNotifier 27 db *db.DB 28 logger *slog.Logger 29 client *http.Client 30} 31 32func NewNotifier(database *db.DB) *Notifier { 33 return &Notifier{ 34 db: database, 35 logger: log.New("webhook-notifier"), 36 client: &http.Client{ 37 Timeout: 30 * time.Second, 38 }, 39 } 40} 41 42var _ notify.Notifier = &Notifier{} 43 44func (w *Notifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 45 webhooks, err := w.activeWebhooksForEvent(repo.RepoDid, models.WebhookEventPush) 46 if err != nil { 47 w.logger.Error("failed to get webhooks for repo", "repo_did", repo.RepoDid, "err", err) 48 return 49 } 50 if len(webhooks) == 0 { 51 return 52 } 53 54 payload := w.buildPushPayload(repo, ref, oldSha, newSha, committerDid) 55 payloadBytes, err := json.Marshal(payload) 56 if err != nil { 57 w.logger.Error("failed to marshal push payload", "repo_did", repo.RepoDid, "err", err) 58 return 59 } 60 61 userAgent := "Tangled-Hook/" + newSha[:7] 62 for _, webhook := range webhooks { 63 go w.sendWebhook(ctx, webhook, string(models.WebhookEventPush), payload.Repository.FullName, userAgent, payloadBytes) 64 } 65} 66 67func (w *Notifier) RenameRepo(ctx context.Context, actor syntax.DID, oldRepo, newRepo *models.Repo) { 68 webhooks, err := w.activeWebhooksForEvent(newRepo.RepoDid, models.WebhookEventRepoRenamed) 69 if err != nil { 70 w.logger.Error("failed to get webhooks for repo", "repo_did", newRepo.RepoDid, "err", err) 71 return 72 } 73 if len(webhooks) == 0 { 74 return 75 } 76 77 payload := &models.WebhookRenamePayload{ 78 OldName: oldRepo.Name, 79 NewName: newRepo.Name, 80 Repository: buildWebhookRepository(newRepo), 81 Sender: models.WebhookUser{Did: actor.String()}, 82 } 83 payloadBytes, err := json.Marshal(payload) 84 if err != nil { 85 w.logger.Error("failed to marshal rename payload", "repo_did", newRepo.RepoDid, "err", err) 86 return 87 } 88 89 userAgent := "Tangled-Hook/rename" 90 for _, webhook := range webhooks { 91 go w.sendWebhook(ctx, webhook, string(models.WebhookEventRepoRenamed), payload.Repository.FullName, userAgent, payloadBytes) 92 } 93} 94 95func (w *Notifier) activeWebhooksForEvent(repoDid string, event models.WebhookEvent) ([]models.Webhook, error) { 96 webhooks, err := db.GetActiveWebhooksForRepo(w.db, repoDid) 97 if err != nil { 98 return nil, err 99 } 100 var matching []models.Webhook 101 for _, webhook := range webhooks { 102 if webhook.HasEvent(event) { 103 matching = append(matching, webhook) 104 } 105 } 106 return matching, nil 107} 108 109func buildWebhookRepository(repo *models.Repo) models.WebhookRepository { 110 repository := models.WebhookRepository{ 111 Name: repo.Name, 112 FullName: fmt.Sprintf("%s/%s", repo.Did, repo.Rkey), 113 Description: repo.Description, 114 Fork: repo.Source != "", 115 HtmlUrl: fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Rkey), 116 CloneUrl: fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Rkey), 117 SshUrl: fmt.Sprintf("ssh://git@%s/%s/%s", repo.Knot, repo.Did, repo.Rkey), 118 CreatedAt: repo.Created.Format(time.RFC3339), 119 UpdatedAt: repo.Created.Format(time.RFC3339), 120 Owner: models.WebhookUser{ 121 Did: repo.Did, 122 }, 123 } 124 if repo.Website != "" { 125 repository.Website = repo.Website 126 } 127 if repo.RepoStats != nil { 128 repository.StarsCount = repo.RepoStats.StarCount 129 repository.OpenIssues = repo.RepoStats.IssueCount.Open 130 } 131 return repository 132} 133 134func (w *Notifier) buildPushPayload(repo *models.Repo, ref, oldSha, newSha, committerDid string) *models.WebhookPayload { 135 pusher := committerDid 136 if committerDid == "" { 137 pusher = repo.Did 138 } 139 return &models.WebhookPayload{ 140 Ref: ref, 141 Before: oldSha, 142 After: newSha, 143 Repository: buildWebhookRepository(repo), 144 Pusher: models.WebhookUser{ 145 Did: pusher, 146 }, 147 } 148} 149 150func (w *Notifier) sendWebhook(ctx context.Context, webhook models.Webhook, event, repoFullName, userAgent string, payloadBytes []byte) { 151 deliveryId := uuid.New().String() 152 153 req, err := http.NewRequestWithContext(ctx, "POST", webhook.Url, bytes.NewReader(payloadBytes)) 154 if err != nil { 155 w.logger.Error("failed to create webhook request", "webhook_id", webhook.Id, "err", err) 156 return 157 } 158 159 req.Header.Set("Content-Type", "application/json") 160 req.Header.Set("User-Agent", userAgent) 161 req.Header.Set("X-Tangled-Event", event) 162 req.Header.Set("X-Tangled-Hook-ID", fmt.Sprintf("%d", webhook.Id)) 163 req.Header.Set("X-Tangled-Delivery", deliveryId) 164 req.Header.Set("X-Tangled-Repo", repoFullName) 165 166 if webhook.Secret != "" { 167 signature := w.computeSignature(payloadBytes, webhook.Secret) 168 req.Header.Set("X-Tangled-Signature-256", "sha256="+signature) 169 } 170 171 delivery := &models.WebhookDelivery{ 172 WebhookId: webhook.Id, 173 Event: event, 174 DeliveryId: deliveryId, 175 Url: webhook.Url, 176 RequestBody: string(payloadBytes), 177 } 178 179 retryOpts := []retry.Option{ 180 retry.Attempts(3), 181 retry.Delay(1 * time.Second), 182 retry.MaxDelay(10 * time.Second), 183 retry.DelayType(retry.BackOffDelay), 184 retry.LastErrorOnly(true), 185 retry.OnRetry(func(n uint, err error) { 186 w.logger.Info("retrying webhook delivery", 187 "webhook_id", webhook.Id, 188 "attempt", n+1, 189 "err", err) 190 }), 191 retry.Context(ctx), 192 retry.RetryIf(func(err error) bool { 193 return err != nil 194 }), 195 } 196 197 var resp *http.Response 198 err = retry.Do(func() error { 199 var err error 200 resp, err = w.client.Do(req) 201 if err != nil { 202 return err 203 } 204 if resp.StatusCode >= 500 { 205 defer resp.Body.Close() 206 return fmt.Errorf("server error: %d", resp.StatusCode) 207 } 208 return nil 209 }, retryOpts...) 210 211 if err != nil { 212 w.logger.Error("webhook request failed after retries", "webhook_id", webhook.Id, "err", err) 213 delivery.Success = false 214 delivery.ResponseBody = err.Error() 215 } else { 216 defer resp.Body.Close() 217 218 delivery.ResponseCode = resp.StatusCode 219 delivery.Success = resp.StatusCode >= 200 && resp.StatusCode < 300 220 221 bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 10*1024)) 222 if err != nil { 223 w.logger.Warn("failed to read webhook response body", "webhook_id", webhook.Id, "err", err) 224 } else { 225 delivery.ResponseBody = string(bodyBytes) 226 } 227 228 if !delivery.Success { 229 w.logger.Warn("webhook delivery failed", 230 "webhook_id", webhook.Id, 231 "status", resp.StatusCode, 232 "url", webhook.Url) 233 } else { 234 w.logger.Info("webhook delivered successfully", 235 "webhook_id", webhook.Id, 236 "url", webhook.Url, 237 "delivery_id", deliveryId) 238 } 239 } 240 241 if err := db.AddWebhookDelivery(w.db, delivery); err != nil { 242 w.logger.Error("failed to record webhook delivery", "webhook_id", webhook.Id, "err", err) 243 } 244} 245 246func (w *Notifier) computeSignature(payload []byte, secret string) string { 247 mac := hmac.New(sha256.New, []byte(secret)) 248 mac.Write(payload) 249 return hex.EncodeToString(mac.Sum(nil)) 250}