···11+use clap::Parser;
22+use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
33+use std::path::PathBuf;
44+use tokio::{task::JoinSet, io::AsyncRead};
55+use repo_stream::{DriverBuilder, Driver, DiskBuilder, DiskStore, drive::NeedDisk};
66+77+88+type Result<T> = anyhow::Result<T>; //std::result::Result<T, Box<dyn std::error::Error>>;
99+1010+1111+#[derive(Debug, Parser)]
1212+struct Args {
1313+ #[arg(long)]
1414+ cars_folder: PathBuf,
1515+ #[arg(long)]
1616+ mem_workers: usize,
1717+ #[arg(long)]
1818+ disk_workers: usize,
1919+ #[arg(long)]
2020+ disk_folder: PathBuf,
2121+}
2222+2323+async fn get_cars(cars_folder: PathBuf, tx: async_channel::Sender<tokio::io::BufReader<tokio::fs::File>>) -> Result<()> {
2424+ let mut dir = tokio::fs::read_dir(cars_folder).await?;
2525+ while let Some(entry) = dir.next_entry().await? {
2626+ if !entry.file_type().await?.is_file() {
2727+ continue;
2828+ }
2929+ let reader = tokio::fs::File::open(&entry.path()).await?;
3030+ let reader = tokio::io::BufReader::new(reader);
3131+ tx.send(reader).await?;
3232+ }
3333+ Ok(())
3434+}
3535+3636+async fn drive_mem<R: AsyncRead + Unpin + Send + Sync + 'static>(
3737+ f: R,
3838+ disk_tx: &async_channel::Sender<NeedDisk<R, usize>>,
3939+) -> Result<Option<usize>> {
4040+ let mut n = 0;
4141+ match DriverBuilder::new()
4242+ .with_block_processor(|_| 0_usize) // don't care just counting records
4343+ .with_mem_limit_mb(32)
4444+ .load_car(f)
4545+ .await?
4646+ {
4747+ Driver::Memory(_commit, mut driver) => {
4848+ while let Some(chunk) = driver.next_chunk(512).await? {
4949+ n += chunk.len();
5050+ }
5151+ Ok(Some(n))
5252+ }
5353+ Driver::Disk(need_disk) => {
5454+ disk_tx.send(need_disk).await?;
5555+ Ok(None)
5656+ }
5757+ }
5858+}
5959+6060+async fn mem_worker<R: AsyncRead + Unpin + Send + Sync + 'static>(
6161+ car_rx: async_channel::Receiver<R>,
6262+ disk_tx: async_channel::Sender<NeedDisk<R, usize>>,
6363+ n: Arc<AtomicUsize>,
6464+) -> Result<()> {
6565+ while let Ok(f) = car_rx.recv().await {
6666+ let driven = match drive_mem(f, &disk_tx).await {
6767+ Ok(d) => d,
6868+ Err(e) => {
6969+ eprintln!("failed to drive mem: {e:?}. skipping...");
7070+ continue;
7171+ }
7272+ };
7373+ if let Some(drove) = driven {
7474+ n.fetch_add(drove, Ordering::Relaxed);
7575+ }
7676+ }
7777+ Ok(())
7878+}
7979+8080+async fn drive_disk<R: AsyncRead + Unpin>(
8181+ needed: NeedDisk<R, usize>,
8282+ store: DiskStore,
8383+) -> Result<(usize, DiskStore)> {
8484+ let (_commit, mut driver) = needed.finish_loading(store).await?;
8585+ let mut n = 0;
8686+ while let Some(chunk) = driver.next_chunk(512).await? {
8787+ n += chunk.len();
8888+ }
8989+ let store = driver.reset_store().await?;
9090+ Ok((n, store))
9191+}
9292+9393+async fn disk_worker<R: AsyncRead + Unpin>(
9494+ worker_id: usize,
9595+ disk_rx: async_channel::Receiver<NeedDisk<R, usize>>,
9696+ folder: PathBuf,
9797+ n: Arc<AtomicUsize>,
9898+ disk_workers_active: Arc<AtomicUsize>,
9999+) -> Result<()> {
100100+ let mut file = folder;
101101+ file.push(format!("disk-worker-{worker_id}.sqlite"));
102102+ let mut store = DiskBuilder::new()
103103+ .with_cache_size_mb(128)
104104+ .open(file.clone())
105105+ .await?;
106106+ while let Ok(needed) = disk_rx.recv().await {
107107+ let active = disk_workers_active.fetch_add(1, Ordering::AcqRel);
108108+ println!("-> disk workers active: {}", active + 1);
109109+ let drove = match drive_disk(needed, store).await {
110110+ Ok((d, s)) => {
111111+ store = s;
112112+ d
113113+ }
114114+ Err(e) => {
115115+ eprintln!("failed to drive disk: {e:?}. skipping...");
116116+ store = DiskBuilder::new()
117117+ .with_cache_size_mb(128)
118118+ .open(file.clone())
119119+ .await?;
120120+ continue;
121121+ }
122122+ };
123123+ n.fetch_add(drove, Ordering::Relaxed);
124124+ let were_active = disk_workers_active.fetch_sub(1, Ordering::AcqRel);
125125+ println!("<- disk workers active: {}", were_active - 1);
126126+ }
127127+ Ok(())
128128+}
129129+130130+131131+#[tokio::main]
132132+async fn main() -> Result<()> {
133133+ env_logger::init();
134134+135135+ let Args { cars_folder, disk_folder, disk_workers, mem_workers } = Args::parse();
136136+137137+ let mut set = JoinSet::<Result<()>>::new();
138138+139139+140140+ let (cars_tx, cars_rx) = async_channel::bounded(2);
141141+ set.spawn(get_cars(cars_folder, cars_tx));
142142+143143+ let n: Arc<AtomicUsize> = Arc::new(0.into());
144144+ let disk_workers_active: Arc<AtomicUsize> = Arc::new(0.into());
145145+146146+ set.spawn({
147147+ let n = n.clone();
148148+ let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
149149+ async move {
150150+ let mut last_n = n.load(Ordering::Relaxed);
151151+ loop {
152152+ interval.tick().await;
153153+ let n = n.load(Ordering::Relaxed);
154154+ let diff = n - last_n;
155155+ println!("rate: {} rec/sec", diff / 10);
156156+ if diff == 0 {
157157+ println!("zero encountered, stopping rate calculation polling.");
158158+ break Ok(());
159159+ }
160160+ last_n = n;
161161+ }
162162+ }
163163+ });
164164+165165+166166+ let (needs_disk_tx, needs_disk_rx) = async_channel::bounded(disk_workers);
167167+168168+169169+ for _ in 0..mem_workers {
170170+ set.spawn(mem_worker(cars_rx.clone(), needs_disk_tx.clone(), n.clone()));
171171+ }
172172+ drop(cars_rx);
173173+ drop(needs_disk_tx);
174174+175175+ tokio::fs::create_dir_all(disk_folder.clone()).await?;
176176+ for id in 0..disk_workers {
177177+ set.spawn(disk_worker(
178178+ id,
179179+ needs_disk_rx.clone(),
180180+ disk_folder.clone(),
181181+ n.clone(),
182182+ disk_workers_active.clone(),
183183+ ));
184184+ }
185185+ drop(needs_disk_rx);
186186+187187+ while let Some(res) = set.join_next().await {
188188+ println!("task from set joined: {res:?}");
189189+ }
190190+191191+ eprintln!("total records processed: {n:?}");
192192+193193+ Ok(())
194194+}
+138
spacedust/src/bin/scrape_pds.rs
···11+use tokio::io::AsyncWriteExt;
22+use clap::Parser;
33+use std::path::PathBuf;
44+use reqwest::Url;
55+use tokio::{sync::mpsc, time};
66+use serde::Deserialize;
/// Result type used throughout this binary; any error is carried as a boxed
/// `std::error::Error` trait object.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
99+1010+use futures::StreamExt;
1111+1212+#[derive(Debug, Parser)]
1313+struct Args {
1414+ #[arg(long)]
1515+ pds: Url,
1616+ #[arg(long)]
1717+ throttle_ms: u64, // 100ms per pds?
1818+ #[arg(long)]
1919+ folder: PathBuf,
2020+}
2121+2222+async fn download_repo(
2323+ client: &reqwest::Client,
2424+ mut pds: Url,
2525+ did: String,
2626+ mut path: PathBuf,
2727+) -> Result<()> {
2828+ path.push(format!("{did}.car"));
2929+ let f = tokio::fs::File::create(path).await?;
3030+ let mut w = tokio::io::BufWriter::new(f);
3131+3232+ pds.set_path("/xrpc/com.atproto.sync.getRepo");
3333+ pds.set_query(Some(&format!("did={did}")));
3434+ let mut byte_stream = client
3535+ .get(pds)
3636+ .send()
3737+ .await?
3838+ .bytes_stream();
3939+4040+ while let Some(stuff) = byte_stream.next().await {
4141+ tokio::io::copy(&mut stuff?.as_ref(), &mut w).await?;
4242+ }
4343+ w.flush().await?;
4444+4545+ Ok(())
4646+}
4747+4848+4949+#[derive(Debug, Deserialize)]
5050+struct RepoInfo {
5151+ did: String,
5252+ active: bool,
5353+}
5454+5555+#[derive(Debug, Deserialize)]
5656+struct ListReposResponse {
5757+ cursor: Option<String>,
5858+ repos: Vec<RepoInfo>,
5959+}
6060+6161+fn get_pds_dids(client: reqwest::Client, mut pds: Url) -> mpsc::Receiver<String> {
6262+ let (tx, rx) = mpsc::channel(2);
6363+ tokio::task::spawn(async move {
6464+ pds.set_path("/xrpc/com.atproto.sync.listRepos");
6565+ let mut cursor = None;
6666+6767+ loop {
6868+ if let Some(c) = cursor {
6969+ pds.set_query(Some(&format!("cursor={c}")));
7070+ }
7171+ let res: ListReposResponse = client
7272+ .get(pds.clone())
7373+ .send()
7474+ .await
7575+ .expect("to send request")
7676+ .error_for_status()
7777+ .expect("to be ok")
7878+ .json()
7979+ .await
8080+ .expect("json response");
8181+ for repo in res.repos {
8282+ if repo.active {
8383+ tx.send(repo.did).await.expect("to be able to send on the channel");
8484+ }
8585+ }
8686+ cursor = res.cursor;
8787+ if cursor.is_none() {
8888+ break;
8989+ }
9090+ }
9191+9292+ });
9393+ rx
9494+}
9595+9696+9797+#[tokio::main]
9898+async fn main() -> Result<()> {
9999+ env_logger::init();
100100+101101+ let Args { pds, throttle_ms, folder } = Args::parse();
102102+103103+ tokio::fs::create_dir_all(folder.clone()).await?;
104104+105105+ let client = reqwest::Client::builder()
106106+ .user_agent("microcosm/spacedust-testing")
107107+ .build()?;
108108+109109+ let mut dids = get_pds_dids(client.clone(), pds.clone());
110110+111111+ let mut interval = time::interval(time::Duration::from_millis(throttle_ms));
112112+ let mut oks = 0;
113113+ let mut single_fails = 0;
114114+ let mut double_fails = 0;
115115+116116+ while let Some(did) = dids.recv().await {
117117+ interval.tick().await;
118118+ println!("did: {did:?}");
119119+ if let Err(e) = download_repo(&client, pds.clone(), did.clone(), folder.clone()).await {
120120+ single_fails += 1;
121121+ eprintln!("failed to download repo for did: {did:?}: {e:?}. retrying in a moment...");
122122+ tokio::time::sleep(time::Duration::from_secs(3)).await;
123123+ interval.reset();
124124+ if let Err(e) = download_repo(&client, pds.clone(), did.clone(), folder.clone()).await {
125125+ double_fails += 1;
126126+ eprintln!("failed again: {e:?}. moving on in a moment...");
127127+ tokio::time::sleep(time::Duration::from_secs(1)).await;
128128+ continue;
129129+ }
130130+ }
131131+ oks += 1;
132132+ println!(" -> done. did: {did:?}");
133133+ }
134134+135135+ eprintln!("got {oks} repos. single fails: {single_fails}; doubles: {double_fails}.");
136136+137137+ Ok(())
138138+}
+1
spacedust/src/lib.rs
···33pub mod error;
44pub mod removable_delay_queue;
55pub mod server;
66+pub mod storage;
67pub mod subscriber;
7889use jetstream::events::CommitEvent;