Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "slices"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/appview/db"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/appview/notify"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15 "tangled.org/core/sets"
16)
17
const (
	// maxMentions is the largest number of mentioned users that notifyEvent
	// will still notify; mention events with more recipients are dropped.
	maxMentions = 8
	// assigneeLabelAt is the AT-URI of the label definition whose label
	// operations are reported as assign/unassign notifications.
	assigneeLabelAt = "at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/assignee"
)
22
// databaseNotifier is a notify.Notifier that stores notifications in the
// appview database.
type databaseNotifier struct {
	// db is the database handle used for lookups and for storing notifications.
	db *db.DB
	// res is the identity resolver supplied to NewDatabaseNotifier.
	res *idresolver.Resolver
}
27
28func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
29 return &databaseNotifier{
30 db: database,
31 res: resolver,
32 }
33}
34
35var _ notify.Notifier = &databaseNotifier{}
36
// NewRepo is currently a no-op: creating a repository stores no notification.
func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
	// no-op for now
}
// DeleteRepo is currently a no-op: deleting a repository stores no
// notification.
func (n *databaseNotifier) DeleteRepo(ctx context.Context, repo *models.Repo) {
	// no-op for now
}
43
// RenameRepo does nothing: renaming a repository stores no notification.
func (n *databaseNotifier) RenameRepo(ctx context.Context, actor syntax.DID, oldRepo, newRepo *models.Repo) {
}
46
47func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
48 l := log.FromContext(ctx)
49
50 if star.SubjectType != models.StarSubjectRepo {
51 return
52 }
53
54 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", star.Subject))
55 if err != nil {
56 l.Error("failed to get repos", "err", err)
57 return
58 }
59
60 actorDid := syntax.DID(star.Did)
61 recipients := sets.Singleton(syntax.DID(repo.Did))
62 eventType := models.NotificationTypeRepoStarred
63 entityType := "repo"
64 entityId := star.Subject
65 repoId := &repo.Id
66 var issueId *int64
67 var pullId *int64
68
69 n.notifyEvent(
70 ctx,
71 actorDid,
72 recipients,
73 eventType,
74 entityType,
75 entityId,
76 repoId,
77 issueId,
78 pullId,
79 )
80}
81
// DeleteStar is a no-op: removing a star stores no notification.
func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
	// no-op
}
85
86func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) {
87 l := log.FromContext(ctx)
88
89 var (
90 // built the recipients list:
91 // - the owner of the repo
92 // - | if the comment is a reply -> everybody on that thread
93 // | if the comment is a top level -> just the issue owner
94 // - remove mentioned users from the recipients list
95 recipients = sets.New[syntax.DID]()
96 entityType string
97 entityId string
98 repoId *int64
99 issueId *int64
100 pullId *int64
101 )
102
103 subjectAt := syntax.ATURI(comment.Subject.Uri)
104
105 switch subjectAt.Collection() {
106 case tangled.RepoIssueNSID:
107 issues, err := db.GetIssues(
108 n.db,
109 orm.FilterEq("at_uri", subjectAt),
110 )
111 if err != nil {
112 l.Error("failed to get issues", "err", err)
113 return
114 }
115 if len(issues) == 0 {
116 l.Error("no issue found", "subject", comment.Subject)
117 return
118 }
119 issue := issues[0]
120
121 recipients.Insert(syntax.DID(issue.Repo.Did))
122 if comment.IsReply() {
123 // if this comment is a reply, then notify everybody in that thread
124 parent := *comment.ReplyTo
125
126 // find the parent thread, and add all DIDs from here to the recipient list
127 for _, t := range models.NewCommentList(issue.Comments) {
128 if t.Self.AtUri() == syntax.ATURI(parent.Uri) {
129 for _, p := range t.Participants() {
130 recipients.Insert(p)
131 }
132 }
133 }
134 } else {
135 // not a reply, notify just the issue author
136 recipients.Insert(syntax.DID(issue.Did))
137 }
138
139 entityType = "issue"
140 entityId = issue.AtUri().String()
141 repoId = &issue.Repo.Id
142 issueId = &issue.Id
143
144 for _, m := range mentions {
145 recipients.Remove(m)
146 }
147
148 n.notifyEvent(
149 ctx,
150 comment.Did,
151 recipients,
152 models.NotificationTypeIssueCommented,
153 entityType,
154 entityId,
155 repoId,
156 issueId,
157 pullId,
158 )
159
160 case tangled.RepoPullNSID:
161 pull, err := db.GetPull(
162 n.db,
163 orm.FilterEq("owner_did", subjectAt.Authority()),
164 orm.FilterEq("rkey", subjectAt.RecordKey()),
165 )
166 if err != nil {
167 l.Error("NewComment: failed to get pull", "err", err)
168 return
169 }
170
171 pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("repo_did", pull.RepoDid))
172 if err != nil {
173 l.Error("NewComment: failed to get repo", "err", err)
174 return
175 }
176
177 recipients.Insert(syntax.DID(pull.Repo.Did))
178 for _, p := range pull.Participants() {
179 recipients.Insert(syntax.DID(p))
180 }
181
182 entityType = "pull"
183 entityId = pull.AtUri().String()
184 repoId = &pull.Repo.Id
185 p := int64(pull.ID)
186 pullId = &p
187
188 for _, m := range mentions {
189 recipients.Remove(m)
190 }
191
192 n.notifyEvent(
193 ctx,
194 comment.Did,
195 recipients,
196 models.NotificationTypePullCommented,
197 entityType,
198 entityId,
199 repoId,
200 issueId,
201 pullId,
202 )
203 default:
204 return // no-op
205 }
206
207 n.notifyEvent(
208 ctx,
209 comment.Did,
210 sets.Collect(slices.Values(mentions)),
211 models.NotificationTypeUserMentioned,
212 entityType,
213 entityId,
214 repoId,
215 issueId,
216 pullId,
217 )
218}
219
// DeleteComment is a no-op: deleting a comment stores no notification.
func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) {
	// no-op
}
223
224func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
225 l := log.FromContext(ctx)
226
227 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(issue.RepoDid)))
228 if err != nil {
229 l.Error("failed to fetch collaborators", "err", err)
230 return
231 }
232
233 // build the recipients list
234 // - owner of the repo
235 // - collaborators in the repo
236 // - remove users already mentioned
237 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
238 for _, c := range collaborators {
239 recipients.Insert(c.SubjectDid)
240 }
241 for _, m := range mentions {
242 recipients.Remove(m)
243 }
244
245 actorDid := syntax.DID(issue.Did)
246 entityType := "issue"
247 entityId := issue.AtUri().String()
248 repoId := &issue.Repo.Id
249 issueId := &issue.Id
250 var pullId *int64
251
252 n.notifyEvent(
253 ctx,
254 actorDid,
255 recipients,
256 models.NotificationTypeIssueCreated,
257 entityType,
258 entityId,
259 repoId,
260 issueId,
261 pullId,
262 )
263 n.notifyEvent(
264 ctx,
265 actorDid,
266 sets.Collect(slices.Values(mentions)),
267 models.NotificationTypeUserMentioned,
268 entityType,
269 entityId,
270 repoId,
271 issueId,
272 pullId,
273 )
274}
275
// DeleteIssue is currently a no-op: deleting an issue stores no notification.
func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
	// no-op for now
}
279
280func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, actor syntax.DID, issue *models.Issue, ops []models.LabelOp) {
281 entityType := "issue"
282 entityId := issue.AtUri().String()
283 repoId := &issue.Repo.Id
284 issueId := &issue.Id
285 var pullId *int64
286
287 assigned := sets.New[syntax.DID]()
288 unassigned := sets.New[syntax.DID]()
289 for _, op := range ops {
290 if op.OperandKey != assigneeLabelAt {
291 continue
292 }
293 assignee := syntax.DID(op.OperandValue)
294 switch op.Operation {
295 case models.LabelOperationAdd:
296 assigned.Insert(assignee)
297 case models.LabelOperationDel:
298 unassigned.Insert(assignee)
299 default:
300 continue
301 }
302 }
303
304 n.notifyEvent(ctx, actor, assigned, models.NotificationTypeIssueAssigned, entityType, entityId, repoId, issueId, pullId)
305 n.notifyEvent(ctx, actor, unassigned, models.NotificationTypeIssueUnassigned, entityType, entityId, repoId, issueId, pullId)
306}
307
308func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, actor syntax.DID, pull *models.Pull, ops []models.LabelOp) {
309 entityType := "pull"
310 entityId := pull.AtUri().String()
311 repoId := &pull.Repo.Id
312 var issueId *int64
313 p := int64(pull.ID)
314 pullId := &p
315
316 assigned := sets.New[syntax.DID]()
317 unassigned := sets.New[syntax.DID]()
318 for _, op := range ops {
319 if op.OperandKey != assigneeLabelAt {
320 continue
321 }
322 assignee := syntax.DID(op.OperandValue)
323 switch op.Operation {
324 case models.LabelOperationAdd:
325 assigned.Insert(assignee)
326 case models.LabelOperationDel:
327 unassigned.Insert(assignee)
328 default:
329 continue
330 }
331 }
332
333 n.notifyEvent(ctx, actor, assigned, models.NotificationTypePullAssigned, entityType, entityId, repoId, issueId, pullId)
334 n.notifyEvent(ctx, actor, unassigned, models.NotificationTypePullUnassigned, entityType, entityId, repoId, issueId, pullId)
335}
336
337func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
338 actorDid := syntax.DID(follow.UserDid)
339 recipients := sets.Singleton(syntax.DID(follow.SubjectDid))
340 eventType := models.NotificationTypeFollowed
341 entityType := "follow"
342 entityId := follow.UserDid
343 var repoId, issueId, pullId *int64
344
345 n.notifyEvent(
346 ctx,
347 actorDid,
348 recipients,
349 eventType,
350 entityType,
351 entityId,
352 repoId,
353 issueId,
354 pullId,
355 )
356}
357
// DeleteFollow is a no-op: unfollowing stores no notification.
func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
	// no-op
}
361
362func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
363 l := log.FromContext(ctx)
364
365 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", string(pull.RepoDid)))
366 if err != nil {
367 l.Error("failed to get repos", "err", err)
368 return
369 }
370 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(pull.RepoDid)))
371 if err != nil {
372 l.Error("failed to fetch collaborators", "err", err)
373 return
374 }
375
376 // build the recipients list
377 // - owner of the repo
378 // - collaborators in the repo
379 recipients := sets.Singleton(syntax.DID(repo.Did))
380 for _, c := range collaborators {
381 recipients.Insert(c.SubjectDid)
382 }
383
384 actorDid := syntax.DID(pull.OwnerDid)
385 eventType := models.NotificationTypePullCreated
386 entityType := "pull"
387 entityId := pull.AtUri().String()
388 repoId := &repo.Id
389 var issueId *int64
390 p := int64(pull.ID)
391 pullId := &p
392
393 n.notifyEvent(
394 ctx,
395 actorDid,
396 recipients,
397 eventType,
398 entityType,
399 entityId,
400 repoId,
401 issueId,
402 pullId,
403 )
404}
405
// UpdateProfile is a no-op: profile updates store no notification.
func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
	// no-op
}
409
// DeleteString is a no-op: deleting a string stores no notification.
func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
	// no-op
}
413
// EditString is a no-op: editing a string stores no notification.
func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
	// no-op
}
417
// NewString is a no-op: creating a string stores no notification.
func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
	// no-op
}
421
// Push is currently a no-op: pushes store no notification here.
func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) {
	// no-op for now; webhooks are handled by the webhook notifier
}
425
// Clone is a no-op: cloning a repository stores no notification.
func (n *databaseNotifier) Clone(ctx context.Context, repo *models.Repo) {
	// no-op
}
429
430func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
431 l := log.FromContext(ctx)
432
433 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(issue.RepoDid)))
434 if err != nil {
435 l.Error("failed to fetch collaborators", "err", err)
436 return
437 }
438
439 // build up the recipients list:
440 // - repo owner
441 // - repo collaborators
442 // - all issue participants
443 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
444 for _, c := range collaborators {
445 recipients.Insert(c.SubjectDid)
446 }
447 for _, p := range issue.Participants() {
448 recipients.Insert(syntax.DID(p))
449 }
450
451 entityType := "issue"
452 entityId := issue.AtUri().String()
453 repoId := &issue.Repo.Id
454 issueId := &issue.Id
455 var pullId *int64
456 var eventType models.NotificationType
457
458 if issue.Open {
459 eventType = models.NotificationTypeIssueReopen
460 } else {
461 eventType = models.NotificationTypeIssueClosed
462 }
463
464 n.notifyEvent(
465 ctx,
466 actor,
467 recipients,
468 eventType,
469 entityType,
470 entityId,
471 repoId,
472 issueId,
473 pullId,
474 )
475}
476
477func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
478 l := log.FromContext(ctx)
479
480 // Get repo details
481 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", string(pull.RepoDid)))
482 if err != nil {
483 l.Error("failed to get repos", "err", err)
484 return
485 }
486
487 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(pull.RepoDid)))
488 if err != nil {
489 l.Error("failed to fetch collaborators", "err", err)
490 return
491 }
492
493 // build up the recipients list:
494 // - repo owner
495 // - all pull participants
496 recipients := sets.Singleton(syntax.DID(repo.Did))
497 for _, c := range collaborators {
498 recipients.Insert(c.SubjectDid)
499 }
500 for _, p := range pull.Participants() {
501 recipients.Insert(p)
502 }
503
504 entityType := "pull"
505 entityId := pull.AtUri().String()
506 repoId := &repo.Id
507 var issueId *int64
508 var eventType models.NotificationType
509 switch pull.State {
510 case models.PullClosed:
511 eventType = models.NotificationTypePullClosed
512 case models.PullOpen:
513 eventType = models.NotificationTypePullReopen
514 case models.PullMerged:
515 eventType = models.NotificationTypePullMerged
516 default:
517 l.Error("unexpected new PR state", "state", pull.State)
518 return
519 }
520 p := int64(pull.ID)
521 pullId := &p
522
523 n.notifyEvent(
524 ctx,
525 actor,
526 recipients,
527 eventType,
528 entityType,
529 entityId,
530 repoId,
531 issueId,
532 pullId,
533 )
534}
535
536func (n *databaseNotifier) notifyEvent(
537 ctx context.Context,
538 actorDid syntax.DID,
539 recipients sets.Set[syntax.DID],
540 eventType models.NotificationType,
541 entityType string,
542 entityId string,
543 repoId *int64,
544 issueId *int64,
545 pullId *int64,
546) {
547 l := log.FromContext(ctx)
548
549 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
550 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
551 return
552 }
553
554 recipients.Remove(actorDid)
555
556 prefMap, err := db.GetNotificationPreferences(
557 n.db,
558 orm.FilterIn("user_did", slices.Collect(recipients.All())),
559 )
560 if err != nil {
561 // failed to get prefs for users
562 return
563 }
564
565 // create a transaction for bulk notification storage
566 tx, err := n.db.Begin()
567 if err != nil {
568 // failed to start tx
569 return
570 }
571 defer tx.Rollback()
572
573 // filter based on preferences
574 for recipientDid := range recipients.All() {
575 prefs, ok := prefMap[recipientDid]
576 if !ok {
577 prefs = models.DefaultNotificationPreferences(recipientDid)
578 }
579
580 // skip users who don’t want this type
581 if !prefs.ShouldNotify(eventType) {
582 continue
583 }
584
585 // create notification
586 notif := &models.Notification{
587 RecipientDid: recipientDid.String(),
588 ActorDid: actorDid.String(),
589 Type: eventType,
590 EntityType: entityType,
591 EntityId: entityId,
592 RepoId: repoId,
593 IssueId: issueId,
594 PullId: pullId,
595 }
596
597 if err := db.CreateNotification(tx, notif); err != nil {
598 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
599 }
600 }
601
602 if err := tx.Commit(); err != nil {
603 // failed to commit
604 return
605 }
606}