Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "tangled.org/core/api/tangled" 10 "tangled.org/core/spindle/db" 11 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/bluesky-social/jetstream/pkg/models" 14) 15 16type Ingester func(ctx context.Context, e *models.Event) error 17 18func (s *Spindle) ingest() Ingester { 19 return func(ctx context.Context, e *models.Event) error { 20 if e.Kind != models.EventKindCommit { 21 return nil 22 } 23 24 var err error 25 switch e.Commit.Collection { 26 case tangled.SpindleMemberNSID: 27 err = s.ingestMember(ctx, e) 28 } 29 30 if err != nil { 31 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err) 32 } 33 34 lastTimeUs := e.TimeUS + 1 35 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 36 s.l.Error("failed to save cursor", "err", saveErr) 37 } 38 39 return nil 40 } 41} 42 43func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 44 var err error 45 did := e.Did 46 rkey := e.Commit.RKey 47 48 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 49 50 switch e.Commit.Operation { 51 case models.CommitOperationCreate, models.CommitOperationUpdate: 52 raw := e.Commit.Record 53 record := tangled.SpindleMember{} 54 err = json.Unmarshal(raw, &record) 55 if err != nil { 56 l.Error("invalid record", "error", err) 57 return err 58 } 59 60 domain := s.cfg.Server.Hostname 61 recordInstance := record.Instance 62 63 if recordInstance != domain { 64 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 65 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 66 } 67 68 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 69 if err != nil || !ok { 70 l.Error("failed to add member", "did", did, "error", err) 71 return fmt.Errorf("failed to enforce permissions: %w", err) 72 } 73 74 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 75 Did: syntax.DID(did), 76 Rkey: rkey, 77 Instance: recordInstance, 78 Subject: syntax.DID(record.Subject), 79 Created: time.Now(), 80 }); err != nil { 81 l.Error("failed to add member", "error", err) 82 return fmt.Errorf("failed to add member: %w", err) 83 } 84 85 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 86 l.Error("failed to add member", "error", err) 87 return fmt.Errorf("failed to add member: %w", err) 88 } 89 l.Info("added member from firehose", "member", record.Subject) 90 91 if err := s.db.AddDid(record.Subject); err != nil { 92 l.Error("failed to add did", "error", err) 93 return fmt.Errorf("failed to add did: %w", err) 94 } 95 s.jc.AddDid(record.Subject) 96 97 return nil 98 99 case models.CommitOperationDelete: 100 record, err := db.GetSpindleMember(s.db, did, rkey) 101 if err != nil { 102 l.Error("failed to find member", "error", err) 103 return fmt.Errorf("failed to find member: %w", err) 104 } 105 106 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 107 l.Error("failed to remove member", "error", err) 108 return fmt.Errorf("failed to remove member: %w", err) 109 } 110 111 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 112 l.Error("failed to add member", "error", err) 113 return fmt.Errorf("failed to add member: %w", err) 114 } 115 l.Info("added member from firehose", "member", record.Subject) 116 117 if err := s.db.RemoveDid(record.Subject.String()); err != nil { 118 l.Error("failed to add did", "error", err) 119 return fmt.Errorf("failed to add did: %w", err) 120 } 121 s.jc.RemoveDid(record.Subject.String()) 122 123 } 124 return nil 125}