Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "slices"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/appview/db"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/appview/notify"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15 "tangled.org/core/sets"
16)
17
18const (
19 maxMentions = 8
20)
21
22type databaseNotifier struct {
23 db *db.DB
24 res *idresolver.Resolver
25}
26
27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
28 return &databaseNotifier{
29 db: database,
30 res: resolver,
31 }
32}
33
34var _ notify.Notifier = &databaseNotifier{}
35
36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
37 // no-op for now
38}
39func (n *databaseNotifier) DeleteRepo(ctx context.Context, repo *models.Repo) {
40 // no-op for now
41}
42
43func (n *databaseNotifier) RenameRepo(ctx context.Context, actor syntax.DID, oldRepo, newRepo *models.Repo) {
44}
45
46func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
47 l := log.FromContext(ctx)
48
49 if star.SubjectType != models.StarSubjectRepo {
50 return
51 }
52
53 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", star.Subject))
54 if err != nil {
55 l.Error("failed to get repos", "err", err)
56 return
57 }
58
59 actorDid := syntax.DID(star.Did)
60 recipients := sets.Singleton(syntax.DID(repo.Did))
61 eventType := models.NotificationTypeRepoStarred
62 entityType := "repo"
63 entityId := star.Subject
64 repoId := &repo.Id
65 var issueId *int64
66 var pullId *int64
67
68 n.notifyEvent(
69 ctx,
70 actorDid,
71 recipients,
72 eventType,
73 entityType,
74 entityId,
75 repoId,
76 issueId,
77 pullId,
78 )
79}
80
81func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
82 // no-op
83}
84
85func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) {
86 l := log.FromContext(ctx)
87
88 var (
89 // built the recipients list:
90 // - the owner of the repo
91 // - | if the comment is a reply -> everybody on that thread
92 // | if the comment is a top level -> just the issue owner
93 // - remove mentioned users from the recipients list
94 recipients = sets.New[syntax.DID]()
95 entityType string
96 entityId string
97 repoId *int64
98 issueId *int64
99 pullId *int64
100 )
101
102 subjectAt := syntax.ATURI(comment.Subject.Uri)
103
104 switch subjectAt.Collection() {
105 case tangled.RepoIssueNSID:
106 issues, err := db.GetIssues(
107 n.db,
108 orm.FilterEq("at_uri", subjectAt),
109 )
110 if err != nil {
111 l.Error("failed to get issues", "err", err)
112 return
113 }
114 if len(issues) == 0 {
115 l.Error("no issue found", "subject", comment.Subject)
116 return
117 }
118 issue := issues[0]
119
120 recipients.Insert(syntax.DID(issue.Repo.Did))
121 if comment.IsReply() {
122 // if this comment is a reply, then notify everybody in that thread
123 parent := *comment.ReplyTo
124
125 // find the parent thread, and add all DIDs from here to the recipient list
126 for _, t := range models.NewCommentList(issue.Comments) {
127 if t.Self.AtUri() == syntax.ATURI(parent.Uri) {
128 for _, p := range t.Participants() {
129 recipients.Insert(p)
130 }
131 }
132 }
133 } else {
134 // not a reply, notify just the issue author
135 recipients.Insert(syntax.DID(issue.Did))
136 }
137
138 entityType = "issue"
139 entityId = issue.AtUri().String()
140 repoId = &issue.Repo.Id
141 issueId = &issue.Id
142
143 for _, m := range mentions {
144 recipients.Remove(m)
145 }
146
147 n.notifyEvent(
148 ctx,
149 comment.Did,
150 recipients,
151 models.NotificationTypeIssueCommented,
152 entityType,
153 entityId,
154 repoId,
155 issueId,
156 pullId,
157 )
158
159 case tangled.RepoPullNSID:
160 pull, err := db.GetPull(
161 n.db,
162 orm.FilterEq("owner_did", subjectAt.Authority()),
163 orm.FilterEq("rkey", subjectAt.RecordKey()),
164 )
165 if err != nil {
166 l.Error("NewComment: failed to get pull", "err", err)
167 return
168 }
169
170 pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("repo_did", pull.RepoDid))
171 if err != nil {
172 l.Error("NewComment: failed to get repo", "err", err)
173 return
174 }
175
176 recipients.Insert(syntax.DID(pull.Repo.Did))
177 for _, p := range pull.Participants() {
178 recipients.Insert(syntax.DID(p))
179 }
180
181 entityType = "pull"
182 entityId = pull.AtUri().String()
183 repoId = &pull.Repo.Id
184 p := int64(pull.ID)
185 pullId = &p
186
187 for _, m := range mentions {
188 recipients.Remove(m)
189 }
190
191 n.notifyEvent(
192 ctx,
193 comment.Did,
194 recipients,
195 models.NotificationTypePullCommented,
196 entityType,
197 entityId,
198 repoId,
199 issueId,
200 pullId,
201 )
202 default:
203 return // no-op
204 }
205
206 n.notifyEvent(
207 ctx,
208 comment.Did,
209 sets.Collect(slices.Values(mentions)),
210 models.NotificationTypeUserMentioned,
211 entityType,
212 entityId,
213 repoId,
214 issueId,
215 pullId,
216 )
217}
218
219func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) {
220 // no-op
221}
222
223func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
224 l := log.FromContext(ctx)
225
226 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(issue.RepoDid)))
227 if err != nil {
228 l.Error("failed to fetch collaborators", "err", err)
229 return
230 }
231
232 // build the recipients list
233 // - owner of the repo
234 // - collaborators in the repo
235 // - remove users already mentioned
236 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
237 for _, c := range collaborators {
238 recipients.Insert(c.SubjectDid)
239 }
240 for _, m := range mentions {
241 recipients.Remove(m)
242 }
243
244 actorDid := syntax.DID(issue.Did)
245 entityType := "issue"
246 entityId := issue.AtUri().String()
247 repoId := &issue.Repo.Id
248 issueId := &issue.Id
249 var pullId *int64
250
251 n.notifyEvent(
252 ctx,
253 actorDid,
254 recipients,
255 models.NotificationTypeIssueCreated,
256 entityType,
257 entityId,
258 repoId,
259 issueId,
260 pullId,
261 )
262 n.notifyEvent(
263 ctx,
264 actorDid,
265 sets.Collect(slices.Values(mentions)),
266 models.NotificationTypeUserMentioned,
267 entityType,
268 entityId,
269 repoId,
270 issueId,
271 pullId,
272 )
273}
274
275func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
276 // no-op for now
277}
278
279func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {}
280func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, pull *models.Pull) {}
281
282func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
283 actorDid := syntax.DID(follow.UserDid)
284 recipients := sets.Singleton(syntax.DID(follow.SubjectDid))
285 eventType := models.NotificationTypeFollowed
286 entityType := "follow"
287 entityId := follow.UserDid
288 var repoId, issueId, pullId *int64
289
290 n.notifyEvent(
291 ctx,
292 actorDid,
293 recipients,
294 eventType,
295 entityType,
296 entityId,
297 repoId,
298 issueId,
299 pullId,
300 )
301}
302
303func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
304 // no-op
305}
306
307func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
308 l := log.FromContext(ctx)
309
310 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", string(pull.RepoDid)))
311 if err != nil {
312 l.Error("failed to get repos", "err", err)
313 return
314 }
315 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(pull.RepoDid)))
316 if err != nil {
317 l.Error("failed to fetch collaborators", "err", err)
318 return
319 }
320
321 // build the recipients list
322 // - owner of the repo
323 // - collaborators in the repo
324 recipients := sets.Singleton(syntax.DID(repo.Did))
325 for _, c := range collaborators {
326 recipients.Insert(c.SubjectDid)
327 }
328
329 actorDid := syntax.DID(pull.OwnerDid)
330 eventType := models.NotificationTypePullCreated
331 entityType := "pull"
332 entityId := pull.AtUri().String()
333 repoId := &repo.Id
334 var issueId *int64
335 p := int64(pull.ID)
336 pullId := &p
337
338 n.notifyEvent(
339 ctx,
340 actorDid,
341 recipients,
342 eventType,
343 entityType,
344 entityId,
345 repoId,
346 issueId,
347 pullId,
348 )
349}
350
351func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
352 // no-op
353}
354
355func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
356 // no-op
357}
358
359func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
360 // no-op
361}
362
363func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
364 // no-op
365}
366
367func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) {
368 // no-op for now; webhooks are handled by the webhook notifier
369}
370
371func (n *databaseNotifier) Clone(ctx context.Context, repo *models.Repo) {
372 // no-op
373}
374
375func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
376 l := log.FromContext(ctx)
377
378 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(issue.RepoDid)))
379 if err != nil {
380 l.Error("failed to fetch collaborators", "err", err)
381 return
382 }
383
384 // build up the recipients list:
385 // - repo owner
386 // - repo collaborators
387 // - all issue participants
388 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
389 for _, c := range collaborators {
390 recipients.Insert(c.SubjectDid)
391 }
392 for _, p := range issue.Participants() {
393 recipients.Insert(syntax.DID(p))
394 }
395
396 entityType := "issue"
397 entityId := issue.AtUri().String()
398 repoId := &issue.Repo.Id
399 issueId := &issue.Id
400 var pullId *int64
401 var eventType models.NotificationType
402
403 if issue.Open {
404 eventType = models.NotificationTypeIssueReopen
405 } else {
406 eventType = models.NotificationTypeIssueClosed
407 }
408
409 n.notifyEvent(
410 ctx,
411 actor,
412 recipients,
413 eventType,
414 entityType,
415 entityId,
416 repoId,
417 issueId,
418 pullId,
419 )
420}
421
422func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
423 l := log.FromContext(ctx)
424
425 // Get repo details
426 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", string(pull.RepoDid)))
427 if err != nil {
428 l.Error("failed to get repos", "err", err)
429 return
430 }
431
432 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(pull.RepoDid)))
433 if err != nil {
434 l.Error("failed to fetch collaborators", "err", err)
435 return
436 }
437
438 // build up the recipients list:
439 // - repo owner
440 // - all pull participants
441 recipients := sets.Singleton(syntax.DID(repo.Did))
442 for _, c := range collaborators {
443 recipients.Insert(c.SubjectDid)
444 }
445 for _, p := range pull.Participants() {
446 recipients.Insert(p)
447 }
448
449 entityType := "pull"
450 entityId := pull.AtUri().String()
451 repoId := &repo.Id
452 var issueId *int64
453 var eventType models.NotificationType
454 switch pull.State {
455 case models.PullClosed:
456 eventType = models.NotificationTypePullClosed
457 case models.PullOpen:
458 eventType = models.NotificationTypePullReopen
459 case models.PullMerged:
460 eventType = models.NotificationTypePullMerged
461 default:
462 l.Error("unexpected new PR state", "state", pull.State)
463 return
464 }
465 p := int64(pull.ID)
466 pullId := &p
467
468 n.notifyEvent(
469 ctx,
470 actor,
471 recipients,
472 eventType,
473 entityType,
474 entityId,
475 repoId,
476 issueId,
477 pullId,
478 )
479}
480
481func (n *databaseNotifier) notifyEvent(
482 ctx context.Context,
483 actorDid syntax.DID,
484 recipients sets.Set[syntax.DID],
485 eventType models.NotificationType,
486 entityType string,
487 entityId string,
488 repoId *int64,
489 issueId *int64,
490 pullId *int64,
491) {
492 l := log.FromContext(ctx)
493
494 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
495 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
496 return
497 }
498
499 recipients.Remove(actorDid)
500
501 prefMap, err := db.GetNotificationPreferences(
502 n.db,
503 orm.FilterIn("user_did", slices.Collect(recipients.All())),
504 )
505 if err != nil {
506 // failed to get prefs for users
507 return
508 }
509
510 // create a transaction for bulk notification storage
511 tx, err := n.db.Begin()
512 if err != nil {
513 // failed to start tx
514 return
515 }
516 defer tx.Rollback()
517
518 // filter based on preferences
519 for recipientDid := range recipients.All() {
520 prefs, ok := prefMap[recipientDid]
521 if !ok {
522 prefs = models.DefaultNotificationPreferences(recipientDid)
523 }
524
525 // skip users who don’t want this type
526 if !prefs.ShouldNotify(eventType) {
527 continue
528 }
529
530 // create notification
531 notif := &models.Notification{
532 RecipientDid: recipientDid.String(),
533 ActorDid: actorDid.String(),
534 Type: eventType,
535 EntityType: entityType,
536 EntityId: entityId,
537 RepoId: repoId,
538 IssueId: issueId,
539 PullId: pullId,
540 }
541
542 if err := db.CreateNotification(tx, notif); err != nil {
543 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
544 }
545 }
546
547 if err := tx.Commit(); err != nil {
548 // failed to commit
549 return
550 }
551}