Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/appview/models" 13 "tangled.org/core/appview/pagination" 14 "tangled.org/core/orm" 15) 16 17func CreateNotification(e Execer, notification *models.Notification) error { 18 query := ` 19 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id) 20 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 21 ` 22 23 result, err := e.Exec(query, 24 notification.RecipientDid, 25 notification.ActorDid, 26 string(notification.Type), 27 notification.EntityType, 28 notification.EntityId, 29 notification.Read, 30 notification.RepoId, 31 notification.IssueId, 32 notification.PullId, 33 ) 34 if err != nil { 35 return fmt.Errorf("failed to create notification: %w", err) 36 } 37 38 id, err := result.LastInsertId() 39 if err != nil { 40 return fmt.Errorf("failed to get notification ID: %w", err) 41 } 42 43 notification.ID = id 44 return nil 45} 46 47// GetNotificationsPaginated retrieves notifications with filters and pagination 48func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...orm.Filter) ([]*models.Notification, error) { 49 var conditions []string 50 var args []any 51 52 for _, filter := range filters { 53 conditions = append(conditions, filter.Condition()) 54 args = append(args, filter.Arg()...) 55 } 56 57 whereClause := "" 58 if len(conditions) > 0 { 59 whereClause = "WHERE " + conditions[0] 60 for _, condition := range conditions[1:] { 61 whereClause += " AND " + condition 62 } 63 } 64 pageClause := "" 65 if page.Limit > 0 { 66 pageClause = " limit ? offset ? " 67 args = append(args, page.Limit, page.Offset) 68 } 69 70 query := fmt.Sprintf(` 71 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id 72 from notifications 73 %s 74 order by created desc 75 %s 76 `, whereClause, pageClause) 77 78 rows, err := e.QueryContext(context.Background(), query, args...) 79 if err != nil { 80 return nil, fmt.Errorf("failed to query notifications: %w", err) 81 } 82 defer rows.Close() 83 84 var notifications []*models.Notification 85 for rows.Next() { 86 var n models.Notification 87 var typeStr string 88 var createdStr string 89 err := rows.Scan( 90 &n.ID, 91 &n.RecipientDid, 92 &n.ActorDid, 93 &typeStr, 94 &n.EntityType, 95 &n.EntityId, 96 &n.Read, 97 &createdStr, 98 &n.RepoId, 99 &n.IssueId, 100 &n.PullId, 101 ) 102 if err != nil { 103 return nil, fmt.Errorf("failed to scan notification: %w", err) 104 } 105 n.Type = models.NotificationType(typeStr) 106 n.Created, err = time.Parse(time.RFC3339, createdStr) 107 if err != nil { 108 return nil, fmt.Errorf("failed to parse created timestamp: %w", err) 109 } 110 notifications = append(notifications, &n) 111 } 112 113 return notifications, nil 114} 115 116// GetNotificationsWithEntities retrieves notifications with their related entities 117func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...orm.Filter) ([]*models.NotificationWithEntity, error) { 118 var conditions []string 119 var args []any 120 121 for _, filter := range filters { 122 conditions = append(conditions, filter.Condition()) 123 args = append(args, filter.Arg()...) 124 } 125 126 whereClause := "" 127 if len(conditions) > 0 { 128 whereClause = "WHERE " + conditions[0] 129 for _, condition := range conditions[1:] { 130 whereClause += " AND " + condition 131 } 132 } 133 134 query := fmt.Sprintf(` 135 select 136 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id, 137 n.read, n.created, n.repo_id, n.issue_id, n.pull_id, 138 r.id as r_id, r.did as r_did, r.rkey as r_rkey, r.name as r_name, r.description as r_description, r.website as r_website, r.topics as r_topics, 139 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open, 140 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state 141 from notifications n 142 left join repos r on n.repo_id = r.id 143 left join issues i on n.issue_id = i.id 144 left join pulls p on n.pull_id = p.id 145 %s 146 order by n.created desc 147 limit ? offset ? 148 `, whereClause) 149 150 args = append(args, page.Limit, page.Offset) 151 152 rows, err := e.QueryContext(context.Background(), query, args...) 153 if err != nil { 154 return nil, fmt.Errorf("failed to query notifications with entities: %w", err) 155 } 156 defer rows.Close() 157 158 var notifications []*models.NotificationWithEntity 159 for rows.Next() { 160 var n models.Notification 161 var typeStr string 162 var createdStr string 163 var repo models.Repo 164 var issue models.Issue 165 var pull models.Pull 166 var rId, iId, pId sql.NullInt64 167 var rDid, rRkey, rName, rDescription, rWebsite, rTopicStr sql.NullString 168 var iDid sql.NullString 169 var iIssueId sql.NullInt64 170 var iTitle sql.NullString 171 var iOpen sql.NullBool 172 var pOwnerDid sql.NullString 173 var pPullId sql.NullInt64 174 var pTitle sql.NullString 175 var pState sql.NullInt64 176 177 err := rows.Scan( 178 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId, 179 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId, 180 &rId, &rDid, &rRkey, &rName, &rDescription, &rWebsite, &rTopicStr, 181 &iId, &iDid, &iIssueId, &iTitle, &iOpen, 182 &pId, &pOwnerDid, &pPullId, &pTitle, &pState, 183 ) 184 if err != nil { 185 return nil, fmt.Errorf("failed to scan notification with entities: %w", err) 186 } 187 188 n.Type = models.NotificationType(typeStr) 189 n.Created, err = time.Parse(time.RFC3339, createdStr) 190 if err != nil { 191 return nil, fmt.Errorf("failed to parse created timestamp: %w", err) 192 } 193 194 entry := &models.NotificationWithEntity{Notification: &n} 195 196 // populate repo if present 197 if rId.Valid { 198 repo.Id = rId.Int64 199 if rDid.Valid { 200 repo.Did = rDid.String 201 } 202 if rRkey.Valid { 203 repo.Rkey = rRkey.String 204 } 205 if rName.Valid { 206 repo.Name = rName.String 207 } 208 if rDescription.Valid { 209 repo.Description = rDescription.String 210 } 211 if rWebsite.Valid { 212 repo.Website = rWebsite.String 213 } 214 if rTopicStr.Valid { 215 repo.Topics = strings.Fields(rTopicStr.String) 216 } 217 entry.Repo = &repo 218 } 219 220 // populate issue if present 221 if iId.Valid { 222 issue.Id = iId.Int64 223 if iDid.Valid { 224 issue.Did = iDid.String 225 } 226 if iIssueId.Valid { 227 issue.IssueId = int(iIssueId.Int64) 228 } 229 if iTitle.Valid { 230 issue.Title = iTitle.String 231 } 232 if iOpen.Valid { 233 issue.Open = iOpen.Bool 234 } 235 entry.Issue = &issue 236 } 237 238 // populate pull if present 239 if pId.Valid { 240 pull.ID = int(pId.Int64) 241 if pOwnerDid.Valid { 242 pull.OwnerDid = pOwnerDid.String 243 } 244 if pPullId.Valid { 245 pull.PullId = int(pPullId.Int64) 246 } 247 if pTitle.Valid { 248 pull.Title = pTitle.String 249 } 250 if pState.Valid { 251 pull.State = models.PullState(pState.Int64) 252 } 253 entry.Pull = &pull 254 } 255 256 notifications = append(notifications, entry) 257 } 258 259 return notifications, nil 260} 261 262// GetNotifications retrieves notifications with filters 263func GetNotifications(e Execer, filters ...orm.Filter) ([]*models.Notification, error) { 264 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...) 265} 266 267func CountNotifications(e Execer, filters ...orm.Filter) (int64, error) { 268 var conditions []string 269 var args []any 270 for _, filter := range filters { 271 conditions = append(conditions, filter.Condition()) 272 args = append(args, filter.Arg()...) 273 } 274 275 whereClause := "" 276 if conditions != nil { 277 whereClause = " where " + strings.Join(conditions, " and ") 278 } 279 280 query := fmt.Sprintf(`select count(1) from notifications %s`, whereClause) 281 var count int64 282 err := e.QueryRow(query, args...).Scan(&count) 283 284 if !errors.Is(err, sql.ErrNoRows) && err != nil { 285 return 0, err 286 } 287 288 return count, nil 289} 290 291func MarkNotificationRead(e Execer, notificationID int64, userDID string) error { 292 idFilter := orm.FilterEq("id", notificationID) 293 recipientFilter := orm.FilterEq("recipient_did", userDID) 294 295 query := fmt.Sprintf(` 296 UPDATE notifications 297 SET read = 1 298 WHERE %s AND %s 299 `, idFilter.Condition(), recipientFilter.Condition()) 300 301 args := append(idFilter.Arg(), recipientFilter.Arg()...) 302 303 result, err := e.Exec(query, args...) 304 if err != nil { 305 return fmt.Errorf("failed to mark notification as read: %w", err) 306 } 307 308 rowsAffected, err := result.RowsAffected() 309 if err != nil { 310 return fmt.Errorf("failed to get rows affected: %w", err) 311 } 312 313 if rowsAffected == 0 { 314 return fmt.Errorf("notification not found or access denied") 315 } 316 317 return nil 318} 319 320func MarkAllNotificationsRead(e Execer, userDID string) error { 321 recipientFilter := orm.FilterEq("recipient_did", userDID) 322 readFilter := orm.FilterEq("read", 0) 323 324 query := fmt.Sprintf(` 325 UPDATE notifications 326 SET read = 1 327 WHERE %s AND %s 328 `, recipientFilter.Condition(), readFilter.Condition()) 329 330 args := append(recipientFilter.Arg(), readFilter.Arg()...) 331 332 _, err := e.Exec(query, args...) 333 if err != nil { 334 return fmt.Errorf("failed to mark all notifications as read: %w", err) 335 } 336 337 return nil 338} 339 340func DeleteNotification(e Execer, notificationID int64, userDID string) error { 341 idFilter := orm.FilterEq("id", notificationID) 342 recipientFilter := orm.FilterEq("recipient_did", userDID) 343 344 query := fmt.Sprintf(` 345 DELETE FROM notifications 346 WHERE %s AND %s 347 `, idFilter.Condition(), recipientFilter.Condition()) 348 349 args := append(idFilter.Arg(), recipientFilter.Arg()...) 350 351 result, err := e.Exec(query, args...) 352 if err != nil { 353 return fmt.Errorf("failed to delete notification: %w", err) 354 } 355 356 rowsAffected, err := result.RowsAffected() 357 if err != nil { 358 return fmt.Errorf("failed to get rows affected: %w", err) 359 } 360 361 if rowsAffected == 0 { 362 return fmt.Errorf("notification not found or access denied") 363 } 364 365 return nil 366} 367 368func GetNotificationPreference(e Execer, userDid string) (*models.NotificationPreferences, error) { 369 prefs, err := GetNotificationPreferences(e, orm.FilterEq("user_did", userDid)) 370 if err != nil { 371 return nil, err 372 } 373 374 p, ok := prefs[syntax.DID(userDid)] 375 if !ok { 376 return models.DefaultNotificationPreferences(syntax.DID(userDid)), nil 377 } 378 379 return p, nil 380} 381 382func GetNotificationPreferences(e Execer, filters ...orm.Filter) (map[syntax.DID]*models.NotificationPreferences, error) { 383 prefsMap := make(map[syntax.DID]*models.NotificationPreferences) 384 385 var conditions []string 386 var args []any 387 for _, filter := range filters { 388 conditions = append(conditions, filter.Condition()) 389 args = append(args, filter.Arg()...) 390 } 391 392 whereClause := "" 393 if conditions != nil { 394 whereClause = " where " + strings.Join(conditions, " and ") 395 } 396 397 query := fmt.Sprintf(` 398 select 399 id, 400 user_did, 401 repo_starred, 402 issue_created, 403 issue_commented, 404 pull_created, 405 pull_commented, 406 followed, 407 user_mentioned, 408 pull_merged, 409 issue_closed, 410 email_notifications 411 from 412 notification_preferences 413 %s 414 `, whereClause) 415 416 rows, err := e.Query(query, args...) 417 if err != nil { 418 return nil, err 419 } 420 defer rows.Close() 421 422 for rows.Next() { 423 var prefs models.NotificationPreferences 424 if err := rows.Scan( 425 &prefs.ID, 426 &prefs.UserDid, 427 &prefs.RepoStarred, 428 &prefs.IssueCreated, 429 &prefs.IssueCommented, 430 &prefs.PullCreated, 431 &prefs.PullCommented, 432 &prefs.Followed, 433 &prefs.UserMentioned, 434 &prefs.PullMerged, 435 &prefs.IssueClosed, 436 &prefs.EmailNotifications, 437 ); err != nil { 438 return nil, err 439 } 440 441 prefsMap[prefs.UserDid] = &prefs 442 } 443 444 if err := rows.Err(); err != nil { 445 return nil, err 446 } 447 448 return prefsMap, nil 449} 450 451func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error { 452 query := ` 453 INSERT OR REPLACE INTO notification_preferences 454 (user_did, repo_starred, issue_created, issue_commented, pull_created, 455 pull_commented, followed, user_mentioned, pull_merged, issue_closed, 456 email_notifications) 457 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 458 ` 459 460 result, err := d.DB.ExecContext(ctx, query, 461 prefs.UserDid, 462 prefs.RepoStarred, 463 prefs.IssueCreated, 464 prefs.IssueCommented, 465 prefs.PullCreated, 466 prefs.PullCommented, 467 prefs.Followed, 468 prefs.UserMentioned, 469 prefs.PullMerged, 470 prefs.IssueClosed, 471 prefs.EmailNotifications, 472 ) 473 if err != nil { 474 return fmt.Errorf("failed to update notification preferences: %w", err) 475 } 476 477 if prefs.ID == 0 { 478 id, err := result.LastInsertId() 479 if err != nil { 480 return fmt.Errorf("failed to get preferences ID: %w", err) 481 } 482 prefs.ID = id 483 } 484 485 return nil 486} 487 488func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error { 489 cutoff := time.Now().Add(-olderThan) 490 createdFilter := orm.FilterLte("created", cutoff) 491 492 query := fmt.Sprintf(` 493 DELETE FROM notifications 494 WHERE %s 495 `, createdFilter.Condition()) 496 497 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...) 498 if err != nil { 499 return fmt.Errorf("failed to cleanup old notifications: %w", err) 500 } 501 502 return nil 503}