Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

at icy/kxpzqo 14 kB View raw
1package db 2 3import ( 4 "context" 5 "slices" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/appview/db" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/appview/notify" 12 "tangled.org/core/idresolver" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15 "tangled.org/core/sets" 16) 17 18const ( 19 maxMentions = 8 20 assigneeLabelAt = "at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/assignee" 21) 22 23type databaseNotifier struct { 24 db *db.DB 25 res *idresolver.Resolver 26} 27 28func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 29 return &databaseNotifier{ 30 db: database, 31 res: resolver, 32 } 33} 34 35var _ notify.Notifier = &databaseNotifier{} 36 37func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 38 // no-op for now 39} 40func (n *databaseNotifier) DeleteRepo(ctx context.Context, repo *models.Repo) { 41 // no-op for now 42} 43 44func (n *databaseNotifier) RenameRepo(ctx context.Context, actor syntax.DID, oldRepo, newRepo *models.Repo) { 45} 46 47func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 48 l := log.FromContext(ctx) 49 50 if star.SubjectType != models.StarSubjectRepo { 51 return 52 } 53 54 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", star.Subject)) 55 if err != nil { 56 l.Error("failed to get repos", "err", err) 57 return 58 } 59 60 actorDid := syntax.DID(star.Did) 61 recipients := sets.Singleton(syntax.DID(repo.Did)) 62 eventType := models.NotificationTypeRepoStarred 63 entityType := "repo" 64 entityId := star.Subject 65 repoId := &repo.Id 66 var issueId *int64 67 var pullId *int64 68 69 n.notifyEvent( 70 ctx, 71 actorDid, 72 recipients, 73 eventType, 74 entityType, 75 entityId, 76 repoId, 77 issueId, 78 pullId, 79 ) 80} 81 82func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 83 // no-op 84} 85 86func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 87 l := log.FromContext(ctx) 88 89 var ( 90 // built the recipients list: 91 // - the owner of the repo 92 // - | if the comment is a reply -> everybody on that thread 93 // | if the comment is a top level -> just the issue owner 94 // - remove mentioned users from the recipients list 95 recipients = sets.New[syntax.DID]() 96 entityType string 97 entityId string 98 repoId *int64 99 issueId *int64 100 pullId *int64 101 ) 102 103 subjectAt := syntax.ATURI(comment.Subject.Uri) 104 105 switch subjectAt.Collection() { 106 case tangled.RepoIssueNSID: 107 issues, err := db.GetIssues( 108 n.db, 109 orm.FilterEq("at_uri", subjectAt), 110 ) 111 if err != nil { 112 l.Error("failed to get issues", "err", err) 113 return 114 } 115 if len(issues) == 0 { 116 l.Error("no issue found", "subject", comment.Subject) 117 return 118 } 119 issue := issues[0] 120 121 recipients.Insert(syntax.DID(issue.Repo.Did)) 122 if comment.IsReply() { 123 // if this comment is a reply, then notify everybody in that thread 124 parent := *comment.ReplyTo 125 126 // find the parent thread, and add all DIDs from here to the recipient list 127 for _, t := range models.NewCommentList(issue.Comments) { 128 if t.Self.AtUri() == syntax.ATURI(parent.Uri) { 129 for _, p := range t.Participants() { 130 recipients.Insert(p) 131 } 132 } 133 } 134 } else { 135 // not a reply, notify just the issue author 136 recipients.Insert(syntax.DID(issue.Did)) 137 } 138 139 entityType = "issue" 140 entityId = issue.AtUri().String() 141 repoId = &issue.Repo.Id 142 issueId = &issue.Id 143 144 for _, m := range mentions { 145 recipients.Remove(m) 146 } 147 148 n.notifyEvent( 149 ctx, 150 comment.Did, 151 recipients, 152 models.NotificationTypeIssueCommented, 153 entityType, 154 entityId, 155 repoId, 156 issueId, 157 pullId, 158 ) 159 160 case tangled.RepoPullNSID: 161 pull, err := db.GetPull( 162 n.db, 163 orm.FilterEq("owner_did", subjectAt.Authority()), 164 orm.FilterEq("rkey", subjectAt.RecordKey()), 165 ) 166 if err != nil { 167 l.Error("NewComment: failed to get pull", "err", err) 168 return 169 } 170 171 pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("repo_did", pull.RepoDid)) 172 if err != nil { 173 l.Error("NewComment: failed to get repo", "err", err) 174 return 175 } 176 177 recipients.Insert(syntax.DID(pull.Repo.Did)) 178 for _, p := range pull.Participants() { 179 recipients.Insert(syntax.DID(p)) 180 } 181 182 entityType = "pull" 183 entityId = pull.AtUri().String() 184 repoId = &pull.Repo.Id 185 p := int64(pull.ID) 186 pullId = &p 187 188 for _, m := range mentions { 189 recipients.Remove(m) 190 } 191 192 n.notifyEvent( 193 ctx, 194 comment.Did, 195 recipients, 196 models.NotificationTypePullCommented, 197 entityType, 198 entityId, 199 repoId, 200 issueId, 201 pullId, 202 ) 203 default: 204 return // no-op 205 } 206 207 n.notifyEvent( 208 ctx, 209 comment.Did, 210 sets.Collect(slices.Values(mentions)), 211 models.NotificationTypeUserMentioned, 212 entityType, 213 entityId, 214 repoId, 215 issueId, 216 pullId, 217 ) 218} 219 220func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) { 221 // no-op 222} 223 224func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 225 l := log.FromContext(ctx) 226 227 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(issue.RepoDid))) 228 if err != nil { 229 l.Error("failed to fetch collaborators", "err", err) 230 return 231 } 232 233 // build the recipients list 234 // - owner of the repo 235 // - collaborators in the repo 236 // - remove users already mentioned 237 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 238 for _, c := range collaborators { 239 recipients.Insert(c.SubjectDid) 240 } 241 for _, m := range mentions { 242 recipients.Remove(m) 243 } 244 245 actorDid := syntax.DID(issue.Did) 246 entityType := "issue" 247 entityId := issue.AtUri().String() 248 repoId := &issue.Repo.Id 249 issueId := &issue.Id 250 var pullId *int64 251 252 n.notifyEvent( 253 ctx, 254 actorDid, 255 recipients, 256 models.NotificationTypeIssueCreated, 257 entityType, 258 entityId, 259 repoId, 260 issueId, 261 pullId, 262 ) 263 n.notifyEvent( 264 ctx, 265 actorDid, 266 sets.Collect(slices.Values(mentions)), 267 models.NotificationTypeUserMentioned, 268 entityType, 269 entityId, 270 repoId, 271 issueId, 272 pullId, 273 ) 274} 275 276func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 277 // no-op for now 278} 279 280func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, actor syntax.DID, issue *models.Issue, ops []models.LabelOp) { 281 entityType := "issue" 282 entityId := issue.AtUri().String() 283 repoId := &issue.Repo.Id 284 issueId := &issue.Id 285 var pullId *int64 286 287 assigned := sets.New[syntax.DID]() 288 unassigned := sets.New[syntax.DID]() 289 for _, op := range ops { 290 if op.OperandKey != assigneeLabelAt { 291 continue 292 } 293 assignee := syntax.DID(op.OperandValue) 294 switch op.Operation { 295 case models.LabelOperationAdd: 296 assigned.Insert(assignee) 297 case models.LabelOperationDel: 298 unassigned.Insert(assignee) 299 default: 300 continue 301 } 302 } 303 304 n.notifyEvent(ctx, actor, assigned, models.NotificationTypeIssueAssigned, entityType, entityId, repoId, issueId, pullId) 305 n.notifyEvent(ctx, actor, unassigned, models.NotificationTypeIssueUnassigned, entityType, entityId, repoId, issueId, pullId) 306} 307 308func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, actor syntax.DID, pull *models.Pull, ops []models.LabelOp) { 309 entityType := "pull" 310 entityId := pull.AtUri().String() 311 repoId := &pull.Repo.Id 312 var issueId *int64 313 p := int64(pull.ID) 314 pullId := &p 315 316 assigned := sets.New[syntax.DID]() 317 unassigned := sets.New[syntax.DID]() 318 for _, op := range ops { 319 if op.OperandKey != assigneeLabelAt { 320 continue 321 } 322 assignee := syntax.DID(op.OperandValue) 323 switch op.Operation { 324 case models.LabelOperationAdd: 325 assigned.Insert(assignee) 326 case models.LabelOperationDel: 327 unassigned.Insert(assignee) 328 default: 329 continue 330 } 331 } 332 333 n.notifyEvent(ctx, actor, assigned, models.NotificationTypePullAssigned, entityType, entityId, repoId, issueId, pullId) 334 n.notifyEvent(ctx, actor, unassigned, models.NotificationTypePullUnassigned, entityType, entityId, repoId, issueId, pullId) 335} 336 337func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 338 actorDid := syntax.DID(follow.UserDid) 339 recipients := sets.Singleton(syntax.DID(follow.SubjectDid)) 340 eventType := models.NotificationTypeFollowed 341 entityType := "follow" 342 entityId := follow.UserDid 343 var repoId, issueId, pullId *int64 344 345 n.notifyEvent( 346 ctx, 347 actorDid, 348 recipients, 349 eventType, 350 entityType, 351 entityId, 352 repoId, 353 issueId, 354 pullId, 355 ) 356} 357 358func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 359 // no-op 360} 361 362func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 363 l := log.FromContext(ctx) 364 365 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", string(pull.RepoDid))) 366 if err != nil { 367 l.Error("failed to get repos", "err", err) 368 return 369 } 370 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(pull.RepoDid))) 371 if err != nil { 372 l.Error("failed to fetch collaborators", "err", err) 373 return 374 } 375 376 // build the recipients list 377 // - owner of the repo 378 // - collaborators in the repo 379 recipients := sets.Singleton(syntax.DID(repo.Did)) 380 for _, c := range collaborators { 381 recipients.Insert(c.SubjectDid) 382 } 383 384 actorDid := syntax.DID(pull.OwnerDid) 385 eventType := models.NotificationTypePullCreated 386 entityType := "pull" 387 entityId := pull.AtUri().String() 388 repoId := &repo.Id 389 var issueId *int64 390 p := int64(pull.ID) 391 pullId := &p 392 393 n.notifyEvent( 394 ctx, 395 actorDid, 396 recipients, 397 eventType, 398 entityType, 399 entityId, 400 repoId, 401 issueId, 402 pullId, 403 ) 404} 405 406func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 407 // no-op 408} 409 410func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 411 // no-op 412} 413 414func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 415 // no-op 416} 417 418func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 419 // no-op 420} 421 422func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 423 // no-op for now; webhooks are handled by the webhook notifier 424} 425 426func (n *databaseNotifier) Clone(ctx context.Context, repo *models.Repo) { 427 // no-op 428} 429 430func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 431 l := log.FromContext(ctx) 432 433 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(issue.RepoDid))) 434 if err != nil { 435 l.Error("failed to fetch collaborators", "err", err) 436 return 437 } 438 439 // build up the recipients list: 440 // - repo owner 441 // - repo collaborators 442 // - all issue participants 443 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 444 for _, c := range collaborators { 445 recipients.Insert(c.SubjectDid) 446 } 447 for _, p := range issue.Participants() { 448 recipients.Insert(syntax.DID(p)) 449 } 450 451 entityType := "issue" 452 entityId := issue.AtUri().String() 453 repoId := &issue.Repo.Id 454 issueId := &issue.Id 455 var pullId *int64 456 var eventType models.NotificationType 457 458 if issue.Open { 459 eventType = models.NotificationTypeIssueReopen 460 } else { 461 eventType = models.NotificationTypeIssueClosed 462 } 463 464 n.notifyEvent( 465 ctx, 466 actor, 467 recipients, 468 eventType, 469 entityType, 470 entityId, 471 repoId, 472 issueId, 473 pullId, 474 ) 475} 476 477func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 478 l := log.FromContext(ctx) 479 480 // Get repo details 481 repo, err := db.GetRepo(n.db, orm.FilterEq("repo_did", string(pull.RepoDid))) 482 if err != nil { 483 l.Error("failed to get repos", "err", err) 484 return 485 } 486 487 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_did", string(pull.RepoDid))) 488 if err != nil { 489 l.Error("failed to fetch collaborators", "err", err) 490 return 491 } 492 493 // build up the recipients list: 494 // - repo owner 495 // - all pull participants 496 recipients := sets.Singleton(syntax.DID(repo.Did)) 497 for _, c := range collaborators { 498 recipients.Insert(c.SubjectDid) 499 } 500 for _, p := range pull.Participants() { 501 recipients.Insert(p) 502 } 503 504 entityType := "pull" 505 entityId := pull.AtUri().String() 506 repoId := &repo.Id 507 var issueId *int64 508 var eventType models.NotificationType 509 switch pull.State { 510 case models.PullClosed: 511 eventType = models.NotificationTypePullClosed 512 case models.PullOpen: 513 eventType = models.NotificationTypePullReopen 514 case models.PullMerged: 515 eventType = models.NotificationTypePullMerged 516 default: 517 l.Error("unexpected new PR state", "state", pull.State) 518 return 519 } 520 p := int64(pull.ID) 521 pullId := &p 522 523 n.notifyEvent( 524 ctx, 525 actor, 526 recipients, 527 eventType, 528 entityType, 529 entityId, 530 repoId, 531 issueId, 532 pullId, 533 ) 534} 535 536func (n *databaseNotifier) notifyEvent( 537 ctx context.Context, 538 actorDid syntax.DID, 539 recipients sets.Set[syntax.DID], 540 eventType models.NotificationType, 541 entityType string, 542 entityId string, 543 repoId *int64, 544 issueId *int64, 545 pullId *int64, 546) { 547 l := log.FromContext(ctx) 548 549 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody 550 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions { 551 return 552 } 553 554 recipients.Remove(actorDid) 555 556 prefMap, err := db.GetNotificationPreferences( 557 n.db, 558 orm.FilterIn("user_did", slices.Collect(recipients.All())), 559 ) 560 if err != nil { 561 // failed to get prefs for users 562 return 563 } 564 565 // create a transaction for bulk notification storage 566 tx, err := n.db.Begin() 567 if err != nil { 568 // failed to start tx 569 return 570 } 571 defer tx.Rollback() 572 573 // filter based on preferences 574 for recipientDid := range recipients.All() { 575 prefs, ok := prefMap[recipientDid] 576 if !ok { 577 prefs = models.DefaultNotificationPreferences(recipientDid) 578 } 579 580 // skip users who don’t want this type 581 if !prefs.ShouldNotify(eventType) { 582 continue 583 } 584 585 // create notification 586 notif := &models.Notification{ 587 RecipientDid: recipientDid.String(), 588 ActorDid: actorDid.String(), 589 Type: eventType, 590 EntityType: entityType, 591 EntityId: entityId, 592 RepoId: repoId, 593 IssueId: issueId, 594 PullId: pullId, 595 } 596 597 if err := db.CreateNotification(tx, notif); err != nil { 598 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err) 599 } 600 } 601 602 if err := tx.Commit(); err != nil { 603 // failed to commit 604 return 605 } 606}