Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

at icy/yrolzt 7.2 kB View raw
1package spindle 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/spindle/db" 12 "tangled.org/core/tapc" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/jetstream/pkg/models" 16) 17 18type Ingester func(ctx context.Context, e *models.Event) error 19 20func (s *Spindle) ingest() Ingester { 21 return func(ctx context.Context, e *models.Event) error { 22 if e.Kind != models.EventKindCommit { 23 return nil 24 } 25 26 var err error 27 switch e.Commit.Collection { 28 case tangled.SpindleMemberNSID: 29 err = s.ingestMember(ctx, e) 30 case tangled.RepoNSID, tangled.RepoCollaboratorNSID: 31 if evt, ok := jetstreamToTapEvent(e); ok { 32 err = s.tap.processEvent(ctx, evt) 33 } 34 } 35 36 if err != nil { 37 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey, "err", err) 38 } 39 40 lastTimeUs := e.TimeUS + 1 41 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 42 s.l.Error("failed to save cursor", "err", saveErr) 43 } 44 45 return nil 46 } 47} 48 49func jetstreamToTapEvent(e *models.Event) (tapc.Event, bool) { 50 if e.Commit == nil { 51 return tapc.Event{}, false 52 } 53 did, err := syntax.ParseDID(e.Did) 54 if err != nil { 55 return tapc.Event{}, false 56 } 57 var action tapc.RecordAction 58 switch e.Commit.Operation { 59 case models.CommitOperationCreate: 60 action = tapc.RecordCreateAction 61 case models.CommitOperationUpdate: 62 action = tapc.RecordUpdateAction 63 case models.CommitOperationDelete: 64 action = tapc.RecordDeleteAction 65 default: 66 return tapc.Event{}, false 67 } 68 return tapc.Event{ 69 Type: tapc.EvtRecord, 70 Record: &tapc.RecordEventData{ 71 Did: did, 72 Rkey: syntax.RecordKey(e.Commit.RKey), 73 Collection: syntax.NSID(e.Commit.Collection), 74 Action: action, 75 Record: e.Commit.Record, 76 }, 77 }, true 78} 79 80func (s *Spindle) ingestMember(ctx context.Context, e *models.Event) error { 81 did := e.Did 82 rkey := e.Commit.RKey 83 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID, "did", did, "rkey", rkey) 84 85 switch e.Commit.Operation { 86 case models.CommitOperationCreate, models.CommitOperationUpdate: 87 raw := e.Commit.Record 88 record := tangled.SpindleMember{} 89 if err := json.Unmarshal(raw, &record); err != nil { 90 return fmt.Errorf("invalid record: %w", err) 91 } 92 93 domain := s.cfg.Server.Hostname 94 recordInstance := record.Instance 95 96 if recordInstance != domain { 97 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 98 } 99 100 subject, err := syntax.ParseDID(record.Subject) 101 if err != nil { 102 return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err) 103 } 104 105 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 106 if err != nil { 107 return fmt.Errorf("failed to enforce permissions: %w", err) 108 } 109 if !ok { 110 return fmt.Errorf("permission denied for %s", did) 111 } 112 113 sqlTx, err := s.db.BeginTx(ctx, nil) 114 if err != nil { 115 return fmt.Errorf("failed to start txn: %w", err) 116 } 117 committed := false 118 defer func() { 119 if !committed { 120 sqlTx.Rollback() 121 } 122 }() 123 124 existing, err := db.GetSpindleMember(sqlTx, did, rkey) 125 if err != nil && !errors.Is(err, sql.ErrNoRows) { 126 return fmt.Errorf("failed to look up existing member: %w", err) 127 } 128 129 var staleSubject string 130 if existing != nil && existing.Subject != subject { 131 staleSubject = existing.Subject.String() 132 if err := db.RemoveSpindleMember(sqlTx, did, rkey); err != nil { 133 return fmt.Errorf("failed to remove stale member row: %w", err) 134 } 135 } 136 137 if err := db.AddSpindleMember(sqlTx, db.SpindleMember{ 138 Did: syntax.DID(did), 139 Rkey: rkey, 140 Instance: recordInstance, 141 Subject: subject, 142 }); err != nil { 143 return fmt.Errorf("failed to add member: %w", err) 144 } 145 146 if err := db.AddDid(sqlTx, subject.String()); err != nil { 147 return fmt.Errorf("failed to add did: %w", err) 148 } 149 150 dropStaleAcl := false 151 var staleDidDropped bool 152 if staleSubject != "" { 153 remaining, err := db.CountSpindleMembersBySubject(sqlTx, staleSubject) 154 if err != nil { 155 return fmt.Errorf("failed to count stale subject rows: %w", err) 156 } 157 if remaining == 0 { 158 dropStaleAcl = true 159 stillNeeded, err := s.e.WouldHaveAnyPolicyExcludingSpindleMember(staleSubject, rbacDomain) 160 if err != nil { 161 return fmt.Errorf("failed to check residual policies for stale subject: %w", err) 162 } 163 if !stillNeeded { 164 if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 165 return fmt.Errorf("failed to remove stale did: %w", err) 166 } 167 staleDidDropped = true 168 } 169 } 170 l.Info("replaced stale spindle member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped) 171 } 172 173 if err := sqlTx.Commit(); err != nil { 174 return fmt.Errorf("failed to commit txn: %w", err) 175 } 176 committed = true 177 178 if dropStaleAcl { 179 if _, err := s.e.TryRemoveSpindleMember(rbacDomain, staleSubject); err != nil { 180 l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err) 181 } 182 } 183 if _, err := s.e.TryAddSpindleMember(rbacDomain, subject.String()); err != nil { 184 l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err) 185 } 186 187 if staleDidDropped { 188 s.jc.RemoveDid(staleSubject) 189 } 190 s.jc.AddDid(subject.String()) 191 l.Info("added member from firehose", "member", subject) 192 return nil 193 194 case models.CommitOperationDelete: 195 sqlTx, err := s.db.BeginTx(ctx, nil) 196 if err != nil { 197 return fmt.Errorf("failed to start txn: %w", err) 198 } 199 committed := false 200 defer func() { 201 if !committed { 202 sqlTx.Rollback() 203 } 204 }() 205 206 record, err := db.GetSpindleMember(sqlTx, did, rkey) 207 if errors.Is(err, sql.ErrNoRows) { 208 l.Info("spindle member already removed") 209 return nil 210 } 211 if err != nil { 212 return fmt.Errorf("failed to find member: %w", err) 213 } 214 215 staleSubject := record.Subject.String() 216 217 if err := db.RemoveSpindleMember(sqlTx, did, rkey); err != nil { 218 return fmt.Errorf("failed to remove member: %w", err) 219 } 220 221 remaining, err := db.CountSpindleMembersBySubject(sqlTx, staleSubject) 222 if err != nil { 223 return fmt.Errorf("failed to count remaining member rows: %w", err) 224 } 225 226 dropAcl := false 227 var staleDidDropped bool 228 if remaining == 0 { 229 dropAcl = true 230 stillNeeded, err := s.e.WouldHaveAnyPolicyExcludingSpindleMember(staleSubject, rbacDomain) 231 if err != nil { 232 return fmt.Errorf("failed to check residual policies: %w", err) 233 } 234 if !stillNeeded { 235 if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 236 return fmt.Errorf("failed to remove did: %w", err) 237 } 238 staleDidDropped = true 239 } 240 } 241 242 if err := sqlTx.Commit(); err != nil { 243 return fmt.Errorf("failed to commit txn: %w", err) 244 } 245 committed = true 246 247 if dropAcl { 248 if _, err := s.e.TryRemoveSpindleMember(rbacDomain, staleSubject); err != nil { 249 l.Error("post-commit: failed to remove member ACL", "subject", staleSubject, "err", err) 250 } 251 } 252 253 if staleDidDropped { 254 s.jc.RemoveDid(staleSubject) 255 } 256 l.Info("removed member from firehose", "member", record.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped) 257 } 258 return nil 259}