···11+use clap::Parser;
22+use fjall::{Config, PartitionCreateOptions};
33+use std::path::PathBuf;
44+use std::time::Instant;
55+66+#[derive(Parser)]
77+#[command(about = "Run a major compaction over every ufos partition")]
88+struct Cli {
99+ /// path to the fjall data directory
1010+ ///
1111+ /// WARNING: MUST NOT RUN WHILE ANOTHER UFOS PROCESS IS USING IT
1212+ data: PathBuf,
1313+}
1414+1515+fn main() -> anyhow::Result<()> {
1616+ let cli = Cli::parse();
1717+1818+ eprintln!("opening db at {:?}...", cli.data);
1919+ let keyspace = Config::new(&cli.data).open()?;
2020+2121+ for name in ["global", "feeds", "records", "rollups", "queues"] {
2222+ let partition = keyspace.open_partition(name, PartitionCreateOptions::default())?;
2323+ let size0 = partition.disk_space();
2424+ eprintln!("beginning major compaction for {name} (original size: {size0})");
2525+ let t0 = Instant::now();
2626+ partition.major_compact()?;
2727+ let dt = t0.elapsed();
2828+ let sizef = partition.disk_space();
2929+ let dsize = (sizef as i64) - (size0 as i64);
3030+ eprintln!("completed compaction for {name} in {dt:?} (new size: {sizef}, {dsize})");
3131+ }
3232+3333+ Ok(())
3434+}
+2-22
ufos/src/main.rs
···99use ufos::file_consumer;
1010use ufos::server;
1111use ufos::storage::{StorageWhatever, StoreBackground, StoreReader, StoreWriter};
1212-use ufos::storage_fjall::{FjallConfig, FjallStorage};
1212+use ufos::storage_fjall::FjallStorage;
1313use ufos::store_types::SketchSecretPrefix;
1414use ufos::{nice_duration, ConsumerInfo};
1515···5959 /// DEBUG: interpret jetstream as a file fixture
6060 #[arg(long, action, env = "UFOS_JETSTREAM_FIXTURE")]
6161 jetstream_fixture: bool,
6262- /// HOPEFULLY only needed once
6363- ///
6464- /// brute-force garbage-collect all dangling records because we weren't deleting
6565- /// them before at all (oops)
6666- #[arg(long, action)]
6767- fjall_records_gc: bool,
6862 /// enable metrics collection and serving
6963 #[arg(long, action, env = "UFOS_COLLECT_METRICS")]
7064 collect_metrics: bool,
···8478 args.data.clone(),
8579 jetstream,
8680 args.jetstream_force,
8787- FjallConfig {
8888- major_compact: !args.fjall_records_gc,
8989- },
8181+ Default::default(),
9082 )?;
9191-9292- if args.fjall_records_gc {
9393- log::info!("beginning brute-force records gc");
9494- let t0 = std::time::Instant::now();
9595- let (n, m) = write_store.records_brute_gc_danger()?;
9696- let dt = t0.elapsed();
9797- log::info!(
9898- "completed brute-force records gc in {dt:?}, removed {n} and retained {m} records."
9999- );
100100- return Ok(());
101101- }
102102-10383 go(args, read_store, write_store, cursor, sketch_secret).await?;
10484 Ok(())
10585}
+2-87
ufos/src/storage_fjall.rs
···148148 /// this is only meant for tests
149149 #[cfg(test)]
150150 pub temp: bool,
151151- /// do major compaction on startup
152152- ///
153153- /// default is false. probably a good thing unless it's too slow.
154154- pub major_compact: bool,
155151}
156152157153impl StorageWhatever<FjallReader, FjallWriter, FjallBackground, FjallConfig> for FjallStorage {
···159155 path: impl AsRef<Path>,
160156 endpoint: String,
161157 force_endpoint: bool,
162162- config: FjallConfig,
158158+ _config: FjallConfig,
163159 ) -> StorageResult<(FjallReader, FjallWriter, Option<Cursor>, SketchSecretPrefix)> {
164160 let keyspace = {
165161 let config = Config::new(path);
···227223228224 sketch_secret
229225 };
230230-231231- if config.major_compact {
232232- for (partition, name) in [
233233- (&global, "global"),
234234- (&feeds, "feeds"),
235235- (&records, "records"),
236236- (&rollups, "rollups"),
237237- (&queues, "queues"),
238238- ] {
239239- let size0 = partition.disk_space();
240240- log::info!("beggining major compaction for {name} (original size: {size0})");
241241- let t0 = Instant::now();
242242- partition.major_compact().expect("compact better work 😬");
243243- let dt = t0.elapsed();
244244- let sizef = partition.disk_space();
245245- let dsize = (sizef as i64) - (size0 as i64);
246246- log::info!(
247247- "completed compaction for {name} in {dt:?} (new size: {sizef}, {dsize})"
248248- );
249249- }
250250- } else {
251251- log::info!("skipping major compaction on startup");
252252- }
253226254227 let reader = FjallReader {
255228 keyspace: keyspace.clone(),
···13811354 batch.commit()?;
13821355 Ok((cursors_advanced, dirty_nsids))
13831356 }
13841384- pub fn records_brute_gc_danger(&self) -> StorageResult<(usize, usize)> {
13851385- let (mut removed, mut retained) = (0, 0);
13861386- let mut to_retain = HashSet::<Vec<u8>>::new();
13871387-13881388- // Partition: 'feed'
13891389- //
13901390- // - Per-collection list of record references ordered by jetstream cursor
13911391- // - key: nullstr || u64 (collection nsid null-terminated, jetstream cursor)
13921392- // - val: nullstr || nullstr || nullstr (did, rkey, rev. rev is mostly a sanity-check for now.)
13931393- //
13941394- //
13951395- // Partition: 'records'
13961396- //
13971397- // - Actual records by their atproto location
13981398- // - key: nullstr || nullstr || nullstr (did, collection, rkey)
13991399- // - val: u64 || bool || nullstr || rawval (js_cursor, is_update, rev, actual record)
14001400- //
14011401- //
14021402-14031403- log::warn!("loading *all* record keys from feed into memory (yikes)");
14041404- let t0 = Instant::now();
14051405- for (i, kv) in self.feeds.iter().enumerate() {
14061406- if i > 0 && (i % 10_000_000 == 0) {
14071407- log::info!("{i}...");
14081408- }
14091409- let (key_bytes, val_bytes) = kv?;
14101410- let key = db_complete::<NsidRecordFeedKey>(&key_bytes)?;
14111411- let val = db_complete::<NsidRecordFeedVal>(&val_bytes)?;
14121412- let record_key: RecordLocationKey = (&key, &val).into();
14131413- to_retain.insert(record_key.to_db_bytes()?);
14141414- }
14151415- log::warn!(
14161416- "loaded. wow. took {:?}, found {} keys",
14171417- t0.elapsed(),
14181418- to_retain.len()
14191419- );
14201420-14211421- log::warn!("warmup OVER, iterating some billions of record keys now");
14221422- let t0 = Instant::now();
14231423- for (i, k) in self.records.keys().enumerate() {
14241424- let key_bytes = k?;
14251425- if to_retain.contains(&*key_bytes) {
14261426- retained += 1;
14271427- } else {
14281428- self.records.remove(key_bytes)?;
14291429- removed += 1;
14301430- }
14311431- if i > 0 && (i % 100_000_000) == 0 {
14321432- log::info!("{i}: {retained} retained, {removed} removed.");
14331433- }
14341434- }
14351435- log::warn!("whew! that took {:?}", t0.elapsed());
14361436-14371437- Ok((removed, retained))
14381438- }
14391357}
1440135814411359impl StoreWriter<FjallBackground> for FjallWriter {
···18871805 tempfile::tempdir().unwrap(),
18881806 "offline test (no real jetstream endpoint)".to_string(),
18891807 false,
18901890- FjallConfig {
18911891- temp: true,
18921892- ..Default::default()
18931893- },
18081808+ FjallConfig { temp: true },
18941809 )
18951810 .unwrap();
18961811 (read, write)