Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotserver: stop ingesting member & collaborator records from firehose

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 16, 2026, 9:04 PM +0300) commit 88aad293 parent ba857c7c change-id mrqultzt
+7 -365
-40
knotserver/db/member.go
··· 11 11 type KnotMember struct { 12 12 Id int 13 13 Did syntax.DID 14 - Rkey string 15 14 Subject syntax.DID 16 15 Created string 17 16 } ··· 93 92 func(m KnotMember) int { return m.Id }, 94 93 ) 95 94 } 96 - 97 - func AddKnotMember(q DBTX, member KnotMember) error { 98 - _, err := q.Exec( 99 - `insert or ignore into knot_members (did, rkey, subject) values (?, ?, ?)`, 100 - member.Did, 101 - member.Rkey, 102 - member.Subject, 103 - ) 104 - return err 105 - } 106 - 107 - func RemoveKnotMember(q DBTX, ownerDid, rkey string) error { 108 - _, err := q.Exec( 109 - "delete from knot_members where did = ? and rkey = ?", 110 - ownerDid, 111 - rkey, 112 - ) 113 - return err 114 - } 115 - 116 - func GetKnotMember(q DBTX, did, rkey string) (*KnotMember, error) { 117 - query := 118 - `select id, did, rkey, subject 119 - from knot_members 120 - where did = ? and rkey = ?` 121 - 122 - var member KnotMember 123 - err := q.QueryRow(query, did, rkey).Scan( 124 - &member.Id, 125 - &member.Did, 126 - &member.Rkey, 127 - &member.Subject, 128 - ) 129 - if err != nil { 130 - return nil, err 131 - } 132 - 133 - return &member, nil 134 - }
-313
knotserver/ingester.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "database/sql" 6 5 "encoding/json" 7 - "errors" 8 6 "fmt" 9 7 "io" 10 8 "net/http" ··· 15 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 14 "github.com/bluesky-social/indigo/atproto/syntax" 17 15 "github.com/bluesky-social/indigo/xrpc" 18 - indigoxrpc "github.com/bluesky-social/indigo/xrpc" 19 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 20 17 "tangled.org/core/api/tangled" 21 18 "tangled.org/core/appview/models" ··· 24 21 "tangled.org/core/knotserver/git" 25 22 knotxrpc "tangled.org/core/knotserver/xrpc" 26 23 "tangled.org/core/log" 27 - "tangled.org/core/rbac" 28 24 "tangled.org/core/tid" 29 25 "tangled.org/core/workflow" 30 26 ) ··· 51 47 return nil 52 48 } 53 49 54 - func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 55 - did := event.Did 56 - rkey := event.Commit.RKey 57 - l := log.FromContext(ctx).With("handler", "processKnotMember", "did", did, "rkey", rkey) 58 - 59 - switch event.Commit.Operation { 60 - case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 61 - raw := json.RawMessage(event.Commit.Record) 62 - var record tangled.KnotMember 63 - if err := json.Unmarshal(raw, &record); err != nil { 64 - return fmt.Errorf("failed to unmarshal record: %w", err) 65 - } 66 - 67 - if record.Domain != h.c.Server.Hostname { 68 - return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 69 - } 70 - 71 - subject, err := syntax.ParseDID(record.Subject) 72 - if err != nil { 73 - return fmt.Errorf("invalid subject DID %q: %w", record.Subject, err) 74 - } 75 - 76 - ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 77 - if err != nil { 78 - return fmt.Errorf("failed to enforce permissions: %w", err) 79 - } 80 - if !ok { 81 - return fmt.Errorf("permission denied for %s", did) 82 - } 83 - 84 - sqlTx, err := h.db.BeginTx(ctx, nil) 85 - if err != nil { 86 - return fmt.Errorf("failed to start txn: %w", err) 87 - } 88 - committed := false 89 - defer func() { 90 - if !committed { 91 - sqlTx.Rollback() 92 - } 93 - }() 94 - 95 - existing, err := db.GetKnotMember(sqlTx, did, rkey) 96 - if err != nil && !errors.Is(err, sql.ErrNoRows) { 97 - return fmt.Errorf("failed to look up existing member: %w", err) 98 - } 99 - 100 - var staleSubject string 101 - if existing != nil && existing.Subject != subject { 102 - staleSubject = existing.Subject.String() 103 - if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil { 104 - return fmt.Errorf("failed to remove stale member row: %w", err) 105 - } 106 - } 107 - 108 - if err := db.AddKnotMember(sqlTx, db.KnotMember{ 109 - Did: syntax.DID(did), 110 - Rkey: rkey, 111 - Subject: subject, 112 - }); err != nil { 113 - return fmt.Errorf("failed to persist member row: %w", err) 114 - } 115 - 116 - if err := db.AddDid(sqlTx, subject.String()); err != nil { 117 - return fmt.Errorf("failed to add did: %w", err) 118 - } 119 - 120 - dropStaleAcl := false 121 - var staleDidDropped bool 122 - if staleSubject != "" { 123 - remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject) 124 - if err != nil { 125 - return fmt.Errorf("failed to count stale subject rows: %w", err) 126 - } 127 - if remaining == 0 { 128 - dropStaleAcl = true 129 - stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer) 130 - if err != nil { 131 - return fmt.Errorf("failed to check residual policies for stale subject: %w", err) 132 - } 133 - if !stillNeeded { 134 - if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 135 - return fmt.Errorf("failed to remove stale did: %w", err) 136 - } 137 - staleDidDropped = true 138 - } 139 - } 140 - l.Info("replaced stale knot member", "old_subject", staleSubject, "new_subject", subject, "stale_did_dropped", staleDidDropped) 141 - } 142 - 143 - if err := sqlTx.Commit(); err != nil { 144 - return fmt.Errorf("failed to commit txn: %w", err) 145 - } 146 - committed = true 147 - 148 - if dropStaleAcl { 149 - if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil { 150 - l.Error("post-commit: failed to remove stale ACL", "subject", staleSubject, "err", err) 151 - } 152 - } 153 - if _, err := h.e.TryAddKnotMember(rbac.ThisServer, subject.String()); err != nil { 154 - l.Error("post-commit: failed to add member ACL", "subject", subject, "err", err) 155 - } 156 - 157 - if staleDidDropped { 158 - h.jc.RemoveDid(staleSubject) 159 - } 160 - h.jc.AddDid(subject.String()) 161 - l.Info("added member from firehose", "member", subject) 162 - 163 - if err := h.fetchAndAddKeys(ctx, subject.String()); err != nil { 164 - return fmt.Errorf("failed to fetch and add keys: %w", err) 165 - } 166 - return nil 167 - 168 - case jmodels.CommitOperationDelete: 169 - sqlTx, err := h.db.BeginTx(ctx, nil) 170 - if err != nil { 171 - return fmt.Errorf("failed to start txn: %w", err) 172 - } 173 - committed := false 174 - defer func() { 175 - if !committed { 176 - sqlTx.Rollback() 177 - } 178 - }() 179 - 180 - member, err := db.GetKnotMember(sqlTx, did, rkey) 181 - if errors.Is(err, sql.ErrNoRows) { 182 - l.Info("knot member already removed") 183 - return nil 184 - } 185 - if err != nil { 186 - return fmt.Errorf("failed to look up knot member: %w", err) 187 - } 188 - 189 - staleSubject := member.Subject.String() 190 - 191 - if err := db.RemoveKnotMember(sqlTx, did, rkey); err != nil { 192 - return fmt.Errorf("failed to remove member row: %w", err) 193 - } 194 - 195 - remaining, err := db.CountKnotMembersBySubject(sqlTx, staleSubject) 196 - if err != nil { 197 - return fmt.Errorf("failed to count remaining member rows: %w", err) 198 - } 199 - 200 - dropAcl := false 201 - var staleDidDropped bool 202 - if remaining == 0 { 203 - dropAcl = true 204 - stillNeeded, err := h.e.WouldHaveAnyPolicyExcludingKnotMember(staleSubject, rbac.ThisServer) 205 - if err != nil { 206 - return fmt.Errorf("failed to check residual policies: %w", err) 207 - } 208 - if !stillNeeded { 209 - if err := db.RemoveDid(sqlTx, staleSubject); err != nil { 210 - return fmt.Errorf("failed to remove did: %w", err) 211 - } 212 - staleDidDropped = true 213 - } 214 - } 215 - 216 - if err := sqlTx.Commit(); err != nil { 217 - return fmt.Errorf("failed to commit txn: %w", err) 218 - } 219 - committed = true 220 - 221 - if dropAcl { 222 - if _, err := h.e.TryRemoveKnotMember(rbac.ThisServer, staleSubject); err != nil { 223 - l.Error("post-commit: failed to remove ACL", "subject", staleSubject, "err", err) 224 - } 225 - } 226 - 227 - if staleDidDropped { 228 - h.jc.RemoveDid(staleSubject) 229 - } 230 - l.Info("removed knot member from firehose", "member", member.Subject, "remaining_rows", remaining, "stale_did_dropped", staleDidDropped) 231 - return nil 232 - } 233 - 234 - return nil 235 - } 236 - 237 50 // returns a repo path on disk if present, and error if not 238 51 type targetRepo struct { 239 52 RepoPath string ··· 513 326 return h.db.InsertEvent(ev, h.n) 514 327 } 515 328 516 - // duplicated from add collaborator 517 - func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 518 - raw := json.RawMessage(event.Commit.Record) 519 - did := event.Did 520 - 521 - var record tangled.RepoCollaborator 522 - if err := json.Unmarshal(raw, &record); err != nil { 523 - return fmt.Errorf("failed to unmarshal record: %w", err) 524 - } 525 - 526 - subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 527 - if err != nil || subjectId.Handle.IsInvalidHandle() { 528 - return err 529 - } 530 - 531 - var rbacResource string 532 - switch { 533 - case strings.HasPrefix(record.Repo, "did:"): 534 - ownerDid, _, lookupErr := h.db.GetRepoKeyOwner(record.Repo) 535 - if lookupErr != nil { 536 - return fmt.Errorf("unknown repo DID %s: %w", record.Repo, lookupErr) 537 - } 538 - if ownerDid != did { 539 - return fmt.Errorf("collaborator record author %s does not own repo %s", did, record.Repo) 540 - } 541 - rbacResource = record.Repo 542 - 543 - case strings.Contains(record.Repo, "/"): 544 - // TODO: get rid of this PDS fetch once all repos have DIDs 545 - repoAt, parseErr := syntax.ParseATURI(record.Repo) 546 - if parseErr != nil { 547 - return parseErr 548 - } 549 - 550 - owner, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 551 - if resolveErr != nil || owner.Handle.IsInvalidHandle() { 552 - return fmt.Errorf("failed to resolve handle: %w", resolveErr) 553 - } 554 - 555 - xrpcc := xrpc.Client{ 556 - Host: owner.PDSEndpoint(), 557 - } 558 - 559 - resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 560 - if getErr != nil { 561 - return getErr 562 - } 563 - 564 - if _, ok := resp.Value.Val.(*tangled.Repo); !ok { 565 - return fmt.Errorf("record at %s is not a tangled.Repo", repoAt) 566 - } 567 - rkey := repoAt.RecordKey().String() 568 - repoDid, didErr := h.db.GetRepoDid(owner.DID.String(), rkey) 569 - if didErr != nil { 570 - return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", owner.DID.String(), rkey, didErr) 571 - } 572 - rbacResource = repoDid 573 - 574 - default: 575 - return fmt.Errorf("collaborator record has unrecognized repo format: %s", record.Repo) 576 - } 577 - 578 - ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, rbacResource) 579 - if err != nil { 580 - return fmt.Errorf("failed to check permissions: %w", err) 581 - } 582 - if !ok { 583 - return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", rbacResource) 584 - } 585 - 586 - if err := db.AddDid(h.db, subjectId.DID.String()); err != nil { 587 - return err 588 - } 589 - h.jc.AddDid(subjectId.DID.String()) 590 - 591 - if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, rbacResource); err != nil { 592 - return err 593 - } 594 - 595 - return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 596 - } 597 - 598 - func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 599 - l := log.FromContext(ctx) 600 - 601 - id, err := h.resolver.Directory().LookupDID(ctx, syntax.DID(did)) 602 - if err != nil { 603 - return fmt.Errorf("lookup did to fetch keys: %w", err) 604 - } 605 - 606 - serviceEndpoint, ok := id.Services["atproto_pds"] 607 - if !ok { 608 - l.Warn("did identity did not contain atproto_pds service while adding their keys", "did", did) 609 - return nil 610 - } 611 - 612 - xrpcc := indigoxrpc.Client{Host: serviceEndpoint.URL} 613 - resp, err := comatproto.RepoListRecords(context.Background(), &xrpcc, tangled.PublicKeyNSID, "", 50, did, false) 614 - if err != nil { 615 - return fmt.Errorf("fetching public keys for did: %w", err) 616 - } 617 - 618 - for _, record := range resp.Records { 619 - if record == nil { 620 - continue 621 - } 622 - key := record.Value.Val.(*tangled.PublicKey) 623 - if key == nil { 624 - continue 625 - } 626 - pk := db.PublicKey{ 627 - Did: did, 628 - PublicKey: *key, 629 - } 630 - err = h.db.AddPublicKey(pk) 631 - if err != nil { 632 - return fmt.Errorf("adding public key to db: %w", err) 633 - } 634 - } 635 - return nil 636 - } 637 - 638 329 func (h *Knot) processRepo(ctx context.Context, event *jmodels.Event) error { 639 330 l := log.FromContext(ctx).With("handler", "processRepo", "did", event.Did, "rkey", event.Commit.RKey) 640 331 ··· 705 396 switch event.Commit.Collection { 706 397 case tangled.PublicKeyNSID: 707 398 err = h.processPublicKey(ctx, event) 708 - case tangled.KnotMemberNSID: 709 - err = h.processKnotMember(ctx, event) 710 399 case tangled.RepoNSID: 711 400 err = h.processRepo(ctx, event) 712 401 case tangled.RepoPullNSID: 713 402 err = h.processPull(ctx, event) 714 - case tangled.RepoCollaboratorNSID: 715 - err = h.processCollaborator(ctx, event) 716 403 } 717 404 default: 718 405 return nil
+7 -6
knotserver/router.go
··· 14 14 "tangled.org/core/jetstream" 15 15 "tangled.org/core/knotserver/config" 16 16 "tangled.org/core/knotserver/db" 17 + "tangled.org/core/knotserver/keys" 17 18 "tangled.org/core/knotserver/sandbox" 18 19 "tangled.org/core/knotserver/xrpc" 19 20 "tangled.org/core/log" ··· 107 108 }) 108 109 109 110 // xrpc apis 110 - r.Mount("/xrpc", h.XrpcRouter()) 111 + x := h.newXrpc() 112 + r.Mount("/xrpc", x.Router()) 113 + r.Mount("/admin", x.AdminRouter()) 111 114 112 115 // Socket that streams git oplogs 113 116 r.Get("/events", h.Events) ··· 129 132 return h.motd 130 133 } 131 134 132 - func (h *Knot) XrpcRouter() http.Handler { 135 + func (h *Knot) newXrpc() *xrpc.Xrpc { 133 136 serviceAuth := serviceauth.NewServiceAuth(h.l, h.resolver.Directory(), h.c.Server.Did().String()) 134 137 135 138 l := log.SubLogger(h.l, "xrpc") 136 139 137 - xrpc := &xrpc.Xrpc{ 140 + return &xrpc.Xrpc{ 138 141 Config: h.c, 139 142 Db: h.db, 140 143 Ingester: h.jc, ··· 145 148 ServiceAuth: serviceAuth, 146 149 Sandbox: h.sandbox, 147 150 } 148 - 149 - return xrpc.Router() 150 151 } 151 152 152 153 func (h *Knot) resolveDidRedirect(next http.Handler) http.Handler { ··· 216 217 return fmt.Errorf("failed to add owner to RBAC: %w", err) 217 218 } 218 219 219 - err = h.fetchAndAddKeys(ctx, cfgOwner) 220 + err = keys.FetchAndStore(ctx, h.resolver.Directory(), h.db, cfgOwner) 220 221 if err != nil { 221 222 h.l.Error("fetching and adding owners public keys", "error", err, "did", cfgOwner) 222 223 }
-2
knotserver/server.go
··· 95 95 96 96 jc, err := jetstream.NewJetstreamClient(c.Server.JetstreamEndpoint, "knotserver", []string{ 97 97 tangled.PublicKeyNSID, 98 - tangled.KnotMemberNSID, 99 98 tangled.RepoNSID, 100 99 tangled.RepoPullNSID, 101 - tangled.RepoCollaboratorNSID, 102 100 }, nil, log.SubLogger(logger, "jetstream"), db, true, c.Server.LogDids) 103 101 if err != nil { 104 102 logger.Error("failed to setup jetstream", "error", err)
-4
rbac/util.go
··· 95 95 return false, nil 96 96 } 97 97 98 - func (e *Enforcer) WouldHaveAnyPolicyExcludingKnotMember(user, domain string) (bool, error) { 99 - return e.wouldHaveAnyPolicyExcludingGrouping(user, "server:member", domain) 100 - } 101 - 102 98 func (e *Enforcer) WouldHaveAnyPolicyExcludingSpindleMember(user, domain string) (bool, error) { 103 99 return e.wouldHaveAnyPolicyExcludingGrouping(user, "server:member", intoSpindle(domain)) 104 100 }