Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

appview: upsert star/reaction/follow records

Most service flow will be:

1. start db transaction
2. run db operation
3. run PDS operation
4. rollback db if anything above failed
5. commit transaction

If PDS operation succeed, don't try rollback anymore. The ingester will
backfill the missed db operations.

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 25, 2026, 2:51 AM +0900) commit 67b55bae parent 70893649 change-id zwpskkpn
+149 -82
+12 -3
appview/db/follow.go
··· 11 11 "tangled.org/core/orm" 12 12 ) 13 13 14 - func AddFollow(e Execer, follow *models.Follow) error { 15 - query := `insert or ignore into follows (did, subject_did, rkey) values (?, ?, ?)` 16 - _, err := e.Exec(query, follow.UserDid, follow.SubjectDid, follow.Rkey) 14 + func UpsertFollow(e Execer, follow models.Follow) error { 15 + _, err := e.Exec( 16 + `insert into follows (did, rkey, subject_did, created) 17 + values (?, ?, ?, ?) 18 + on conflict(did, rkey) do update set 19 + subject_did = excluded.subject_did, 20 + created = excluded.created`, 21 + follow.UserDid, 22 + follow.Rkey, 23 + follow.SubjectDid, 24 + follow.FollowedAt.Format(time.RFC3339), 25 + ) 17 26 return err 18 27 } 19 28
+14 -3
appview/db/reaction.go
··· 10 10 "tangled.org/core/orm" 11 11 ) 12 12 13 - func AddReaction(e Execer, did string, subjectAt syntax.ATURI, kind models.ReactionKind, rkey string, created time.Time) error { 14 - query := `insert or ignore into reactions (did, subject_at, kind, rkey, created) values (?, ?, ?, ?, ?)` 15 - _, err := e.Exec(query, did, subjectAt, kind, rkey, created.UTC().Format(time.RFC3339)) 13 + func UpsertReaction(e Execer, reaction models.Reaction) error { 14 + _, err := e.Exec( 15 + `insert into reactions (did, rkey, subject_at, kind, created) 16 + values (?, ?, ?, ?, ?) 17 + on conflict(did, rkey) do update set 18 + subject_at = excluded.subject_at, 19 + kind = excluded.kind, 20 + created = excluded.created`, 21 + reaction.ReactedByDid, 22 + reaction.Rkey, 23 + reaction.ThreadAt, 24 + reaction.Kind, 25 + reaction.Created.Format(time.RFC3339), 26 + ) 16 27 return err 17 28 } 18 29
+9 -4
appview/db/star.go
··· 13 13 "tangled.org/core/orm" 14 14 ) 15 15 16 - func AddStar(e Execer, star *models.Star) error { 17 - query := `insert or ignore into stars (did, subject_type, subject, rkey) values (?, ?, ?, ?)` 16 + func UpsertStar(e Execer, star models.Star) error { 18 17 _, err := e.Exec( 19 - query, 18 + `insert into stars (did, rkey, subject_type, subject, created) 19 + values (?, ?, ?, ?, ?) 20 + on conflict(did, rkey) do update set 21 + subject_type = excluded.subject_type, 22 + subject = excluded.subject, 23 + created = excluded.created`, 20 24 star.Did, 25 + star.Rkey, 21 26 string(star.SubjectType), 22 27 star.Subject, 23 - star.Rkey, 28 + star.Created.Format(time.RFC3339), 24 29 ) 25 30 return err 26 31 }
+12 -18
appview/ingester.go
··· 200 200 record := tangled.FeedStar{} 201 201 unmarshalErr := json.Unmarshal(raw, &record) 202 202 203 - star := &models.Star{ 203 + star := models.Star{ 204 204 Did: did, 205 205 Rkey: e.Commit.RKey, 206 206 } 207 207 208 208 switch { 209 209 case unmarshalErr != nil: 210 - resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 210 + resolved, resolveErr := i.resolveOldFormatStar(raw, &star, l) 211 211 if resolveErr != nil { 212 212 l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 213 213 return unmarshalErr ··· 236 236 return fmt.Errorf("star record has empty subject union") 237 237 } 238 238 239 - err = db.AddStar(i.Db, star) 239 + err = db.UpsertStar(i.Db, star) 240 240 case jmodels.CommitOperationDelete: 241 241 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 242 242 } ··· 265 265 return err 266 266 } 267 267 268 - err = db.AddFollow(i.Db, &models.Follow{ 268 + err = db.UpsertFollow(i.Db, models.Follow{ 269 269 UserDid: did, 270 270 SubjectDid: record.Subject, 271 271 Rkey: e.Commit.RKey, ··· 1733 1733 created = time.Now() 1734 1734 } 1735 1735 1736 - tx, err := i.Db.Begin() 1737 - if err != nil { 1738 - return fmt.Errorf("failed to start transaction: %w", err) 1739 - } 1740 - defer tx.Rollback() 1741 - 1742 - if err := db.DeleteReactionByRkey(tx, did, rkey); err != nil { 1743 - return fmt.Errorf("failed to clear existing reaction: %w", err) 1744 - } 1745 - if err := db.AddReaction(tx, did, subjectUri, kind, rkey, created); err != nil { 1746 - return fmt.Errorf("failed to add reaction: %w", err) 1736 + reaction := models.Reaction{ 1737 + ReactedByDid: did, 1738 + Rkey: rkey, 1739 + ThreadAt: subjectUri, 1740 + Kind: kind, 1741 + Created: created, 1747 1742 } 1748 - 1749 - if err := tx.Commit(); err != nil { 1750 - return err 1743 + if err := db.UpsertReaction(i.Db, reaction); err != nil { 1744 + return fmt.Errorf("failed to upsert reaction: %w", err) 1751 1745 } 1752 1746 1753 1747 case jmodels.CommitOperationDelete:
+9
appview/models/follow.go
··· 2 2 3 3 import ( 4 4 "time" 5 + 6 + "tangled.org/core/api/tangled" 5 7 ) 6 8 7 9 type Follow struct { ··· 9 11 SubjectDid string 10 12 FollowedAt time.Time 11 13 Rkey string 14 + } 15 + 16 + func (f *Follow) AsRecord() tangled.GraphFollow { 17 + return tangled.GraphFollow{ 18 + Subject: f.SubjectDid, 19 + CreatedAt: f.FollowedAt.Format(time.RFC3339), 20 + } 12 21 } 13 22 14 23 type FollowStats struct {
+8
appview/models/reaction.go
··· 59 59 Kind ReactionKind 60 60 } 61 61 62 + func (r *Reaction) AsRecord() tangled.FeedReaction { 63 + return tangled.FeedReaction{ 64 + Subject: r.ThreadAt.String(), 65 + Reaction: r.Kind.String(), 66 + CreatedAt: r.Created.Format(time.RFC3339), 67 + } 68 + } 69 + 62 70 type ReactionDisplayData struct { 63 71 Count int 64 72 Users []string
+27 -18
appview/state/follow.go
··· 43 43 44 44 switch r.Method { 45 45 case http.MethodPost: 46 - createdAt := time.Now().Format(time.RFC3339) 47 - rkey := tid.TID() 46 + follow := models.Follow{ 47 + UserDid: currentUser.Did, 48 + SubjectDid: subjectIdent.DID.String(), 49 + Rkey: tid.TID(), 50 + FollowedAt: time.Now(), 51 + } 52 + 53 + tx, err := s.db.BeginTx(r.Context(), nil) 54 + if err != nil { 55 + s.logger.Error("failed to start transaction", "err", err) 56 + return 57 + } 58 + defer tx.Rollback() 59 + 60 + if err := db.UpsertFollow(tx, follow); err != nil { 61 + s.logger.Error("failed to follow", "err", err) 62 + return 63 + } 64 + 65 + record := follow.AsRecord() 48 66 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 49 67 Collection: tangled.GraphFollowNSID, 50 68 Repo: currentUser.Did, 51 - Rkey: rkey, 69 + Rkey: follow.Rkey, 52 70 Record: &lexutil.LexiconTypeDecoder{ 53 - Val: &tangled.GraphFollow{ 54 - Subject: subjectIdent.DID.String(), 55 - CreatedAt: createdAt, 56 - }}, 71 + Val: &record, 72 + }, 57 73 }) 58 74 if err != nil { 59 75 l.Error("failed to create atproto record", "err", err) ··· 62 78 63 79 l.Info("created atproto record", "uri", resp.Uri) 64 80 65 - follow := &models.Follow{ 66 - UserDid: currentUser.Did, 67 - SubjectDid: subjectIdent.DID.String(), 68 - Rkey: rkey, 81 + if err := tx.Commit(); err != nil { 82 + s.logger.Error("failed to commit transaction", "err", err) 83 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 69 84 } 70 85 71 - err = db.AddFollow(s.db, follow) 72 - if err != nil { 73 - l.Error("failed to follow", "err", err) 74 - return 75 - } 76 - 77 - s.notifier.NewFollow(r.Context(), follow) 86 + s.notifier.NewFollow(r.Context(), &follow) 78 87 79 88 followStats, err := db.GetFollowerFollowingCount(s.db, subjectIdent.DID.String()) 80 89 if err != nil {
+28 -15
appview/state/reaction.go
··· 48 48 49 49 switch r.Method { 50 50 case http.MethodPost: 51 - createdAt := time.Now() 52 - rkey := tid.TID() 51 + reaction := models.Reaction{ 52 + ReactedByDid: currentUser.Did, 53 + Rkey: tid.TID(), 54 + Kind: reactionKind, 55 + ThreadAt: subjectUri, 56 + Created: time.Now(), 57 + } 58 + 59 + tx, err := s.db.BeginTx(r.Context(), nil) 60 + if err != nil { 61 + s.logger.Error("failed to start transaction", "err", err) 62 + return 63 + } 64 + defer tx.Rollback() 65 + 66 + if err := db.UpsertReaction(tx, reaction); err != nil { 67 + l.Error("db: failed to upsert reaction", "err", err) 68 + return 69 + } 70 + 71 + record := reaction.AsRecord() 53 72 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 54 73 Collection: tangled.FeedReactionNSID, 55 74 Repo: currentUser.Did, 56 - Rkey: rkey, 75 + Rkey: reaction.Rkey, 57 76 Record: &lexutil.LexiconTypeDecoder{ 58 - Val: &tangled.FeedReaction{ 59 - Subject: subjectUri.String(), 60 - Reaction: reactionKind.String(), 61 - CreatedAt: createdAt.Format(time.RFC3339), 62 - }, 77 + Val: &record, 63 78 }, 64 79 }) 65 80 if err != nil { 66 81 l.Error("failed to create atproto record", "err", err) 67 82 return 68 83 } 84 + l.Info("created atproto record", "uri", resp.Uri) 69 85 70 - err = db.AddReaction(s.db, currentUser.Did, subjectUri, reactionKind, rkey, createdAt) 71 - if err != nil { 72 - l.Error("failed to react", "err", err) 73 - return 86 + if err := tx.Commit(); err != nil { 87 + s.logger.Error("failed to commit transaction", "err", err) 88 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 74 89 } 75 90 76 91 reactionMap, err := db.GetReactionMap(s.db, 20, subjectUri) 77 92 if err != nil { 78 - l.Error("failed to get reactions", "subjectUri", subjectUri, "err", err) 93 + l.Error("failed to get reactions", "subject", subjectUri) 79 94 } 80 - 81 - l.Info("created atproto record", "uri", resp.Uri) 82 95 83 96 s.pages.ThreadReactionFragment(w, pages.ThreadReactionFragmentParams{ 84 97 Kind: reactionKind,
+30 -21
appview/state/star.go
··· 77 77 78 78 switch r.Method { 79 79 case http.MethodPost: 80 - createdAt := time.Now().Format(time.RFC3339) 81 - rkey := tid.TID() 80 + star := models.Star{ 81 + Did: currentUser.Did, 82 + Rkey: tid.TID(), 83 + SubjectType: subjectType, 84 + Subject: subjectKey, 85 + Created: time.Now(), 86 + } 82 87 83 - starRecord := &tangled.FeedStar{ 84 - CreatedAt: createdAt, 85 - Subject: starSubject, 88 + tx, err := s.db.BeginTx(r.Context(), nil) 89 + if err != nil { 90 + l.Error("failed to start transaction", "err", err) 91 + return 92 + } 93 + defer tx.Rollback() 94 + 95 + if err := db.UpsertStar(tx, star); err != nil { 96 + l.Error("failed to star", "err", err) 97 + return 86 98 } 87 99 88 100 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 89 101 Collection: tangled.FeedStarNSID, 90 102 Repo: currentUser.Did, 91 - Rkey: rkey, 92 - Record: &lexutil.LexiconTypeDecoder{Val: starRecord}, 103 + Rkey: star.Rkey, 104 + Record: &lexutil.LexiconTypeDecoder{ 105 + Val: &tangled.FeedStar{ 106 + CreatedAt: star.Created.Format(time.RFC3339), 107 + Subject: starSubject, 108 + }, 109 + }, 93 110 }) 94 111 if err != nil { 95 112 l.Error("failed to create atproto record", "err", err) ··· 97 114 } 98 115 l.Info("created atproto record", "uri", resp.Uri) 99 116 100 - star := &models.Star{ 101 - Did: currentUser.Did, 102 - SubjectType: subjectType, 103 - Subject: subjectKey, 104 - Rkey: rkey, 117 + if err := tx.Commit(); err != nil { 118 + l.Error("failed to commit transaction", "err", err) 119 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 105 120 } 106 121 107 - err = db.AddStar(s.db, star) 108 - if err != nil { 109 - l.Error("failed to star", "err", err) 110 - return 111 - } 122 + s.notifier.NewStar(r.Context(), &star) 112 123 113 124 starCount, err := db.GetStarCount(s.db, subjectType, subjectKey) 114 125 if err != nil { 115 126 l.Error("failed to get star count", "subject", subjectKey, "err", err) 116 127 } 117 - 118 - s.notifier.NewStar(r.Context(), star) 119 128 120 129 s.pages.StarBtnFragment(w, pages.StarBtnFragmentParams{ 121 130 IsStarred: true, ··· 162 171 } 163 172 164 173 s.notifier.DeleteStar(r.Context(), &models.Star{ 165 - Did: currentUser.Did, 174 + Did: currentUser.Did, 166 175 SubjectType: subjectType, 167 - Subject: subjectKey, 176 + Subject: subjectKey, 168 177 // Rkey 169 178 // Created 170 179 })