Monorepo for Tangled tangled.org
2

Configure Feed

Select the types of activity you want to include in your feed.

knotserver/db: knot-owned member & collaborator tables, pagination, and ACL update events

Lewis: May this revision serve well! <lewis@tangled.org>

author
Lewis
committer
Tangled
date (Jun 16, 2026, 9:04 PM +0300) commit dcde7265 parent 6dc5c9d8 change-id wlozpmkl
+389 -13
+25
knotserver/db/aclupdate.go
··· 1 + package db 2 + 3 + const ( 4 + KnotMemberUpdateNSID = "sh.tangled.knot.memberUpdate" 5 + RepoCollaboratorUpdateNSID = "sh.tangled.repo.collaboratorUpdate" 6 + ) 7 + 8 + type AclOp string 9 + 10 + const ( 11 + AclOpAdd AclOp = "add" 12 + AclOpRemove AclOp = "remove" 13 + ) 14 + 15 + type KnotMemberUpdate struct { 16 + Op AclOp `json:"op"` 17 + Subject string `json:"subject"` 18 + } 19 + 20 + // NOTE: no "addedBy" so for now we can't deduce who to suggest a vouch to, about having added a collaborator. 21 + type RepoCollaboratorUpdate struct { 22 + Op AclOp `json:"op"` 23 + Subject string `json:"subject"` 24 + Repo string `json:"repo"` 25 + }
+94
knotserver/db/collaborator.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + "tangled.org/core/orm" 9 + ) 10 + 11 + type Collaborator struct { 12 + Id int 13 + RepoDid syntax.DID 14 + Subject syntax.DID 15 + AddedBy syntax.DID 16 + Created string 17 + } 18 + 19 + func AddCollaborator(q DBTX, c Collaborator) error { 20 + _, err := q.Exec( 21 + `insert or ignore into collaborators (repo_did, subject_did, added_by_did) values (?, ?, ?)`, 22 + c.RepoDid, 23 + c.Subject, 24 + c.AddedBy, 25 + ) 26 + return err 27 + } 28 + 29 + func IsCollaborator(q DBTX, repoDid, subject syntax.DID) (bool, error) { 30 + var exists bool 31 + err := q.QueryRow( 32 + `select exists (select 1 from collaborators where repo_did = ? and subject_did = ?)`, 33 + repoDid, 34 + subject, 35 + ).Scan(&exists) 36 + return exists, err 37 + } 38 + 39 + func RemoveCollaborator(q DBTX, repoDid, subject syntax.DID) error { 40 + _, err := q.Exec( 41 + `delete from collaborators where repo_did = ? and subject_did = ?`, 42 + repoDid, 43 + subject, 44 + ) 45 + return err 46 + } 47 + 48 + func (d *DB) ApplyCollaboratorBackfill(ctx context.Context, rows []Collaborator, migrationName string, markApplied bool) error { 49 + insert := func(tx *sql.Tx) error { 50 + for _, c := range rows { 51 + if err := AddDid(tx, c.Subject.String()); err != nil { 52 + return err 53 + } 54 + if err := AddCollaborator(tx, c); err != nil { 55 + return err 56 + } 57 + } 58 + return nil 59 + } 60 + 61 + if markApplied { 62 + conn, err := d.db.Conn(ctx) 63 + if err != nil { 64 + return err 65 + } 66 + defer conn.Close() 67 + return orm.RunMigration(conn, d.logger, migrationName, insert) 68 + } 69 + 70 + tx, err := d.db.BeginTx(ctx, nil) 71 + if err != nil { 72 + return err 73 + } 74 + defer tx.Rollback() 75 + if err := insert(tx); err != nil { 76 + return err 77 + } 78 + return tx.Commit() 79 + } 80 + 81 + func ListCollaborators(q DBTX, repoDid syntax.DID, p ListPage) ([]Collaborator, *int, error) { 82 + return listPaged(q, 83 + `select id, repo_did, subject_did, added_by_did, created 84 + from collaborators 85 + where repo_did = ?`, 86 + []any{repoDid}, p, 87 + func(r *sql.Rows) (Collaborator, error) { 88 + var c Collaborator 89 + err := r.Scan(&c.Id, &c.RepoDid, &c.Subject, &c.AddedBy, &c.Created) 90 + return c, err 91 + }, 92 + func(c Collaborator) int { return c.Id }, 93 + ) 94 + }
+84
knotserver/db/db.go
··· 26 26 27 27 type DBTX interface { 28 28 QueryRow(query string, args ...any) *sql.Row 29 + Query(query string, args ...any) (*sql.Rows, error) 29 30 Exec(query string, args ...any) (sql.Result, error) 30 31 } 31 32 ··· 39 40 40 41 func (d *DB) QueryRow(query string, args ...any) *sql.Row { 41 42 return d.db.QueryRow(query, args...) 43 + } 44 + 45 + func (d *DB) Query(query string, args ...any) (*sql.Rows, error) { 46 + return d.db.Query(query, args...) 42 47 } 43 48 44 49 func Setup(ctx context.Context, dbPath string) (*DB, error) { ··· 244 249 return nil, err 245 250 } 246 251 252 + if err := orm.RunMigration(conn, logger, "knot-members-nullable-rkey", func(tx *sql.Tx) error { 253 + _, mErr := tx.ExecContext(ctx, ` 254 + create table knot_members_new ( 255 + id integer primary key autoincrement, 256 + did text not null, 257 + rkey text, 258 + subject text not null, 259 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 260 + unique (did, rkey) 261 + ); 262 + insert into knot_members_new (id, did, rkey, subject, created) 263 + select id, did, rkey, subject, created from knot_members; 264 + drop table knot_members; 265 + alter table knot_members_new rename to knot_members; 266 + `) 267 + return mErr 268 + }); err != nil { 269 + return nil, err 270 + } 271 + 272 + if err := orm.RunMigration(conn, logger, "create-collaborators", func(tx *sql.Tx) error { 273 + _, mErr := tx.ExecContext(ctx, ` 274 + create table if not exists collaborators ( 275 + id integer primary key autoincrement, 276 + repo_did text not null, 277 + subject_did text not null, 278 + added_by_did text not null, 279 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 280 + unique (repo_did, subject_did) 281 + ); 282 + create index if not exists idx_collaborators_repo_id 283 + on collaborators(repo_did, id); 284 + `) 285 + return mErr 286 + }); err != nil { 287 + return nil, err 288 + } 289 + 290 + if err := orm.RunMigration(conn, logger, "knot-members-direct-subject-unique", func(tx *sql.Tx) error { 291 + _, mErr := tx.ExecContext(ctx, ` 292 + create unique index if not exists idx_knot_members_direct_subject 293 + on knot_members(subject) where rkey is null; 294 + create index if not exists idx_knot_members_subject 295 + on knot_members(subject); 296 + `) 297 + return mErr 298 + }); err != nil { 299 + return nil, err 300 + } 301 + 302 + if err := orm.RunMigration(conn, logger, "add-events-created-index", func(tx *sql.Tx) error { 303 + _, mErr := tx.ExecContext(ctx, `create index if not exists idx_events_created on events(created)`) 304 + return mErr 305 + }); err != nil { 306 + return nil, err 307 + } 308 + 247 309 return &DB{ 248 310 db: db, 249 311 logger: logger, ··· 299 361 return err 300 362 } 301 363 364 + if _, err := tx.Exec(`DELETE FROM collaborators WHERE repo_did = ?`, repoDid); err != nil { 365 + return err 366 + } 367 + 302 368 return tx.Commit() 303 369 } 304 370 ··· 306 372 var count int 307 373 err := d.db.QueryRow(`SELECT count(1) FROM repo_keys WHERE repo_did = ?`, repoDid).Scan(&count) 308 374 return count > 0, err 375 + } 376 + 377 + func (d *DB) ListRepoDids() ([]string, error) { 378 + rows, err := d.db.Query(`SELECT repo_did FROM repo_keys`) 379 + if err != nil { 380 + return nil, err 381 + } 382 + defer rows.Close() 383 + 384 + dids := []string{} 385 + for rows.Next() { 386 + var did string 387 + if err := rows.Scan(&did); err != nil { 388 + return nil, err 389 + } 390 + dids = append(dids, did) 391 + } 392 + return dids, rows.Err() 309 393 } 310 394 311 395 func (d *DB) GetRepoDid(ownerDid, rkey string) (string, error) {
+38
knotserver/db/events.go
··· 4 4 "encoding/json" 5 5 "fmt" 6 6 7 + "github.com/bluesky-social/indigo/atproto/syntax" 7 8 "tangled.org/core/eventstream" 8 9 "tangled.org/core/notifier" 9 10 "tangled.org/core/tid" ··· 28 29 return d.InsertEvent(eventstream.Event{ 29 30 Rkey: tid.TID(), 30 31 Nsid: RepoDIDAssignNSID, 32 + EventJson: eventJson, 33 + }, n) 34 + } 35 + 36 + func (d *DB) EmitKnotMemberUpdate(n *notifier.Notifier, op AclOp, subject syntax.DID) error { 37 + payload := KnotMemberUpdate{ 38 + Op: op, 39 + Subject: subject.String(), 40 + } 41 + 42 + eventJson, err := json.Marshal(payload) 43 + if err != nil { 44 + return fmt.Errorf("marshal memberUpdate event: %w", err) 45 + } 46 + 47 + return d.InsertEvent(eventstream.Event{ 48 + Rkey: tid.TID(), 49 + Nsid: KnotMemberUpdateNSID, 50 + EventJson: eventJson, 51 + }, n) 52 + } 53 + 54 + func (d *DB) EmitCollaboratorUpdate(n *notifier.Notifier, op AclOp, subject, repoDid syntax.DID) error { 55 + payload := RepoCollaboratorUpdate{ 56 + Op: op, 57 + Subject: subject.String(), 58 + Repo: repoDid.String(), 59 + } 60 + 61 + eventJson, err := json.Marshal(payload) 62 + if err != nil { 63 + return fmt.Errorf("marshal collaboratorUpdate event: %w", err) 64 + } 65 + 66 + return d.InsertEvent(eventstream.Event{ 67 + Rkey: tid.TID(), 68 + Nsid: RepoCollaboratorUpdateNSID, 31 69 EventJson: eventJson, 32 70 }, n) 33 71 }
+71 -13
knotserver/db/member.go
··· 13 13 Did syntax.DID 14 14 Rkey string 15 15 Subject syntax.DID 16 + Created string 16 17 } 17 18 18 19 func (d *DB) IsMigrationApplied(name string) (bool, error) { ··· 24 25 return exists, err 25 26 } 26 27 28 + func (d *DB) ApplyKnotMemberBackfill(ctx context.Context, rows []KnotMember, migrationName string) error { 29 + conn, err := d.db.Conn(ctx) 30 + if err != nil { 31 + return err 32 + } 33 + defer conn.Close() 34 + 35 + return orm.RunMigration(conn, d.logger, migrationName, func(tx *sql.Tx) error { 36 + for _, m := range rows { 37 + if err := AddDid(tx, m.Subject.String()); err != nil { 38 + return err 39 + } 40 + if err := AddKnotMemberDirect(tx, m.Did, m.Subject); err != nil { 41 + return err 42 + } 43 + } 44 + return nil 45 + }) 46 + } 47 + 48 + func AddKnotMemberDirect(q DBTX, addedBy, subject syntax.DID) error { 49 + _, err := q.Exec( 50 + `insert or ignore into knot_members (did, rkey, subject) values (?, NULL, ?)`, 51 + addedBy, 52 + subject, 53 + ) 54 + return err 55 + } 56 + 57 + func RemoveKnotMemberBySubject(q DBTX, subject syntax.DID) error { 58 + _, err := q.Exec( 59 + "delete from knot_members where subject = ?", 60 + subject, 61 + ) 62 + return err 63 + } 64 + 65 + func RemoveKnotMemberDirect(q DBTX, subject syntax.DID) error { 66 + _, err := q.Exec( 67 + "delete from knot_members where subject = ? and rkey is null", 68 + subject, 69 + ) 70 + return err 71 + } 72 + 73 + func CountKnotMembersBySubject(q DBTX, subject string) (int, error) { 74 + var count int 75 + err := q.QueryRow( 76 + `select count(*) from knot_members where subject = ?`, 77 + subject, 78 + ).Scan(&count) 79 + return count, err 80 + } 81 + 82 + func ListKnotMembers(q DBTX, p ListPage) ([]KnotMember, *int, error) { 83 + return listPaged(q, 84 + `select id, did, subject, created 85 + from knot_members 86 + where id in (select min(id) from knot_members group by subject)`, 87 + nil, p, 88 + func(r *sql.Rows) (KnotMember, error) { 89 + var m KnotMember 90 + err := r.Scan(&m.Id, &m.Did, &m.Subject, &m.Created) 91 + return m, err 92 + }, 93 + func(m KnotMember) int { return m.Id }, 94 + ) 95 + } 96 + 27 97 func (d *DB) ApplyKnotMembersBackfill(ctx context.Context, rows []KnotMember, migrationName string) error { 28 98 conn, err := d.db.Conn(ctx) 29 99 if err != nil { ··· 33 103 34 104 return orm.RunMigration(conn, d.logger, migrationName, func(tx *sql.Tx) error { 35 105 for _, m := range rows { 36 - if _, err := tx.ExecContext(ctx, 37 - `insert or ignore into known_dids (did) values (?)`, 38 - m.Subject, 39 - ); err != nil { 106 + if err := AddDid(tx, m.Subject.String()); err != nil { 40 107 return err 41 108 } 42 109 if _, err := tx.ExecContext(ctx, ··· 67 134 rkey, 68 135 ) 69 136 return err 70 - } 71 - 72 - func CountKnotMembersBySubject(q DBTX, subject string) (int, error) { 73 - var count int 74 - err := q.QueryRow( 75 - `select count(*) from knot_members where subject = ?`, 76 - subject, 77 - ).Scan(&count) 78 - return count, err 79 137 } 80 138 81 139 func GetKnotMember(q DBTX, did, rkey string) (*KnotMember, error) {
+74
knotserver/db/page.go
··· 1 + package db 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + ) 7 + 8 + const ( 9 + ListDefaultLimit = 50 10 + ListMaxLimit = 1000 11 + ) 12 + 13 + type ListPage struct { 14 + Limit int 15 + Cursor *int 16 + Desc bool 17 + } 18 + 19 + func (p ListPage) limit() int { 20 + switch { 21 + case p.Limit <= 0: 22 + return ListDefaultLimit 23 + case p.Limit > ListMaxLimit: 24 + return ListMaxLimit 25 + default: 26 + return p.Limit 27 + } 28 + } 29 + 30 + func (p ListPage) clause() (string, []any) { 31 + dir, cmp := "asc", ">" 32 + if p.Desc { 33 + dir, cmp = "desc", "<" 34 + } 35 + if p.Cursor != nil { 36 + return fmt.Sprintf("where id %s ? order by id %s limit ?", cmp, dir), []any{*p.Cursor, p.limit() + 1} 37 + } 38 + return fmt.Sprintf("order by id %s limit ?", dir), []any{p.limit() + 1} 39 + } 40 + 41 + func listPaged[T any]( 42 + q DBTX, 43 + query string, 44 + args []any, 45 + p ListPage, 46 + scan func(*sql.Rows) (T, error), 47 + idOf func(T) int, 48 + ) ([]T, *int, error) { 49 + clause, pArgs := p.clause() 50 + rows, err := q.Query("select * from ("+query+") "+clause, append(args, pArgs...)...) 51 + if err != nil { 52 + return nil, nil, err 53 + } 54 + defer rows.Close() 55 + 56 + out := []T{} 57 + for rows.Next() { 58 + v, err := scan(rows) 59 + if err != nil { 60 + return nil, nil, err 61 + } 62 + out = append(out, v) 63 + } 64 + if err := rows.Err(); err != nil { 65 + return nil, nil, err 66 + } 67 + 68 + if limit := p.limit(); len(out) > limit { 69 + out = out[:limit] 70 + last := idOf(out[len(out)-1]) 71 + return out, &last, nil 72 + } 73 + return out, nil, nil 74 + }
+3
notifier/notifier.go
··· 31 31 } 32 32 33 33 func (n *Notifier) NotifyAll() { 34 + if n == nil { 35 + return 36 + } 34 37 n.mu.Lock() 35 38 for ch := range n.subscribers { 36 39 select {