Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

1package db 2 3import ( 4 "context" 5 "slices" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/appview/db" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/appview/notify" 12 "tangled.org/core/idresolver" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15 "tangled.org/core/sets" 16) 17 18const ( 19 maxMentions = 8 20) 21 22type databaseNotifier struct { 23 db *db.DB 24 res *idresolver.Resolver 25} 26 27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 28 return &databaseNotifier{ 29 db: database, 30 res: resolver, 31 } 32} 33 34var _ notify.Notifier = &databaseNotifier{} 35 36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 37 // no-op for now 38} 39func (n *databaseNotifier) DeleteRepo(ctx context.Context, repo *models.Repo) { 40 // no-op for now 41} 42 43func (n *databaseNotifier) RenameRepo(ctx context.Context, actor syntax.DID, oldRepo, newRepo *models.Repo) { 44} 45 46func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 47 l := log.FromContext(ctx) 48 49 if star.SubjectType != models.StarSubjectRepo { 50 return 51 } 52 53 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", star.Subject)) 54 if err != nil { 55 l.Error("failed to get repos", "err", err) 56 return 57 } 58 59 actorDid := syntax.DID(star.Did) 60 recipients := sets.Singleton(syntax.DID(repo.Did)) 61 eventType := models.NotificationTypeRepoStarred 62 entityType := "repo" 63 entityId := star.Subject 64 repoId := &repo.Id 65 var issueId *int64 66 var pullId *int64 67 68 n.notifyEvent( 69 ctx, 70 actorDid, 71 recipients, 72 eventType, 73 entityType, 74 entityId, 75 repoId, 76 issueId, 77 pullId, 78 ) 79} 80 81func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 82 // no-op 83} 84 85func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 86 l := log.FromContext(ctx) 87 88 var ( 89 // built the recipients list: 90 // - the owner of the repo 91 // - | if the comment is a reply -> everybody on that thread 92 // | if the comment is a top level -> just the issue owner 93 // - remove mentioned users from the recipients list 94 recipients = sets.New[syntax.DID]() 95 entityType string 96 entityId string 97 repoId *int64 98 issueId *int64 99 pullId *int64 100 ) 101 102 subjectAt := syntax.ATURI(comment.Subject.Uri) 103 104 switch subjectAt.Collection() { 105 case tangled.RepoIssueNSID: 106 issues, err := db.GetIssues( 107 n.db, 108 orm.FilterEq("at_uri", subjectAt), 109 ) 110 if err != nil { 111 l.Error("failed to get issues", "err", err) 112 return 113 } 114 if len(issues) == 0 { 115 l.Error("no issue found", "subject", comment.Subject) 116 return 117 } 118 issue := issues[0] 119 120 recipients.Insert(syntax.DID(issue.Repo.Did)) 121 if comment.IsReply() { 122 // if this comment is a reply, then notify everybody in that thread 123 parent := *comment.ReplyTo 124 125 // find the parent thread, and add all DIDs from here to the recipient list 126 for _, t := range models.NewCommentList(issue.Comments) { 127 if t.Self.AtUri() == syntax.ATURI(parent.Uri) { 128 for _, p := range t.Participants() { 129 recipients.Insert(p) 130 } 131 } 132 } 133 } else { 134 // not a reply, notify just the issue author 135 recipients.Insert(syntax.DID(issue.Did)) 136 } 137 138 entityType = "issue" 139 entityId = issue.AtUri().String() 140 repoId = &issue.Repo.Id 141 issueId = &issue.Id 142 143 for _, m := range mentions { 144 recipients.Remove(m) 145 } 146 147 n.notifyEvent( 148 ctx, 149 comment.Did, 150 recipients, 151 models.NotificationTypeIssueCommented, 152 entityType, 153 entityId, 154 repoId, 155 issueId, 156 pullId, 157 ) 158 159 case tangled.RepoPullNSID: 160 pull, err := db.GetPull( 161 n.db, 162 orm.FilterEq("owner_did", subjectAt.Authority()), 163 orm.FilterEq("rkey", subjectAt.RecordKey()), 164 ) 165 if err != nil { 166 l.Error("NewComment: failed to get pull", "err", err) 167 return 168 } 169 170 pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("repo_did", pull.RepoDid)) 171 if err != nil { 172 l.Error("NewComment: failed to get repo", "err", err) 173 return 174 } 175 176 recipients.Insert(syntax.DID(pull.Repo.Did)) 177 for _, p := range pull.Participants() { 178 recipients.Insert(syntax.DID(p)) 179 } 180 181 entityType = "pull" 182 entityId = pull.AtUri().String() 183 repoId = &pull.Repo.Id 184 p := int64(pull.ID) 185 pullId = &p 186 187 for _, m := range mentions { 188 recipients.Remove(m) 189 } 190 191 n.notifyEvent( 192 ctx, 193 comment.Did, 194 recipients, 195 models.NotificationTypePullCommented, 196 entityType, 197 entityId, 198 repoId, 199 issueId, 200 pullId, 201 ) 202 default: 203 return // no-op 204 } 205 206 n.notifyEvent( 207 ctx, 208 comment.Did, 209 sets.Collect(slices.Values(mentions)), 210 models.NotificationTypeUserMentioned, 211 entityType, 212 entityId, 213 repoId, 214 issueId, 215 pullId, 216 ) 217} 218 219func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) { 220 // no-op 221} 222 223func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 224 l := log.FromContext(ctx) 225 226 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(issue.RepoDid))) 227 if err != nil { 228 l.Error("failed to fetch collaborators", "err", err) 229 return 230 } 231 232 // build the recipients list 233 // - owner of the repo 234 // - collaborators in the repo 235 // - remove users already mentioned 236 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 237 for _, c := range collaborators { 238 recipients.Insert(c.SubjectDid) 239 } 240 for _, m := range mentions { 241 recipients.Remove(m) 242 } 243 244 actorDid := syntax.DID(issue.Did) 245 entityType := "issue" 246 entityId := issue.AtUri().String() 247 repoId := &issue.Repo.Id 248 issueId := &issue.Id 249 var pullId *int64 250 251 n.notifyEvent( 252 ctx, 253 actorDid, 254 recipients, 255 models.NotificationTypeIssueCreated, 256 entityType, 257 entityId, 258 repoId, 259 issueId, 260 pullId, 261 ) 262 n.notifyEvent( 263 ctx, 264 actorDid, 265 sets.Collect(slices.Values(mentions)), 266 models.NotificationTypeUserMentioned, 267 entityType, 268 entityId, 269 repoId, 270 issueId, 271 pullId, 272 ) 273} 274 275func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 276 // no-op for now 277} 278 279func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {} 280func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, pull *models.Pull) {} 281 282func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 283 actorDid := syntax.DID(follow.UserDid) 284 recipients := sets.Singleton(syntax.DID(follow.SubjectDid)) 285 eventType := models.NotificationTypeFollowed 286 entityType := "follow" 287 entityId := follow.UserDid 288 var repoId, issueId, pullId *int64 289 290 n.notifyEvent( 291 ctx, 292 actorDid, 293 recipients, 294 eventType, 295 entityType, 296 entityId, 297 repoId, 298 issueId, 299 pullId, 300 ) 301} 302 303func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 304 // no-op 305} 306 307func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 308 l := log.FromContext(ctx) 309 310 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", string(pull.RepoDid))) 311 if err != nil { 312 l.Error("failed to get repos", "err", err) 313 return 314 } 315 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(pull.RepoDid))) 316 if err != nil { 317 l.Error("failed to fetch collaborators", "err", err) 318 return 319 } 320 321 // build the recipients list 322 // - owner of the repo 323 // - collaborators in the repo 324 recipients := sets.Singleton(syntax.DID(repo.Did)) 325 for _, c := range collaborators { 326 recipients.Insert(c.SubjectDid) 327 } 328 329 actorDid := syntax.DID(pull.OwnerDid) 330 eventType := models.NotificationTypePullCreated 331 entityType := "pull" 332 entityId := pull.AtUri().String() 333 repoId := &repo.Id 334 var issueId *int64 335 p := int64(pull.ID) 336 pullId := &p 337 338 n.notifyEvent( 339 ctx, 340 actorDid, 341 recipients, 342 eventType, 343 entityType, 344 entityId, 345 repoId, 346 issueId, 347 pullId, 348 ) 349} 350 351func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 352 // no-op 353} 354 355func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 356 // no-op 357} 358 359func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 360 // no-op 361} 362 363func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 364 // no-op 365} 366 367func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 368 // no-op for now; webhooks are handled by the webhook notifier 369} 370 371func (n *databaseNotifier) Clone(ctx context.Context, repo *models.Repo) { 372 // no-op 373} 374 375func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 376 l := log.FromContext(ctx) 377 378 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(issue.RepoDid))) 379 if err != nil { 380 l.Error("failed to fetch collaborators", "err", err) 381 return 382 } 383 384 // build up the recipients list: 385 // - repo owner 386 // - repo collaborators 387 // - all issue participants 388 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 389 for _, c := range collaborators { 390 recipients.Insert(c.SubjectDid) 391 } 392 for _, p := range issue.Participants() { 393 recipients.Insert(syntax.DID(p)) 394 } 395 396 entityType := "issue" 397 entityId := issue.AtUri().String() 398 repoId := &issue.Repo.Id 399 issueId := &issue.Id 400 var pullId *int64 401 var eventType models.NotificationType 402 403 if issue.Open { 404 eventType = models.NotificationTypeIssueReopen 405 } else { 406 eventType = models.NotificationTypeIssueClosed 407 } 408 409 n.notifyEvent( 410 ctx, 411 actor, 412 recipients, 413 eventType, 414 entityType, 415 entityId, 416 repoId, 417 issueId, 418 pullId, 419 ) 420} 421 422func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 423 l := log.FromContext(ctx) 424 425 // Get repo details 426 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", string(pull.RepoDid))) 427 if err != nil { 428 l.Error("failed to get repos", "err", err) 429 return 430 } 431 432 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(pull.RepoDid))) 433 if err != nil { 434 l.Error("failed to fetch collaborators", "err", err) 435 return 436 } 437 438 // build up the recipients list: 439 // - repo owner 440 // - all pull participants 441 recipients := sets.Singleton(syntax.DID(repo.Did)) 442 for _, c := range collaborators { 443 recipients.Insert(c.SubjectDid) 444 } 445 for _, p := range pull.Participants() { 446 recipients.Insert(p) 447 } 448 449 entityType := "pull" 450 entityId := pull.AtUri().String() 451 repoId := &repo.Id 452 var issueId *int64 453 var eventType models.NotificationType 454 switch pull.State { 455 case models.PullClosed: 456 eventType = models.NotificationTypePullClosed 457 case models.PullOpen: 458 eventType = models.NotificationTypePullReopen 459 case models.PullMerged: 460 eventType = models.NotificationTypePullMerged 461 default: 462 l.Error("unexpected new PR state", "state", pull.State) 463 return 464 } 465 p := int64(pull.ID) 466 pullId := &p 467 468 n.notifyEvent( 469 ctx, 470 actor, 471 recipients, 472 eventType, 473 entityType, 474 entityId, 475 repoId, 476 issueId, 477 pullId, 478 ) 479} 480 481func (n *databaseNotifier) notifyEvent( 482 ctx context.Context, 483 actorDid syntax.DID, 484 recipients sets.Set[syntax.DID], 485 eventType models.NotificationType, 486 entityType string, 487 entityId string, 488 repoId *int64, 489 issueId *int64, 490 pullId *int64, 491) { 492 l := log.FromContext(ctx) 493 494 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody 495 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions { 496 return 497 } 498 499 recipients.Remove(actorDid) 500 501 prefMap, err := db.GetNotificationPreferences( 502 n.db, 503 orm.FilterIn("user_did", slices.Collect(recipients.All())), 504 ) 505 if err != nil { 506 // failed to get prefs for users 507 return 508 } 509 510 // create a transaction for bulk notification storage 511 tx, err := n.db.Begin() 512 if err != nil { 513 // failed to start tx 514 return 515 } 516 defer tx.Rollback() 517 518 // filter based on preferences 519 for recipientDid := range recipients.All() { 520 prefs, ok := prefMap[recipientDid] 521 if !ok { 522 prefs = models.DefaultNotificationPreferences(recipientDid) 523 } 524 525 // skip users who don’t want this type 526 if !prefs.ShouldNotify(eventType) { 527 continue 528 } 529 530 // create notification 531 notif := &models.Notification{ 532 RecipientDid: recipientDid.String(), 533 ActorDid: actorDid.String(), 534 Type: eventType, 535 EntityType: entityType, 536 EntityId: entityId, 537 RepoId: repoId, 538 IssueId: issueId, 539 PullId: pullId, 540 } 541 542 if err := db.CreateNotification(tx, notif); err != nil { 543 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err) 544 } 545 } 546 547 if err := tx.Commit(); err != nil { 548 // failed to commit 549 return 550 } 551}