alpha
Login
or
Join now
microcosm.blue
/
microcosm-rs
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
Star
0
Fork
3
Atom
Configure Feed
Issues
Pull Requests
Commits
Tags
Feed URL
Select the types of activity you want to include in your feed.
Overview
Issues
Pulls
Pipelines
whatever, ufos can have tests sure
author
phil
date
1 year ago
(Apr 5, 2025, 1:58 PM -0400)
commit
52ff3eaa
52ff3eaa83b8e4842baa2b2441df34211664ed2f
parent
2e8fb71f
2e8fb71faf2b15e727b1d1abf0d46ac0b2678daf
+63
-23
5 changed files
Expand all
Collapse all
Unified
Split
Cargo.lock
ufos
Cargo.toml
src
lib.rs
main.rs
storage_fjall.rs
+1
Cargo.lock
Reviewed
···
3784
3784
"semver",
3785
3785
"serde",
3786
3786
"serde_json",
3787
3787
+
"tempfile",
3787
3788
"thiserror 2.0.12",
3788
3789
"tikv-jemallocator",
3789
3790
"tokio",
+3
ufos/Cargo.toml
Reviewed
···
23
23
24
24
[target.'cfg(not(target_env = "msvc"))'.dependencies]
25
25
tikv-jemallocator = "0.6.0"
26
26
+
27
27
+
[dev-dependencies]
28
28
+
tempfile = "3.19.1"
+4
ufos/src/lib.rs
Reviewed
···
2
2
pub mod db_types;
3
3
pub mod error;
4
4
pub mod server;
5
5
+
pub mod storage;
5
6
pub mod storage_fjall;
6
7
pub mod store_types;
7
8
···
121
122
} else {
122
123
None
123
124
}
125
125
+
}
126
126
+
pub fn is_empty(&self) -> bool {
127
127
+
self.commits_by_nsid.is_empty() && self.account_removes.is_empty()
124
128
}
125
129
}
+6
-5
ufos/src/main.rs
Reviewed
···
1
1
-
use ufos::error::StorageError;
2
2
-
use ufos::storage_fjall::StoreWriter;
3
3
-
use ufos::storage_fjall::StorageWhatever;
4
1
use clap::Parser;
5
2
use std::path::PathBuf;
3
3
+
use ufos::error::StorageError;
4
4
+
use ufos::storage::{StorageWhatever, StoreWriter};
5
5
+
use ufos::storage_fjall::FjallStorage;
6
6
use ufos::{consumer, storage_fjall};
7
7
8
8
#[cfg(not(target_env = "msvc"))]
···
47
47
let args = Args::parse();
48
48
let jetstream = args.jetstream.clone();
49
49
let (_read_store, mut write_store, cursor) =
50
50
-
storage_fjall::FjallStorage::init(args.data, jetstream, args.jetstream_force)?;
50
50
+
FjallStorage::init(args.data, jetstream, args.jetstream_force, Default::default())?;
51
51
52
52
// println!("starting server with storage...");
53
53
// let serving = server::serve(storage.clone());
···
69
69
70
70
tokio::task::spawn_blocking(move || {
71
71
while let Some(event_batch) = batches.blocking_recv() {
72
72
-
write_store.insert_batch(event_batch)?
72
72
+
write_store.insert_batch(event_batch)?;
73
73
+
write_store.step_rollup()?;
73
74
}
74
75
Ok::<(), StorageError>(())
75
76
}).await??;
+49
-18
ufos/src/storage_fjall.rs
Reviewed
···
1
1
+
use crate::storage::{StorageWhatever, StoreReader, StoreWriter};
1
2
use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr};
2
3
use crate::store_types::{
3
4
ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue,
···
115
116
* TODO: moderation actions
116
117
* TODO: account privacy preferences. Might wait for the protocol-level (PDS-level?) stuff to land. Will probably do lazy fetching + caching on read.
117
118
**/
118
118
-
pub trait StorageWhatever { // TODO: extract this
119
119
-
fn init(
120
120
-
path: impl AsRef<Path>,
121
121
-
endpoint: String,
122
122
-
force_endpoint: bool,
123
123
-
) -> Result<(impl StoreReader, impl StoreWriter, Option<Cursor>), StorageError> where Self: Sized;
124
124
-
}
119
119
+
#[derive(Debug)]
120
120
+
pub struct FjallStorage {}
125
121
126
126
-
pub trait StoreWriter {
127
127
-
fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError>;
122
122
+
#[derive(Debug, Default)]
123
123
+
pub struct FjallConfig {
124
124
+
/// drop the db when the storage is dropped
125
125
+
///
126
126
+
/// this is only meant for tests
127
127
+
#[cfg(test)]
128
128
+
pub temp: bool,
128
129
}
129
130
130
130
-
pub trait StoreReader: Clone {}
131
131
-
132
132
-
pub struct FjallStorage {}
133
133
-
impl StorageWhatever for FjallStorage {
131
131
+
impl StorageWhatever<FjallReader, FjallWriter, FjallConfig> for FjallStorage {
134
132
fn init(
135
133
path: impl AsRef<Path>,
136
134
endpoint: String,
137
135
force_endpoint: bool,
138
138
-
) -> Result<(impl StoreReader, impl StoreWriter, Option<Cursor>), StorageError> {
139
139
-
let keyspace = Config::new(path).fsync_ms(Some(4_000)).open()?;
136
136
+
_config: FjallConfig,
137
137
+
) -> Result<(FjallReader, FjallWriter, Option<Cursor>), StorageError> {
138
138
+
let keyspace = {
139
139
+
let config = Config::new(path);
140
140
+
141
141
+
#[cfg(not(test))]
142
142
+
let config = config.fsync_ms(Some(4_000));
143
143
+
144
144
+
config.open()?
145
145
+
};
140
146
141
147
let global = keyspace.open_partition("global", PartitionCreateOptions::default())?;
142
148
let feeds = keyspace.open_partition("feeds", PartitionCreateOptions::default())?;
···
266
272
267
273
impl StoreWriter for FjallWriter {
268
274
fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError> {
275
275
+
if event_batch.is_empty() {
276
276
+
return Ok(())
277
277
+
}
278
278
+
269
279
let mut batch = self.keyspace.batch();
270
280
271
281
// would be nice not to have to iterate everything at once here
···
331
341
);
332
342
333
343
batch.commit()?;
334
334
-
335
335
-
eprintln!("ok stepping rollup now...");
336
336
-
self.step_rollup()?;
337
344
Ok(())
338
345
}
339
346
}
···
1100
1107
batch.latest_cursor().map(|c| c.elapsed()),
1101
1108
)
1102
1109
}
1110
1110
+
1111
1111
+
1112
1112
+
1113
1113
+
#[cfg(test)]
1114
1114
+
mod tests {
1115
1115
+
use super::*;
1116
1116
+
1117
1117
+
#[test]
1118
1118
+
fn test_hello() -> anyhow::Result<()> {
1119
1119
+
// let db_path = tempfile::tempdir()?;
1120
1120
+
let (_read, mut write, _) = FjallStorage::init(
1121
1121
+
tempfile::tempdir()?,
1122
1122
+
"offline test (no real jetstream endpoint)".to_string(),
1123
1123
+
false,
1124
1124
+
FjallConfig { temp: true },
1125
1125
+
)?;
1126
1126
+
1127
1127
+
write.insert_batch(EventBatch {
1128
1128
+
..Default::default()
1129
1129
+
})?;
1130
1130
+
1131
1131
+
Ok(())
1132
1132
+
}
1133
1133
+
}