Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/appview/models"
13 "tangled.org/core/appview/pagination"
14 "tangled.org/core/orm"
15)
16
17func CreateNotification(e Execer, notification *models.Notification) error {
18 query := `
19 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id)
20 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
21 `
22
23 result, err := e.Exec(query,
24 notification.RecipientDid,
25 notification.ActorDid,
26 string(notification.Type),
27 notification.EntityType,
28 notification.EntityId,
29 notification.Read,
30 notification.RepoId,
31 notification.IssueId,
32 notification.PullId,
33 )
34 if err != nil {
35 return fmt.Errorf("failed to create notification: %w", err)
36 }
37
38 id, err := result.LastInsertId()
39 if err != nil {
40 return fmt.Errorf("failed to get notification ID: %w", err)
41 }
42
43 notification.ID = id
44 return nil
45}
46
47// GetNotificationsPaginated retrieves notifications with filters and pagination
48func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...orm.Filter) ([]*models.Notification, error) {
49 var conditions []string
50 var args []any
51
52 for _, filter := range filters {
53 conditions = append(conditions, filter.Condition())
54 args = append(args, filter.Arg()...)
55 }
56
57 whereClause := ""
58 if len(conditions) > 0 {
59 whereClause = "WHERE " + conditions[0]
60 for _, condition := range conditions[1:] {
61 whereClause += " AND " + condition
62 }
63 }
64 pageClause := ""
65 if page.Limit > 0 {
66 pageClause = " limit ? offset ? "
67 args = append(args, page.Limit, page.Offset)
68 }
69
70 query := fmt.Sprintf(`
71 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id
72 from notifications
73 %s
74 order by created desc
75 %s
76 `, whereClause, pageClause)
77
78 rows, err := e.QueryContext(context.Background(), query, args...)
79 if err != nil {
80 return nil, fmt.Errorf("failed to query notifications: %w", err)
81 }
82 defer rows.Close()
83
84 var notifications []*models.Notification
85 for rows.Next() {
86 var n models.Notification
87 var typeStr string
88 var createdStr string
89 err := rows.Scan(
90 &n.ID,
91 &n.RecipientDid,
92 &n.ActorDid,
93 &typeStr,
94 &n.EntityType,
95 &n.EntityId,
96 &n.Read,
97 &createdStr,
98 &n.RepoId,
99 &n.IssueId,
100 &n.PullId,
101 )
102 if err != nil {
103 return nil, fmt.Errorf("failed to scan notification: %w", err)
104 }
105 n.Type = models.NotificationType(typeStr)
106 n.Created, err = time.Parse(time.RFC3339, createdStr)
107 if err != nil {
108 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
109 }
110 notifications = append(notifications, &n)
111 }
112
113 return notifications, nil
114}
115
116// GetNotificationsWithEntities retrieves notifications with their related entities
117func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...orm.Filter) ([]*models.NotificationWithEntity, error) {
118 var conditions []string
119 var args []any
120
121 for _, filter := range filters {
122 conditions = append(conditions, filter.Condition())
123 args = append(args, filter.Arg()...)
124 }
125
126 whereClause := ""
127 if len(conditions) > 0 {
128 whereClause = "WHERE " + conditions[0]
129 for _, condition := range conditions[1:] {
130 whereClause += " AND " + condition
131 }
132 }
133
134 query := fmt.Sprintf(`
135 select
136 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id,
137 n.read, n.created, n.repo_id, n.issue_id, n.pull_id,
138 r.id as r_id, r.did as r_did, r.rkey as r_rkey, r.name as r_name, r.description as r_description, r.website as r_website, r.topics as r_topics,
139 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open,
140 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state
141 from notifications n
142 left join repos r on n.repo_id = r.id
143 left join issues i on n.issue_id = i.id
144 left join pulls p on n.pull_id = p.id
145 %s
146 order by n.created desc
147 limit ? offset ?
148 `, whereClause)
149
150 args = append(args, page.Limit, page.Offset)
151
152 rows, err := e.QueryContext(context.Background(), query, args...)
153 if err != nil {
154 return nil, fmt.Errorf("failed to query notifications with entities: %w", err)
155 }
156 defer rows.Close()
157
158 var notifications []*models.NotificationWithEntity
159 for rows.Next() {
160 var n models.Notification
161 var typeStr string
162 var createdStr string
163 var repo models.Repo
164 var issue models.Issue
165 var pull models.Pull
166 var rId, iId, pId sql.NullInt64
167 var rDid, rRkey, rName, rDescription, rWebsite, rTopicStr sql.NullString
168 var iDid sql.NullString
169 var iIssueId sql.NullInt64
170 var iTitle sql.NullString
171 var iOpen sql.NullBool
172 var pOwnerDid sql.NullString
173 var pPullId sql.NullInt64
174 var pTitle sql.NullString
175 var pState sql.NullInt64
176
177 err := rows.Scan(
178 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId,
179 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId,
180 &rId, &rDid, &rRkey, &rName, &rDescription, &rWebsite, &rTopicStr,
181 &iId, &iDid, &iIssueId, &iTitle, &iOpen,
182 &pId, &pOwnerDid, &pPullId, &pTitle, &pState,
183 )
184 if err != nil {
185 return nil, fmt.Errorf("failed to scan notification with entities: %w", err)
186 }
187
188 n.Type = models.NotificationType(typeStr)
189 n.Created, err = time.Parse(time.RFC3339, createdStr)
190 if err != nil {
191 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
192 }
193
194 entry := &models.NotificationWithEntity{Notification: &n}
195
196 // populate repo if present
197 if rId.Valid {
198 repo.Id = rId.Int64
199 if rDid.Valid {
200 repo.Did = rDid.String
201 }
202 if rRkey.Valid {
203 repo.Rkey = rRkey.String
204 }
205 if rName.Valid {
206 repo.Name = rName.String
207 }
208 if rDescription.Valid {
209 repo.Description = rDescription.String
210 }
211 if rWebsite.Valid {
212 repo.Website = rWebsite.String
213 }
214 if rTopicStr.Valid {
215 repo.Topics = strings.Fields(rTopicStr.String)
216 }
217 entry.Repo = &repo
218 }
219
220 // populate issue if present
221 if iId.Valid {
222 issue.Id = iId.Int64
223 if iDid.Valid {
224 issue.Did = iDid.String
225 }
226 if iIssueId.Valid {
227 issue.IssueId = int(iIssueId.Int64)
228 }
229 if iTitle.Valid {
230 issue.Title = iTitle.String
231 }
232 if iOpen.Valid {
233 issue.Open = iOpen.Bool
234 }
235 entry.Issue = &issue
236 }
237
238 // populate pull if present
239 if pId.Valid {
240 pull.ID = int(pId.Int64)
241 if pOwnerDid.Valid {
242 pull.OwnerDid = pOwnerDid.String
243 }
244 if pPullId.Valid {
245 pull.PullId = int(pPullId.Int64)
246 }
247 if pTitle.Valid {
248 pull.Title = pTitle.String
249 }
250 if pState.Valid {
251 pull.State = models.PullState(pState.Int64)
252 }
253 entry.Pull = &pull
254 }
255
256 notifications = append(notifications, entry)
257 }
258
259 return notifications, nil
260}
261
262// GetNotifications retrieves notifications with filters
263func GetNotifications(e Execer, filters ...orm.Filter) ([]*models.Notification, error) {
264 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...)
265}
266
267func CountNotifications(e Execer, filters ...orm.Filter) (int64, error) {
268 var conditions []string
269 var args []any
270 for _, filter := range filters {
271 conditions = append(conditions, filter.Condition())
272 args = append(args, filter.Arg()...)
273 }
274
275 whereClause := ""
276 if conditions != nil {
277 whereClause = " where " + strings.Join(conditions, " and ")
278 }
279
280 query := fmt.Sprintf(`select count(1) from notifications %s`, whereClause)
281 var count int64
282 err := e.QueryRow(query, args...).Scan(&count)
283
284 if !errors.Is(err, sql.ErrNoRows) && err != nil {
285 return 0, err
286 }
287
288 return count, nil
289}
290
291func MarkNotificationRead(e Execer, notificationID int64, userDID string) error {
292 idFilter := orm.FilterEq("id", notificationID)
293 recipientFilter := orm.FilterEq("recipient_did", userDID)
294
295 query := fmt.Sprintf(`
296 UPDATE notifications
297 SET read = 1
298 WHERE %s AND %s
299 `, idFilter.Condition(), recipientFilter.Condition())
300
301 args := append(idFilter.Arg(), recipientFilter.Arg()...)
302
303 result, err := e.Exec(query, args...)
304 if err != nil {
305 return fmt.Errorf("failed to mark notification as read: %w", err)
306 }
307
308 rowsAffected, err := result.RowsAffected()
309 if err != nil {
310 return fmt.Errorf("failed to get rows affected: %w", err)
311 }
312
313 if rowsAffected == 0 {
314 return fmt.Errorf("notification not found or access denied")
315 }
316
317 return nil
318}
319
320func MarkAllNotificationsRead(e Execer, userDID string) error {
321 recipientFilter := orm.FilterEq("recipient_did", userDID)
322 readFilter := orm.FilterEq("read", 0)
323
324 query := fmt.Sprintf(`
325 UPDATE notifications
326 SET read = 1
327 WHERE %s AND %s
328 `, recipientFilter.Condition(), readFilter.Condition())
329
330 args := append(recipientFilter.Arg(), readFilter.Arg()...)
331
332 _, err := e.Exec(query, args...)
333 if err != nil {
334 return fmt.Errorf("failed to mark all notifications as read: %w", err)
335 }
336
337 return nil
338}
339
340func DeleteNotification(e Execer, notificationID int64, userDID string) error {
341 idFilter := orm.FilterEq("id", notificationID)
342 recipientFilter := orm.FilterEq("recipient_did", userDID)
343
344 query := fmt.Sprintf(`
345 DELETE FROM notifications
346 WHERE %s AND %s
347 `, idFilter.Condition(), recipientFilter.Condition())
348
349 args := append(idFilter.Arg(), recipientFilter.Arg()...)
350
351 result, err := e.Exec(query, args...)
352 if err != nil {
353 return fmt.Errorf("failed to delete notification: %w", err)
354 }
355
356 rowsAffected, err := result.RowsAffected()
357 if err != nil {
358 return fmt.Errorf("failed to get rows affected: %w", err)
359 }
360
361 if rowsAffected == 0 {
362 return fmt.Errorf("notification not found or access denied")
363 }
364
365 return nil
366}
367
368func GetNotificationPreference(e Execer, userDid string) (*models.NotificationPreferences, error) {
369 prefs, err := GetNotificationPreferences(e, orm.FilterEq("user_did", userDid))
370 if err != nil {
371 return nil, err
372 }
373
374 p, ok := prefs[syntax.DID(userDid)]
375 if !ok {
376 return models.DefaultNotificationPreferences(syntax.DID(userDid)), nil
377 }
378
379 return p, nil
380}
381
382func GetNotificationPreferences(e Execer, filters ...orm.Filter) (map[syntax.DID]*models.NotificationPreferences, error) {
383 prefsMap := make(map[syntax.DID]*models.NotificationPreferences)
384
385 var conditions []string
386 var args []any
387 for _, filter := range filters {
388 conditions = append(conditions, filter.Condition())
389 args = append(args, filter.Arg()...)
390 }
391
392 whereClause := ""
393 if conditions != nil {
394 whereClause = " where " + strings.Join(conditions, " and ")
395 }
396
397 query := fmt.Sprintf(`
398 select
399 id,
400 user_did,
401 repo_starred,
402 issue_created,
403 issue_commented,
404 pull_created,
405 pull_commented,
406 followed,
407 user_mentioned,
408 pull_merged,
409 issue_closed,
410 email_notifications
411 from
412 notification_preferences
413 %s
414 `, whereClause)
415
416 rows, err := e.Query(query, args...)
417 if err != nil {
418 return nil, err
419 }
420 defer rows.Close()
421
422 for rows.Next() {
423 var prefs models.NotificationPreferences
424 if err := rows.Scan(
425 &prefs.ID,
426 &prefs.UserDid,
427 &prefs.RepoStarred,
428 &prefs.IssueCreated,
429 &prefs.IssueCommented,
430 &prefs.PullCreated,
431 &prefs.PullCommented,
432 &prefs.Followed,
433 &prefs.UserMentioned,
434 &prefs.PullMerged,
435 &prefs.IssueClosed,
436 &prefs.EmailNotifications,
437 ); err != nil {
438 return nil, err
439 }
440
441 prefsMap[prefs.UserDid] = &prefs
442 }
443
444 if err := rows.Err(); err != nil {
445 return nil, err
446 }
447
448 return prefsMap, nil
449}
450
451func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error {
452 query := `
453 INSERT OR REPLACE INTO notification_preferences
454 (user_did, repo_starred, issue_created, issue_commented, pull_created,
455 pull_commented, followed, user_mentioned, pull_merged, issue_closed,
456 email_notifications)
457 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
458 `
459
460 result, err := d.DB.ExecContext(ctx, query,
461 prefs.UserDid,
462 prefs.RepoStarred,
463 prefs.IssueCreated,
464 prefs.IssueCommented,
465 prefs.PullCreated,
466 prefs.PullCommented,
467 prefs.Followed,
468 prefs.UserMentioned,
469 prefs.PullMerged,
470 prefs.IssueClosed,
471 prefs.EmailNotifications,
472 )
473 if err != nil {
474 return fmt.Errorf("failed to update notification preferences: %w", err)
475 }
476
477 if prefs.ID == 0 {
478 id, err := result.LastInsertId()
479 if err != nil {
480 return fmt.Errorf("failed to get preferences ID: %w", err)
481 }
482 prefs.ID = id
483 }
484
485 return nil
486}
487
488func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error {
489 cutoff := time.Now().Add(-olderThan)
490 createdFilter := orm.FilterLte("created", cutoff)
491
492 query := fmt.Sprintf(`
493 DELETE FROM notifications
494 WHERE %s
495 `, createdFilter.Condition())
496
497 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...)
498 if err != nil {
499 return fmt.Errorf("failed to cleanup old notifications: %w", err)
500 }
501
502 return nil
503}