Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "tangled.org/core/api/tangled" 10 "tangled.org/core/spindle/db" 11 "tangled.org/core/tapc" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/jetstream/pkg/models" 15) 16 17type Ingester func(ctx context.Context, e *models.Event) error 18 19func (s *Spindle) ingest() Ingester { 20 return func(ctx context.Context, e *models.Event) error { 21 if e.Kind != models.EventKindCommit { 22 return nil 23 } 24 25 var err error 26 switch e.Commit.Collection { 27 case tangled.SpindleMemberNSID: 28 err = s.ingestMember(ctx, e) 29 case tangled.RepoNSID, tangled.RepoCollaboratorNSID: 30 if evt, ok := jetstreamToTapEvent(e); ok { 31 err = s.tap.processEvent(ctx, evt) 32 } 33 } 34 35 if err != nil { 36 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err) 37 } 38 39 lastTimeUs := e.TimeUS + 1 40 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 41 s.l.Error("failed to save cursor", "err", saveErr) 42 } 43 44 return nil 45 } 46} 47 48func jetstreamToTapEvent(e *models.Event) (tapc.Event, bool) { 49 if e.Commit == nil { 50 return tapc.Event{}, false 51 } 52 did, err := syntax.ParseDID(e.Did) 53 if err != nil { 54 return tapc.Event{}, false 55 } 56 var action tapc.RecordAction 57 switch e.Commit.Operation { 58 case models.CommitOperationCreate: 59 action = tapc.RecordCreateAction 60 case models.CommitOperationUpdate: 61 action = tapc.RecordUpdateAction 62 case models.CommitOperationDelete: 63 action = tapc.RecordDeleteAction 64 default: 65 return tapc.Event{}, false 66 } 67 return tapc.Event{ 68 Type: tapc.EvtRecord, 69 Record: &tapc.RecordEventData{ 70 Did: did, 71 Rkey: syntax.RecordKey(e.Commit.RKey), 72 Collection: syntax.NSID(e.Commit.Collection), 73 Action: action, 74 Record: e.Commit.Record, 75 }, 76 }, true 77} 78 79func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 80 var err error 81 did := e.Did 82 rkey := e.Commit.RKey 83 84 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 85 86 switch e.Commit.Operation { 87 case models.CommitOperationCreate, models.CommitOperationUpdate: 88 raw := e.Commit.Record 89 record := tangled.SpindleMember{} 90 err = json.Unmarshal(raw, &record) 91 if err != nil { 92 l.Error("invalid record", "error", err) 93 return err 94 } 95 96 domain := s.cfg.Server.Hostname 97 recordInstance := record.Instance 98 99 if recordInstance != domain { 100 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 101 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 102 } 103 104 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 105 if err != nil || !ok { 106 l.Error("failed to add member", "did", did, "error", err) 107 return fmt.Errorf("failed to enforce permissions: %w", err) 108 } 109 110 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 111 Did: syntax.DID(did), 112 Rkey: rkey, 113 Instance: recordInstance, 114 Subject: syntax.DID(record.Subject), 115 Created: time.Now(), 116 }); err != nil { 117 l.Error("failed to add member", "error", err) 118 return fmt.Errorf("failed to add member: %w", err) 119 } 120 121 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 122 l.Error("failed to add member", "error", err) 123 return fmt.Errorf("failed to add member: %w", err) 124 } 125 l.Info("added member from firehose", "member", record.Subject) 126 127 if err := s.db.AddDid(record.Subject); err != nil { 128 l.Error("failed to add did", "error", err) 129 return fmt.Errorf("failed to add did: %w", err) 130 } 131 s.jc.AddDid(record.Subject) 132 133 return nil 134 135 case models.CommitOperationDelete: 136 record, err := db.GetSpindleMember(s.db, did, rkey) 137 if err != nil { 138 l.Error("failed to find member", "error", err) 139 return fmt.Errorf("failed to find member: %w", err) 140 } 141 142 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 143 l.Error("failed to remove member", "error", err) 144 return fmt.Errorf("failed to remove member: %w", err) 145 } 146 147 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 148 l.Error("failed to add member", "error", err) 149 return fmt.Errorf("failed to add member: %w", err) 150 } 151 l.Info("added member from firehose", "member", record.Subject) 152 153 if err := s.db.RemoveDid(record.Subject.String()); err != nil { 154 l.Error("failed to add did", "error", err) 155 return fmt.Errorf("failed to add did: %w", err) 156 } 157 s.jc.RemoveDid(record.Subject.String()) 158 159 } 160 return nil 161}