Monorepo for Tangled tangled.org
8

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "strings" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/eventconsumer" 13 "tangled.org/core/rbac" 14 "tangled.org/core/spindle/db" 15 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 "github.com/bluesky-social/indigo/atproto/identity" 18 "github.com/bluesky-social/indigo/atproto/syntax" 19 "github.com/bluesky-social/indigo/xrpc" 20 "github.com/bluesky-social/jetstream/pkg/models" 21 securejoin "github.com/cyphar/filepath-securejoin" 22) 23 24type Ingester func(ctx context.Context, e *models.Event) error 25 26func (s *Spindle) ingest() Ingester { 27 return func(ctx context.Context, e *models.Event) error { 28 if e.Kind != models.EventKindCommit { 29 return nil 30 } 31 32 var err error 33 switch e.Commit.Collection { 34 case tangled.SpindleMemberNSID: 35 err = s.ingestMember(ctx, e) 36 case tangled.RepoNSID: 37 err = s.ingestRepo(ctx, e) 38 case tangled.RepoCollaboratorNSID: 39 err = s.ingestCollaborator(ctx, e) 40 } 41 42 if err != nil { 43 s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err) 44 } 45 46 lastTimeUs := e.TimeUS + 1 47 if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 48 s.l.Error("failed to save cursor", "err", saveErr) 49 } 50 51 return nil 52 } 53} 54 55func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 56 var err error 57 did := e.Did 58 rkey := e.Commit.RKey 59 60 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 61 62 switch e.Commit.Operation { 63 case models.CommitOperationCreate, models.CommitOperationUpdate: 64 raw := e.Commit.Record 65 record := tangled.SpindleMember{} 66 err = json.Unmarshal(raw, &record) 67 if err != nil { 68 l.Error("invalid record", "error", err) 69 return err 70 } 71 72 domain := s.cfg.Server.Hostname 73 recordInstance := record.Instance 74 75 if recordInstance != domain { 76 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 77 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 78 } 79 80 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 81 if err != nil || !ok { 82 l.Error("failed to add member", "did", did, "error", err) 83 return fmt.Errorf("failed to enforce permissions: %w", err) 84 } 85 86 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 87 Did: syntax.DID(did), 88 Rkey: rkey, 89 Instance: recordInstance, 90 Subject: syntax.DID(record.Subject), 91 Created: time.Now(), 92 }); err != nil { 93 l.Error("failed to add member", "error", err) 94 return fmt.Errorf("failed to add member: %w", err) 95 } 96 97 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 98 l.Error("failed to add member", "error", err) 99 return fmt.Errorf("failed to add member: %w", err) 100 } 101 l.Info("added member from firehose", "member", record.Subject) 102 103 if err := s.db.AddDid(record.Subject); err != nil { 104 l.Error("failed to add did", "error", err) 105 return fmt.Errorf("failed to add did: %w", err) 106 } 107 s.jc.AddDid(record.Subject) 108 109 return nil 110 111 case models.CommitOperationDelete: 112 record, err := db.GetSpindleMember(s.db, did, rkey) 113 if err != nil { 114 l.Error("failed to find member", "error", err) 115 return fmt.Errorf("failed to find member: %w", err) 116 } 117 118 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 119 l.Error("failed to remove member", "error", err) 120 return fmt.Errorf("failed to remove member: %w", err) 121 } 122 123 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 124 l.Error("failed to add member", "error", err) 125 return fmt.Errorf("failed to add member: %w", err) 126 } 127 l.Info("added member from firehose", "member", record.Subject) 128 129 if err := s.db.RemoveDid(record.Subject.String()); err != nil { 130 l.Error("failed to add did", "error", err) 131 return fmt.Errorf("failed to add did: %w", err) 132 } 133 s.jc.RemoveDid(record.Subject.String()) 134 135 } 136 return nil 137} 138 139func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 140 var err error 141 did := e.Did 142 143 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 144 145 l.Info("ingesting repo record", "did", did) 146 147 switch e.Commit.Operation { 148 case models.CommitOperationCreate, models.CommitOperationUpdate: 149 raw := e.Commit.Record 150 record := tangled.Repo{} 151 err = json.Unmarshal(raw, &record) 152 if err != nil { 153 l.Error("invalid record", "error", err) 154 return err 155 } 156 157 domain := s.cfg.Server.Hostname 158 rkey := e.Commit.RKey 159 160 // no spindle configured for this repo 161 if record.Spindle == nil { 162 l.Info("no spindle configured", "rkey", rkey) 163 return nil 164 } 165 166 // this repo did not want this spindle 167 if *record.Spindle != domain { 168 l.Info("different spindle configured", "rkey", rkey, "spindle", *record.Spindle, "domain", domain) 169 return nil 170 } 171 172 // add this repo to the watch list 173 if err := s.db.AddRepo(record.Knot, did, rkey); err != nil { 174 l.Error("failed to add repo", "error", err) 175 return fmt.Errorf("failed to add repo: %w", err) 176 } 177 178 didSlashRepo, err := securejoin.SecureJoin(did, rkey) 179 if err != nil { 180 return err 181 } 182 183 // add repo to rbac 184 if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil { 185 l.Error("failed to add repo to enforcer", "error", err) 186 return fmt.Errorf("failed to add repo: %w", err) 187 } 188 189 // add collaborators to rbac 190 owner, err := s.res.ResolveIdent(ctx, did) 191 if err != nil || owner.Handle.IsInvalidHandle() { 192 return err 193 } 194 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil { 195 return err 196 } 197 198 // add this knot to the event consumer 199 src := eventconsumer.NewKnotSource(record.Knot) 200 s.ks.AddSource(context.Background(), src) 201 202 return nil 203 204 } 205 return nil 206} 207 208func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 209 var err error 210 211 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 212 213 l.Info("ingesting collaborator record") 214 215 switch e.Commit.Operation { 216 case models.CommitOperationCreate, models.CommitOperationUpdate: 217 raw := e.Commit.Record 218 record := tangled.RepoCollaborator{} 219 err = json.Unmarshal(raw, &record) 220 if err != nil { 221 l.Error("invalid record", "error", err) 222 return err 223 } 224 225 subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 226 if err != nil || subjectId.Handle.IsInvalidHandle() { 227 return err 228 } 229 230 var rbacResource string 231 var ownerDid string 232 switch { 233 case strings.HasPrefix(record.Repo, "did:"): 234 resolvedOwner, repoName, lookupErr := s.resolveRepoDid(ctx, e.Did, record.Repo) 235 if lookupErr != nil { 236 return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr) 237 } 238 ownerDid = resolvedOwner 239 rbacResource, _ = securejoin.SecureJoin(ownerDid, repoName) 240 241 case strings.Contains(record.Repo, "/"): 242 repoAt, parseErr := syntax.ParseATURI(record.Repo) 243 if parseErr != nil { 244 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 245 return nil 246 } 247 248 owner, resolveErr := s.res.ResolveIdent(ctx, repoAt.Authority().String()) 249 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 250 return fmt.Errorf("failed to resolve handle: %w", resolveErr) 251 } 252 253 xrpcc := xrpc.Client{ 254 Host: owner.PDSEndpoint(), 255 } 256 257 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 258 if getErr != nil { 259 return getErr 260 } 261 262 if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 263 return fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 264 } 265 rbacResource, _ = securejoin.SecureJoin(owner.DID.String(), repoAt.RecordKey().String()) 266 ownerDid = owner.DID.String() 267 268 default: 269 l.Info("rejecting collaborator record with unrecognized repo format", "repo", record.Repo) 270 return nil 271 } 272 273 if ok, err := s.e.IsCollaboratorInviteAllowed(ownerDid, rbac.ThisServer, rbacResource); !ok || err != nil { 274 return fmt.Errorf("insufficient permissions: %w", err) 275 } 276 277 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, rbacResource); err != nil { 278 l.Error("failed to add collaborator to enforcer", "error", err) 279 return fmt.Errorf("failed to add collaborator: %w", err) 280 } 281 282 return nil 283 } 284 return nil 285} 286 287func (s *Spindle) resolveRepoDid(ctx context.Context, ownerDid string, repoDid string) (string, string, error) { 288 owner, resolveErr := s.res.ResolveIdent(ctx, ownerDid) 289 if resolveErr != nil || owner.Handle.IsInvalidHandle() { 290 return "", "", fmt.Errorf("failed to resolve owner %s: %w", ownerDid, resolveErr) 291 } 292 293 xrpcc := xrpc.Client{ 294 Host: owner.PDSEndpoint(), 295 } 296 297 cursor := "" 298 for { 299 resp, listErr := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoNSID, cursor, 100, ownerDid, false) 300 if listErr != nil { 301 return "", "", fmt.Errorf("failed to list repo records for %s: %w", ownerDid, listErr) 302 } 303 304 for _, r := range resp.Records { 305 if r == nil { 306 continue 307 } 308 repo, ok := r.Value.Val.(*tangled.Repo) 309 if !ok { 310 continue 311 } 312 if repo.RepoDid != nil && *repo.RepoDid == repoDid { 313 rkey := r.Uri[strings.LastIndex(r.Uri, "/")+1:] 314 return ownerDid, rkey, nil 315 } 316 } 317 318 if resp.Cursor == nil || *resp.Cursor == "" { 319 break 320 } 321 cursor = *resp.Cursor 322 } 323 324 return "", "", fmt.Errorf("repo DID %s not found in records for %s", repoDid, ownerDid) 325} 326 327func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { 328 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 329 330 l.Info("fetching and adding existing collaborators") 331 332 xrpcc := xrpc.Client{ 333 Host: owner.PDSEndpoint(), 334 } 335 336 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) 337 if err != nil { 338 return err 339 } 340 341 var errs error 342 for _, r := range resp.Records { 343 if r == nil { 344 continue 345 } 346 record := r.Value.Val.(*tangled.RepoCollaborator) 347 348 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 349 l.Error("failed to add repo to enforcer", "error", err) 350 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 351 } 352 } 353 354 return errs 355}