Monorepo for Tangled tangled.org
5

Configure Feed

Select the types of activity you want to include in your feed.

appview: use blob based strings

Signed-off-by: Seongmin Lee <git@boltless.me>

author
Seongmin Lee
date (Jun 17, 2026, 12:31 AM +0900) commit e0b06edd parent 8e9a5b4b change-id rumvlukx
+510 -165
+18
appview/db/db.go
··· 2283 2283 return nil 2284 2284 }) 2285 2285 2286 + orm.RunMigration(conn, logger, "use-blobs-for-string-files", func(tx *sql.Tx) error { 2287 + _, err := tx.Exec(` 2288 + create table string_files ( 2289 + id integer primary key autoincrement, 2290 + at_uri text not null, 2291 + name text not null, 2292 + content_ref text not null, 2293 + content_size integer not null, 2294 + content_mimetype text not null, 2295 + gzip_realsize integer, 2296 + gzip_realmime text, 2297 + gzip_realcontent text, -- decoded content of gzipped text blobs 2298 + foreign key (at_uri) references strings(at_uri) on delete cascade 2299 + ); 2300 + `) 2301 + return err 2302 + }) 2303 + 2286 2304 return &DB{ 2287 2305 db, 2288 2306 logger,
+124 -41
appview/db/strings.go
··· 9 9 "time" 10 10 11 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 + lexutil "github.com/bluesky-social/indigo/lex/util" 13 + "github.com/ipfs/go-cid" 14 + "tangled.org/core/api/tangled" 12 15 "tangled.org/core/appview/models" 13 16 "tangled.org/core/orm" 14 17 ) ··· 60 63 } 61 64 if num == 0 { 62 65 return nil 66 + } 67 + 68 + _, err = tx.Exec(`delete from string_files where at_uri = ?`, s.AtUri()) 69 + if err != nil { 70 + return fmt.Errorf("deleting old files: %w", err) 71 + } 72 + 73 + // legacy single-file strings carry their content in file_name/file_content 74 + // and have no rows in string_files; nothing more to insert. 75 + if len(s.Files) == 0 { 76 + if err := tx.Commit(); err != nil { 77 + return fmt.Errorf("commiting transaction: %w", err) 78 + } 79 + return nil 80 + } 81 + 82 + vals := make([]string, len(s.Files)) 83 + args := make([]any, 0, len(s.Files)*8) 84 + for i, file := range s.Files { 85 + vals[i] = "(?, ?, ?, ?, ?, ?, ?, ?)" 86 + var gzipRealSize *int64 87 + var gzipRealMime *string 88 + var gzipRealContent *string 89 + if file.Gzip != nil { 90 + gzipRealSize = &file.Gzip.RealSize 91 + gzipRealMime = &file.Gzip.RealMime 92 + gzipRealContent = &file.Gzip.Content 93 + } 94 + args = append(args, 95 + s.AtUri(), 96 + file.Name, 97 + file.Content.Ref.String(), 98 + file.Content.Size, 99 + file.Content.MimeType, 100 + gzipRealSize, 101 + gzipRealMime, 102 + gzipRealContent, 103 + ) 104 + } 105 + _, err = tx.Exec( 106 + fmt.Sprintf( 107 + `insert into string_files ( 108 + at_uri, 109 + name, 110 + content_ref, 111 + content_size, 112 + content_mimetype, 113 + gzip_realsize, 114 + gzip_realmime, 115 + gzip_realcontent 116 + ) 117 + values %s`, 118 + strings.Join(vals, ","), 119 + ), 120 + args..., 121 + ) 122 + if err != nil { 123 + return fmt.Errorf("inserting files: %w", err) 63 124 } 64 125 65 126 if err := tx.Commit(); err != nil { ··· 194 255 i++ 195 256 } 196 257 197 - // // get files 198 - // { 199 - // rows, err := e.Query( 200 - 
// fmt.Sprintf( 201 - // `select at_uri, name, blob from string_files where at_uri in (%s) order by at_uri, id`, 202 - // inClause, 203 - // ), 204 - // args..., 205 - // ) 206 - // if err != nil { 207 - // return nil, fmt.Errorf("failed to execute string_files query: %w", err) 208 - // } 209 - // defer rows.Close() 210 - // 211 - // for rows.Next() { 212 - // var stringAt syntax.ATURI 213 - // var file models.String_File 214 - // file.Blob = &util.LexBlob{} 215 - // var gzipMimeType sql.Null[string] 216 - // var gzipSize sql.Null[int64] 217 - // if err := rows.Scan( 218 - // &stringAt, 219 - // &file.Name, 220 - // &blob, 221 - // ); err != nil { 222 - // return nil, fmt.Errorf("failed to execute string_files query: %w", err) 223 - // } 224 - // if gzipMimeType.Valid && gzipSize.Valid { 225 - // file.Gzip = &models.GzipInfo{ 226 - // MimeType: gzipMimeType.V, 227 - // Size: gzipSize.V, 228 - // } 229 - // } 230 - // if s, ok := stringMap[stringAt]; ok { 231 - // s.Files = append(s.Files, file) 232 - // } 233 - // } 234 - // if err = rows.Err(); err != nil { 235 - // return nil, fmt.Errorf("failed to execute string_files query: %w", err) 236 - // } 237 - // } 258 + // get files 259 + { 260 + rows, err := e.Query( 261 + fmt.Sprintf( 262 + `select 263 + at_uri, 264 + name, 265 + content_ref, 266 + content_size, 267 + content_mimetype, 268 + gzip_realsize, 269 + gzip_realmime, 270 + gzip_realcontent 271 + from string_files 272 + where at_uri in (%s) order by at_uri, id`, 273 + inClause, 274 + ), 275 + args..., 276 + ) 277 + if err != nil { 278 + return nil, fmt.Errorf("failed to execute string_files query: %w", err) 279 + } 280 + defer rows.Close() 281 + 282 + for rows.Next() { 283 + var stringAt syntax.ATURI 284 + var file models.String_File 285 + 286 + var contentRef string 287 + var gzipRealSize sql.Null[int64] 288 + var gzipRealMime, gzipRealContent sql.Null[string] 289 + if err := rows.Scan( 290 + &stringAt, 291 + &file.Name, 292 + &contentRef, 293 + 
&file.Content.Size, 294 + &file.Content.MimeType, 295 + &gzipRealSize, 296 + &gzipRealMime, 297 + &gzipRealContent, 298 + ); err != nil { 299 + return nil, fmt.Errorf("failed to execute string_files query: %w", err) 300 + } 301 + 302 + file.Content.Ref = lexutil.LexLink(cid.MustParse(contentRef)) 303 + 304 + if gzipRealMime.Valid && gzipRealSize.Valid && gzipRealContent.Valid { 305 + file.Gzip = &models.String_GzipInfo{ 306 + String_File_Gzip: tangled.String_File_Gzip{ 307 + RealMime: gzipRealMime.V, 308 + RealSize: gzipRealSize.V, 309 + }, 310 + Content: gzipRealContent.V, 311 + } 312 + } 313 + if s, ok := stringMap[stringAt]; ok { 314 + s.Files = append(s.Files, file) 315 + } 316 + } 317 + if err = rows.Err(); err != nil { 318 + return nil, fmt.Errorf("failed to execute string_files query: %w", err) 319 + } 320 + } 238 321 239 322 // get star counts 240 323 {
+38 -5
appview/ingester.go
··· 1 1 package appview 2 2 3 3 import ( 4 + "compress/gzip" 4 5 "context" 5 6 "database/sql" 6 7 "encoding/json" ··· 34 35 "tangled.org/core/appview/repoverify" 35 36 "tangled.org/core/appview/serververify" 36 37 "tangled.org/core/appview/validator" 38 + "tangled.org/core/blobstore" 37 39 "tangled.org/core/idresolver" 38 40 "tangled.org/core/orm" 39 41 "tangled.org/core/rbac" ··· 45 47 Enforcer *rbac.Enforcer 46 48 Acl *knotacl.Service 47 49 IdResolver *idresolver.Resolver 50 + BlobStore blobstore.BlobStore 48 51 Cache *cache.Cache 49 52 Config *config.Config 50 53 Logger *slog.Logger ··· 105 108 case tangled.KnotNSID: 106 109 err = i.ingestKnot(ctx, e, l) 107 110 case tangled.StringNSID: 108 - err = i.ingestString(e, l) 111 + err = i.ingestString(ctx, e, l) 109 112 case tangled.RepoIssueNSID: 110 113 err = i.ingestIssue(ctx, e, l) 111 114 case tangled.RepoPullNSID: ··· 913 916 return nil 914 917 } 915 918 916 - func (i *Ingester) ingestString(e *jmodels.Event, l *slog.Logger) error { 919 + func (i *Ingester) ingestString(ctx context.Context, e *jmodels.Event, l *slog.Logger) error { 917 920 did := e.Did 918 921 rkey := e.Commit.RKey 919 922 ··· 931 934 return err 932 935 } 933 936 934 - string, err := models.StringFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(e.Commit.CID), record) 937 + str, err := models.StringFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(e.Commit.CID), record) 935 938 if err != nil { 936 939 return fmt.Errorf("failed to parse string record: %w", err) 937 940 } 938 - if err = string.Validate(); err != nil { 941 + if err = str.Validate(); err != nil { 939 942 l.Error("invalid record", "err", err) 940 943 return err 941 944 } 942 945 943 - if err = db.AddString(i.Db, string); err != nil { 946 + g, gctx := errgroup.WithContext(ctx) 947 + for idx, file := range str.Files { 948 + if file.Gzip == nil { 949 + continue 950 + } 951 + g.Go(func() error { 952 + blob, err := i.BlobStore.GetBlob(gctx, str.Did, 
cid.Cid(file.Content.Ref)) 953 + if err != nil { 954 + return fmt.Errorf("files[%d]: failed to fetch blob: %w", idx, err) 955 + } 956 + defer blob.Close() 957 + gzr, err := gzip.NewReader(blob) 958 + if err != nil { 959 + return fmt.Errorf("files[%d]: invalid gzip stream: %w", idx, err) 960 + } 961 + gzr.Close() 962 + 963 + content, err := io.ReadAll(gzr) 964 + if err != nil { 965 + return fmt.Errorf("files[%d]: failed to read blob: %w", idx, err) 966 + } 967 + file.Gzip.Content = string(content) 968 + str.Files[idx] = file 969 + return nil 970 + }) 971 + } 972 + if err := g.Wait(); err != nil { 973 + return err 974 + } 975 + 976 + if err = db.AddString(i.Db, str); err != nil { 944 977 l.Error("failed to add string", "err", err) 945 978 return err 946 979 }
+27 -27
appview/ingester_string_test.go
··· 89 89 created := time.Date(2025, 9, 14, 10, 30, 0, 0, time.UTC) 90 90 91 91 e := makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:boltless", "rk1", tangled.String{ 92 - Filename: "hello.txt", 93 - Description: "a greeting", 94 - Contents: "hello world\n", 92 + Filename: ptr("hello.txt"), 93 + Description: ptr("a greeting"), 94 + Contents: ptr("hello world\n"), 95 95 CreatedAt: created.Format(time.RFC3339), 96 96 }) 97 97 98 - if err := ing.ingestString(e, ing.Logger); err != nil { 98 + if err := ing.ingestString(t.Context(), e, ing.Logger); err != nil { 99 99 t.Fatalf("ingestString: %v", err) 100 100 } 101 101 ··· 127 127 ing := newStringIngester(t) 128 128 created := time.Date(2025, 9, 14, 10, 30, 0, 0, time.UTC) 129 129 base := tangled.String{ 130 - Filename: "hello.txt", 131 - Description: "a greeting", 132 - Contents: "hello world\n", 130 + Filename: ptr("hello.txt"), 131 + Description: ptr("a greeting"), 132 + Contents: ptr("hello world\n"), 133 133 CreatedAt: created.Format(time.RFC3339), 134 134 } 135 135 136 - if err := ing.ingestString(makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:boltless", "rk1", base), ing.Logger); err != nil { 136 + if err := ing.ingestString(t.Context(), makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:boltless", "rk1", base), ing.Logger); err != nil { 137 137 t.Fatalf("ingestString create: %v", err) 138 138 } 139 139 140 140 updated := base 141 - updated.Contents = "hello, world!\n" 142 - if err := ing.ingestString(makeStringEvent(t, jmodels.CommitOperationUpdate, "did:plc:boltless", "rk1", updated), ing.Logger); err != nil { 141 + updated.Contents = ptr("hello, world!\n") 142 + if err := ing.ingestString(t.Context(), makeStringEvent(t, jmodels.CommitOperationUpdate, "did:plc:boltless", "rk1", updated), ing.Logger); err != nil { 143 143 t.Fatalf("ingestString update: %v", err) 144 144 } 145 145 ··· 161 161 func TestIngestString_UpdateNoChangeKeepsEditedNil(t *testing.T) { 162 162 ing := 
newStringIngester(t) 163 163 rec := tangled.String{ 164 - Filename: "hello.txt", 165 - Description: "a greeting", 166 - Contents: "hello world\n", 164 + Filename: ptr("hello.txt"), 165 + Description: ptr("a greeting"), 166 + Contents: ptr("hello world\n"), 167 167 CreatedAt: time.Date(2025, 9, 14, 10, 30, 0, 0, time.UTC).Format(time.RFC3339), 168 168 } 169 169 170 - if err := ing.ingestString(makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "rk2", rec), ing.Logger); err != nil { 170 + if err := ing.ingestString(t.Context(), makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "rk2", rec), ing.Logger); err != nil { 171 171 t.Fatalf("create: %v", err) 172 172 } 173 - if err := ing.ingestString(makeStringEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "rk2", rec), ing.Logger); err != nil { 173 + if err := ing.ingestString(t.Context(), makeStringEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "rk2", rec), ing.Logger); err != nil { 174 174 t.Fatalf("update: %v", err) 175 175 } 176 176 ··· 183 183 func TestIngestString_DeleteRemovesRow(t *testing.T) { 184 184 ing := newStringIngester(t) 185 185 rec := tangled.String{ 186 - Filename: "hello.txt", 187 - Contents: "x", 186 + Filename: ptr("hello.txt"), 187 + Contents: ptr("x"), 188 188 CreatedAt: time.Now().UTC().Format(time.RFC3339), 189 189 } 190 - if err := ing.ingestString(makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:boltless", "rk1", rec), ing.Logger); err != nil { 190 + if err := ing.ingestString(t.Context(), makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:boltless", "rk1", rec), ing.Logger); err != nil { 191 191 t.Fatalf("create: %v", err) 192 192 } 193 193 ··· 201 201 CID: makeCID(t, &rec), 202 202 }, 203 203 } 204 - if err := ing.ingestString(del, ing.Logger); err != nil { 204 + if err := ing.ingestString(t.Context(), del, ing.Logger); err != nil { 205 205 t.Fatalf("delete: %v", err) 206 206 } 207 207 ··· 218 218 name string 219 219 rec 
tangled.String 220 220 }{ 221 - {"long filename", tangled.String{Filename: strings.Repeat("a", 141), Contents: "x", CreatedAt: now}}, 222 - {"long description", tangled.String{Filename: "x", Description: strings.Repeat("d", 281), Contents: "x", CreatedAt: now}}, 221 + {"long filename", tangled.String{Filename: ptr(strings.Repeat("a", 141)), Contents: ptr("x"), CreatedAt: now}}, 222 + {"long description", tangled.String{Filename: ptr("x"), Description: ptr(strings.Repeat("d", 281)), Contents: ptr("x"), CreatedAt: now}}, 223 223 } 224 224 225 225 for _, tc := range cases { 226 226 t.Run(tc.name, func(t *testing.T) { 227 227 e := makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "bad", tc.rec) 228 - if err := ing.ingestString(e, ing.Logger); err == nil { 228 + if err := ing.ingestString(t.Context(), e, ing.Logger); err == nil { 229 229 t.Fatal("expected validator error, got nil") 230 230 } 231 231 if _, ok := loadString(t, ing, "did:plc:akshay", "bad"); ok { ··· 238 238 func TestIngestString_ColdReplayPreservesCreated(t *testing.T) { 239 239 created := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC) 240 240 rec := tangled.String{ 241 - Filename: "snippet.go", 242 - Description: "first cut", 243 - Contents: "package main\n", 241 + Filename: ptr("snippet.go"), 242 + Description: ptr("first cut"), 243 + Contents: ptr("package main\n"), 244 244 CreatedAt: created.Format(time.RFC3339), 245 245 } 246 246 event := makeStringEvent(t, jmodels.CommitOperationCreate, "did:plc:boltless", "rkcold", rec) 247 247 248 248 first := newStringIngester(t) 249 - if err := first.ingestString(event, first.Logger); err != nil { 249 + if err := first.ingestString(t.Context(), event, first.Logger); err != nil { 250 250 t.Fatalf("first ingest: %v", err) 251 251 } 252 252 live, _ := loadString(t, first, "did:plc:boltless", "rkcold") 253 253 254 254 second := newStringIngester(t) 255 - if err := second.ingestString(event, second.Logger); err != nil { 255 + if err := 
second.ingestString(t.Context(), event, second.Logger); err != nil { 256 256 t.Fatalf("replay ingest: %v", err) 257 257 } 258 258 replayed, _ := loadString(t, second, "did:plc:boltless", "rkcold")
+51 -19
appview/models/string.go
··· 37 37 Stats *StringStats 38 38 } 39 39 40 - // TODO: replace this with [tangled.String_File] 40 + // String_File is [tangled.String_File] with optional prefetched & decompressed text blob content 41 41 type String_File struct { 42 - Name string 43 - Blob *lexutil.LexBlob 44 - Gzip *GzipInfo 42 + Name string 43 + Content lexutil.LexBlob 44 + Gzip *String_GzipInfo 45 45 } 46 - type GzipInfo struct { 47 - MimeType string 48 - Size int64 46 + type String_GzipInfo struct { 47 + tangled.String_File_Gzip 48 + // Optional uncompressed content. 49 + // Populated when the content is first requested. 50 + Content string 49 51 } 50 52 51 53 func (s *String) AtUri() syntax.ATURI { ··· 53 55 } 54 56 55 57 func (s *String) AsRecord() *tangled.String { 56 - var description string 57 - if s.Description != nil { 58 - description = *s.Description 58 + var files []*tangled.String_File 59 + for _, f := range s.Files { 60 + var gzip *tangled.String_File_Gzip 61 + if f.Gzip != nil { 62 + gzip = &tangled.String_File_Gzip{ 63 + RealSize: f.Gzip.RealSize, 64 + RealMime: f.Gzip.RealMime, 65 + } 66 + } 67 + files = append(files, &tangled.String_File{ 68 + Name: f.Name, 69 + Content: &f.Content, 70 + Gzip: gzip, 71 + }) 59 72 } 60 73 return &tangled.String{ 61 - Filename: s.FileName, 62 - Description: description, 63 - Contents: s.FileContent, 74 + Title: s.Title, 75 + Description: s.Description, 76 + Files: files, 64 77 CreatedAt: s.Created.Format(time.RFC3339), 65 78 } 66 79 } ··· 116 129 return len(s.Files) == 0 117 130 } 118 131 132 + // StringFromRecord creates [String] from [tangled.String]. 
133 + // NOTE: This won't prefetch blobs 119 134 func StringFromRecord(did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, record tangled.String) (String, error) { 120 135 created, err := time.Parse(time.RFC3339, record.CreatedAt) 121 136 if err != nil { 122 137 return String{}, fmt.Errorf("invalid createdAt: %w", err) 123 138 } 124 - var description *string 125 - if record.Description != "" { 126 - description = &record.Description 139 + var files []String_File 140 + for _, f := range record.Files { 141 + var gzip *String_GzipInfo 142 + if f.Gzip != nil { 143 + gzip = &String_GzipInfo{String_File_Gzip: *f.Gzip} 144 + } 145 + files = append(files, String_File{ 146 + Name: f.Name, 147 + Content: *f.Content, 148 + Gzip: gzip, 149 + }) 127 150 } 128 151 return String{ 129 152 Did: did, 130 153 Rkey: rkey, 131 154 Cid: &cid, 132 - Description: description, 155 + Title: record.Title, 156 + Description: record.Description, 157 + Files: files, 133 158 Created: created, 134 - FileName: record.Filename, 135 - FileContent: record.Contents, 159 + FileName: stringPtr(record.Filename), 160 + FileContent: stringPtr(record.Contents), 136 161 }, nil 137 162 } 138 163 ··· 145 170 LineCount int 146 171 ByteCount int 147 172 } 173 + 174 + func stringPtr(s *string) string { 175 + if s == nil { 176 + return "" 177 + } 178 + return *s 179 + }
+7 -6
appview/state/router.go
··· 334 334 logger := log.SubLogger(s.logger, "strings") 335 335 336 336 strs := &avstrings.Strings{ 337 - Db: s.db, 338 - OAuth: s.oauth, 339 - Pages: s.pages, 340 - Dir: s.idResolver.Directory(), 341 - Notifier: s.notifier, 342 - Logger: logger, 337 + Db: s.db, 338 + OAuth: s.oauth, 339 + Pages: s.pages, 340 + Dir: s.idResolver.Directory(), 341 + Notifier: s.notifier, 342 + Logger: logger, 343 + BlobStore: s.blobStore, 343 344 } 344 345 345 346 return strs.Router(mw)
+6
appview/state/state.go
··· 36 36 "tangled.org/core/appview/repoverify" 37 37 "tangled.org/core/appview/validator" 38 38 xrpcclient "tangled.org/core/appview/xrpcclient" 39 + "tangled.org/core/blobstore" 39 40 "tangled.org/core/consts" 40 41 "tangled.org/core/eventconsumer" 41 42 "tangled.org/core/idresolver" ··· 63 64 enforcer *rbac.Enforcer 64 65 pages *pages.Pages 65 66 idResolver *idresolver.Resolver 67 + blobStore blobstore.BlobStore 66 68 rdb *cache.Cache 67 69 mentionsResolver *mentions.Resolver 68 70 posthog posthog.Client ··· 127 129 128 130 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 129 131 132 + blobStore := blobstore.NewPdsBlobStore(res.Directory()) 133 + 130 134 jc, err := jetstream.NewJetstreamClient( 131 135 config.Jetstream.Endpoint, 132 136 "appview", ··· 191 195 Enforcer: enforcer, 192 196 Acl: aclService, 193 197 IdResolver: res, 198 + BlobStore: blobStore, 194 199 Cache: rdb, 195 200 Config: config, 196 201 Logger: log.SubLogger(logger, "ingester"), ··· 237 242 enforcer: enforcer, 238 243 pages: pages, 239 244 idResolver: res, 245 + blobStore: blobStore, 240 246 rdb: rdb, 241 247 mentionsResolver: mentionsResolver, 242 248 posthog: posthog,
+180 -67
appview/strings/strings.go
··· 2 2 3 3 import ( 4 4 "bytes" 5 + "compress/gzip" 5 6 "context" 6 7 "database/sql" 7 8 "errors" ··· 21 22 "tangled.org/core/appview/oauth" 22 23 "tangled.org/core/appview/pages" 23 24 "tangled.org/core/appview/pages/markup" 25 + "tangled.org/core/blobstore" 24 26 "tangled.org/core/orm" 25 27 "tangled.org/core/tid" 28 + "tangled.org/core/xrpc" 26 29 27 30 "github.com/bluesky-social/indigo/api/agnostic" 28 31 "github.com/bluesky-social/indigo/api/atproto" 29 32 "github.com/bluesky-social/indigo/atproto/identity" 30 33 "github.com/bluesky-social/indigo/atproto/syntax" 31 - "github.com/bluesky-social/indigo/xrpc" 34 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 32 35 "github.com/go-chi/chi/v5" 36 + "github.com/ipfs/go-cid" 33 37 34 38 comatproto "github.com/bluesky-social/indigo/api/atproto" 35 39 lexutil "github.com/bluesky-social/indigo/lex/util" 36 40 ) 41 + 42 + const ApplicationGzip = "application/gzip" 37 43 38 44 type Strings struct { 39 - Db *db.DB 40 - OAuth *oauth.OAuth 41 - Pages *pages.Pages 42 - Dir identity.Directory 43 - Logger *slog.Logger 44 - Notifier notify.Notifier 45 + Db *db.DB 46 + OAuth *oauth.OAuth 47 + Pages *pages.Pages 48 + Dir identity.Directory 49 + BlobStore blobstore.BlobStore 50 + Logger *slog.Logger 51 + Notifier notify.Notifier 45 52 } 46 53 47 54 func (s *Strings) Router(mw *middleware.Middleware) http.Handler { ··· 117 124 return 118 125 } 119 126 120 - string, err := db.GetString(s.Db, orm.FilterEq("did", id.DID), orm.FilterEq("rkey", rkey)) 127 + str, err := db.GetString(s.Db, orm.FilterEq("did", id.DID), orm.FilterEq("rkey", rkey)) 121 128 if errors.Is(err, sql.ErrNoRows) { 122 129 s.Pages.Error404(w) 123 130 return ··· 127 134 return 128 135 } 129 136 130 - ctx := context.WithValue(r.Context(), stringCtxKey{}, string) 137 + ctx := context.WithValue(r.Context(), stringCtxKey{}, str) 131 138 next.ServeHTTP(w, r.WithContext(ctx)) 132 139 }) 133 140 } ··· 170 177 l := s.Logger.With("handler", "SingleString") 171 178 ctx := 
r.Context() 172 179 173 - string, ok := stringFromContext(ctx) 180 + str, ok := stringFromContext(ctx) 174 181 if !ok { 175 182 l.Error("malformed middleware. string missing") 176 183 s.Pages.Error404(w) 177 184 return 178 185 } 179 186 180 - starCount, err := db.GetStarCount(s.Db, models.StarSubjectString, string.AtUri().String()) 187 + starCount, err := db.GetStarCount(s.Db, models.StarSubjectString, str.AtUri().String()) 181 188 if err != nil { 182 189 l.Error("failed to get star count", "err", err) 183 190 } 184 191 user := s.OAuth.GetMultiAccountUser(r) 185 192 isStarred := false 186 193 if user != nil { 187 - isStarred = db.GetStarStatus(s.Db, user.Did, string.AtUri().String()) 194 + isStarred = db.GetStarStatus(s.Db, user.Did, str.AtUri().String()) 188 195 } 189 196 190 - comments, err := db.GetComments(s.Db, orm.FilterEq("subject_uri", string.AtUri())) 197 + comments, err := db.GetComments(s.Db, orm.FilterEq("subject_uri", str.AtUri())) 191 198 if err != nil { 192 199 l.Error("failed to get comments", "err", err) 193 200 } ··· 223 230 224 231 var files []pages.StringFileFragmentParams 225 232 226 - if string.IsLegacySingleFile() { 233 + if str.IsLegacySingleFile() { 227 234 files = []pages.StringFileFragmentParams{ 228 - s.makeFileFragmentParams(&string, string.FileName, string.FileContent, false), 235 + s.makeFileFragmentParams(&str, str.FileName, str.FileContent, false), 229 236 } 230 237 } else { 231 - files = make([]pages.StringFileFragmentParams, len(string.Files)) 232 - for i, file := range string.Files { 233 - // TODO: read blob 234 - content := "" 235 - files[i] = s.makeFileFragmentParams(&string, file.Name, content, false) 238 + files = make([]pages.StringFileFragmentParams, len(str.Files)) 239 + for i, file := range str.Files { 240 + var content string 241 + if file.Gzip != nil { 242 + content = file.Gzip.Content 243 + } else { 244 + blob, err := s.BlobStore.GetBlob(r.Context(), str.Did, cid.Cid(file.Content.Ref)) 245 + if err != nil { 246 + 
l.Warn("failed to fetch blob", "err", err) 247 + http.NotFound(w, r) 248 + return 249 + } 250 + defer blob.Close() 251 + 252 + contentBytes, err := io.ReadAll(blob) 253 + if err != nil { 254 + l.Error("failed to read blob", "err", err) 255 + } 256 + content = string(contentBytes) 257 + } 258 + 259 + files[i] = s.makeFileFragmentParams(&str, file.Name, content, false) 236 260 } 237 261 } 238 262 239 263 err = s.Pages.SingleString(w, pages.SingleStringParams{ 240 264 BaseParams: pages.BaseParamsFromContext(r.Context()), 241 - String: &string, 265 + String: &str, 242 266 FileParams: files, 243 267 IsStarred: isStarred, 244 268 StarCount: starCount, ··· 288 312 } else { 289 313 files = make([]pages.StringFileEditFragmentParams, len(oldString.Files)) 290 314 for i, file := range oldString.Files { 291 - // TODO: read blob 292 - content := "" 315 + var content string 316 + if file.Gzip != nil { 317 + content = file.Gzip.Content 318 + } else { 319 + blob, err := s.BlobStore.GetBlob(r.Context(), oldString.Did, cid.Cid(file.Content.Ref)) 320 + if err != nil { 321 + l.Warn("failed to fetch blob", "err", err) 322 + http.NotFound(w, r) 323 + return 324 + } 325 + defer blob.Close() 326 + 327 + contentBytes, err := io.ReadAll(blob) 328 + if err != nil { 329 + l.Error("failed to read blob", "err", err) 330 + } 331 + content = string(contentBytes) 332 + } 293 333 files[i] = pages.StringFileEditFragmentParams{ 294 334 Name: file.Name, 295 335 Content: content, 296 - Size: uint64(file.Blob.Size), 336 + Size: uint64(file.Content.Size), 297 337 } 298 338 } 299 339 } ··· 333 373 return 334 374 } 335 375 336 - newString := oldString 337 - newString.Title = title 338 - newString.Description = description 339 - newString.FileName = filename 340 - newString.FileContent = content 341 - 342 376 client, err := s.OAuth.AuthorizedClient(r) 343 377 if err != nil { 344 378 fail("Failed to create record.", err) 345 379 return 346 380 } 347 381 382 + blob, err := xrpc.RepoUploadBlob(ctx, client, 
gz(content), ApplicationGzip) 383 + if err != nil { 384 + fail("Failed to create record.", err) 385 + return 386 + } 387 + 388 + newString := oldString 389 + newString.Title = title 390 + newString.Description = description 391 + newString.Files = []models.String_File{ 392 + { 393 + Name: filename, 394 + Content: *blob.Blob, 395 + Gzip: &models.String_GzipInfo{ 396 + String_File_Gzip: tangled.String_File_Gzip{ 397 + RealMime: "text/plain", 398 + RealSize: int64(len(content)), 399 + }, 400 + Content: content, 401 + }, 402 + }, 403 + } 404 + 348 405 // first replace the existing record in the PDS 349 406 var exCid string 350 407 if newString.Cid != nil { ··· 390 447 391 448 func (s *Strings) create(w http.ResponseWriter, r *http.Request) { 392 449 l := s.Logger.With("handler", "create") 450 + ctx := r.Context() 393 451 user := s.OAuth.GetMultiAccountUser(r) 394 452 395 453 switch r.Method { ··· 428 486 return 429 487 } 430 488 431 - string := models.String{ 432 - Did: syntax.DID(user.Did), 433 - Rkey: syntax.RecordKey(tid.TID()), 434 - Title: title, 435 - Description: description, 436 - FileName: filename, 437 - FileContent: content, 438 - Created: time.Now(), 489 + client, err := s.OAuth.AuthorizedClient(r) 490 + if err != nil { 491 + fail("Failed to create record.", err) 492 + return 439 493 } 440 494 441 - client, err := s.OAuth.AuthorizedClient(r) 495 + blob, err := xrpc.RepoUploadBlob(ctx, client, gz(content), ApplicationGzip) 442 496 if err != nil { 443 497 fail("Failed to create record.", err) 444 498 return 445 499 } 446 500 447 - resp, err := comatproto.RepoPutRecord(r.Context(), client, &atproto.RepoPutRecord_Input{ 501 + newString := models.String{ 502 + Did: syntax.DID(user.Did), 503 + Rkey: syntax.RecordKey(tid.TID()), 504 + Title: title, 505 + Description: description, 506 + Files: []models.String_File{ 507 + { 508 + Name: filename, 509 + Content: *blob.Blob, 510 + Gzip: &models.String_GzipInfo{ 511 + String_File_Gzip: tangled.String_File_Gzip{ 512 + 
RealMime: "text/plain", 513 + RealSize: int64(len(content)), 514 + }, 515 + Content: content, 516 + }, 517 + }, 518 + }, 519 + Created: time.Now(), 520 + } 521 + 522 + resp, err := comatproto.RepoPutRecord(ctx, client, &atproto.RepoPutRecord_Input{ 448 523 Collection: tangled.StringNSID, 449 - Repo: string.Did.String(), 450 - Rkey: string.Rkey.String(), 451 - Record: &lexutil.LexiconTypeDecoder{Val: string.AsRecord()}, 524 + Repo: newString.Did.String(), 525 + Rkey: newString.Rkey.String(), 526 + Record: &lexutil.LexiconTypeDecoder{Val: newString.AsRecord()}, 452 527 }) 453 528 if err != nil { 454 529 fail("Failed to create record.", err) 455 530 return 456 531 } 457 532 l := l.With("aturi", resp.Uri) 458 - l.Info("created record") 533 + l.Info("created record", "files", len(newString.Files)) 459 534 460 535 // insert into DB 461 - if err = db.AddString(s.Db, string); err != nil { 536 + if err = db.AddString(s.Db, newString); err != nil { 462 537 fail("Failed to create string.", err) 463 538 return 464 539 } 465 540 466 - s.Notifier.NewString(r.Context(), &string) 541 + s.Notifier.NewString(ctx, &newString) 467 542 468 543 // successful 469 - s.Pages.HxRedirect(w, fmt.Sprintf("/strings/%s/%s", string.Did, string.Rkey)) 544 + s.Pages.HxRedirect(w, fmt.Sprintf("/strings/%s/%s", newString.Did, newString.Rkey)) 470 545 } 471 546 } 472 547 ··· 533 608 l := s.Logger.With("handler", "FileRaw") 534 609 ctx := r.Context() 535 610 536 - string, ok := stringFromContext(ctx) 611 + str, ok := stringFromContext(ctx) 537 612 if !ok { 538 613 l.Error("malformed middleware. 
string missing") 539 614 s.Pages.Error404(w) ··· 541 616 } 542 617 filename := chi.URLParam(r, "filename") 543 618 544 - if string.IsLegacySingleFile() { 545 - if filename != string.FileName { 619 + if str.IsLegacySingleFile() { 620 + if filename != str.FileName { 546 621 http.NotFound(w, r) 547 622 return 548 623 } 549 624 w.Header().Set("Content-Type", "text/plain; charset=utf-8") 550 625 w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%q", filename)) 551 - w.Header().Set("Content-Length", strconv.Itoa(len(string.FileContent))) 552 - _, err := w.Write([]byte(string.FileContent)) 626 + w.Header().Set("Content-Length", strconv.Itoa(len(str.FileContent))) 627 + _, err := w.Write([]byte(str.FileContent)) 553 628 if err != nil { 554 629 l.Error("failed to write raw response", "err", err) 555 630 } 556 631 } else { 557 - file, ok := string.FileByName(filename) 632 + file, ok := str.FileByName(filename) 558 633 if !ok { 559 634 http.NotFound(w, r) 560 635 return 561 636 } 562 637 563 - content := "" 638 + mimeType := file.Content.MimeType 639 + size := file.Content.Size 640 + 641 + var reader io.Reader 642 + if file.Gzip != nil { 643 + reader = strings.NewReader(file.Gzip.Content) 644 + } else { 645 + blob, err := s.BlobStore.GetBlob(r.Context(), str.Did, cid.Cid(file.Content.Ref)) 646 + if err != nil { 647 + l.Warn("failed to fetch blob", "err", err) 648 + http.NotFound(w, r) 649 + return 650 + } 651 + defer blob.Close() 652 + reader = blob 653 + } 564 654 565 - w.Header().Set("Content-Type", "text/plain; charset=utf-8") 655 + w.Header().Set("Content-Type", mimeType) 566 656 w.Header().Set("Content-Disposition", fmt.Sprintf("inline; filename=%q", filename)) 567 - w.Header().Set("Content-Length", strconv.FormatInt(file.Blob.Size, 10)) 568 - _, err := w.Write([]byte(content)) 569 - if err != nil { 657 + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) 658 + if _, err := io.Copy(w, reader); err != nil { 570 659 l.Error("failed to write raw 
response", "err", err) 571 660 } 572 661 } ··· 611 700 l := s.Logger.With("handler", "FileFragment") 612 701 ctx := r.Context() 613 702 614 - string, ok := stringFromContext(ctx) 703 + str, ok := stringFromContext(ctx) 615 704 if !ok { 616 705 l.Error("malformed middleware. string missing") 617 706 http.NotFound(w, r) ··· 621 710 forceCode := r.URL.Query().Get("code") == "true" 622 711 623 712 var params pages.StringFileFragmentParams 624 - if string.IsLegacySingleFile() { 625 - if filename != string.FileName { 713 + if str.IsLegacySingleFile() { 714 + if filename != str.FileName { 626 715 http.NotFound(w, r) 627 716 return 628 717 } 629 - params = s.makeFileFragmentParams(&string, string.FileName, string.FileContent, forceCode) 718 + params = s.makeFileFragmentParams(&str, str.FileName, str.FileContent, forceCode) 630 719 } else { 631 - file, ok := string.FileByName(filename) 720 + file, ok := str.FileByName(filename) 632 721 if !ok { 633 722 l.Error("malformed middleware. string missing") 634 723 http.NotFound(w, r) 635 724 return 636 725 } 637 726 638 - // TODO: read blob 639 - content := "" 727 + var content string 728 + if file.Gzip != nil && file.Gzip.Content != "" { 729 + content = file.Gzip.Content 730 + } else { 731 + blob, err := s.BlobStore.GetBlob(r.Context(), str.Did, cid.Cid(file.Content.Ref)) 732 + if err != nil { 733 + l.Warn("failed to fetch blob", "err", err) 734 + http.NotFound(w, r) 735 + return 736 + } 737 + defer blob.Close() 640 738 641 - params = s.makeFileFragmentParams(&string, file.Name, content, forceCode) 739 + contentBytes, err := io.ReadAll(blob) 740 + if err != nil { 741 + l.Error("failed to read blob", "err", err) 742 + } 743 + content = string(contentBytes) 744 + } 745 + 746 + params = s.makeFileFragmentParams(&str, file.Name, content, forceCode) 642 747 } 643 748 s.Pages.StringFileFragment(w, params) 644 749 } ··· 653 758 return "", err 654 759 } 655 760 656 - xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 761 + xrpcc := 
indigoxrpc.Client{Host: ident.PDSEndpoint()} 657 762 out, err := agnostic.RepoGetRecord(ctx, &xrpcc, "", uri.Collection().String(), ident.DID.String(), uri.RecordKey().String()) 658 763 if err != nil { 659 764 return "", err ··· 669 774 670 775 return cid, nil 671 776 } 777 + 778 + func gz(s string) io.Reader { 779 + var b bytes.Buffer 780 + w := gzip.NewWriter(&b) 781 + w.Write([]byte(s)) 782 + w.Close() 783 + return &b 784 + }
+13
blobstore/blobstore.go
// Package blobstore provides read access to blobs referenced by records.
package blobstore

import (
	"context"
	"io"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/ipfs/go-cid"
)

// BlobStore retrieves the raw bytes of a blob.
type BlobStore interface {
	// GetBlob opens the blob identified by cid that belongs to the account did.
	// On success the caller owns the returned reader and must close it.
	GetBlob(ctx context.Context, did syntax.DID, cid cid.Cid) (io.ReadCloser, error)
}
+46
blobstore/pds.go
··· 1 + package blobstore 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "net/http" 8 + "net/url" 9 + 10 + "github.com/bluesky-social/indigo/atproto/identity" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "github.com/ipfs/go-cid" 13 + ) 14 + 15 + type Pds struct { 16 + dir identity.Directory 17 + } 18 + 19 + func NewPdsBlobStore(dir identity.Directory) *Pds { 20 + return &Pds{dir} 21 + } 22 + 23 + func (s *Pds) GetBlob(ctx context.Context, did syntax.DID, cid cid.Cid) (io.ReadCloser, error) { 24 + id, err := s.dir.LookupDID(ctx, did) 25 + if err != nil { 26 + return nil, err 27 + } 28 + 29 + url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", id.PDSEndpoint())) 30 + q := url.Query() 31 + q.Set("did", did.String()) 32 + q.Set("cid", cid.String()) 33 + url.RawQuery = q.Encode() 34 + 35 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil) 36 + resp, err := http.DefaultClient.Do(req) 37 + if err != nil { 38 + return nil, err 39 + } 40 + 41 + if resp.StatusCode != http.StatusOK { 42 + return nil, fmt.Errorf("unexpected status: %s", resp.Status) 43 + } 44 + 45 + return resp.Body, nil 46 + }