···77use anyhow::{anyhow, bail, Result};
88use bincode::Options as BincodeOptions;
99use links::CollectedLink;
1010-use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
1010+use metrics::{counter, histogram};
1111use ratelimit::Ratelimiter;
1212use rocksdb::backup::{BackupEngine, BackupEngineOptions};
1313use rocksdb::{
···256256257257impl RocksStorage {
258258 pub fn new(path: impl AsRef<Path>) -> Result<Self> {
259259- Self::describe_metrics();
260259 let me = RocksStorage::open_readmode(path, false)?;
261260 me.global_init()?;
262261 Ok(me)
···308307 }
309308310309 fn global_init(&self) -> Result<()> {
311311- let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some();
312312- if first_run {
310310+ if self.db.get(STARTED_AT_KEY)?.is_none() {
313311 self.db.put(STARTED_AT_KEY, _rv(now()))?;
314314-315315- // hack / temporary: if we're a new db, put in a completed repair
316316- // state so we don't run repairs (repairs are for old-code dbs)
317317- let completed = TargetIdRepairState {
318318- id_when_started: 0,
319319- current_us_started_at: 0,
320320- latest_repaired_i: 0,
321321- };
322322- self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
323312 }
324313 Ok(())
325314 }
326315327327- pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
328328- let mut state = match self
329329- .db
330330- .get(TARGET_ID_REPAIR_STATE_KEY)?
331331- .map(|s| _vr(&s))
332332- .transpose()?
333333- {
334334- Some(s) => s,
335335- None => TargetIdRepairState {
336336- id_when_started: self.did_id_table.priv_id_seq,
337337- current_us_started_at: now(),
338338- latest_repaired_i: 0,
339339- },
340340- };
341341-342342- eprintln!("initial repair state: {state:?}");
343343-344344- let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
345345-346346- let mut iter = self.db.raw_iterator_cf(&cf);
347347- iter.seek_to_first();
348348-349349- eprintln!("repair iterator sent to first key");
350350-351351- // skip ahead if we're done some, or take a single first step
352352- for _ in 0..state.latest_repaired_i {
353353- iter.next();
316316+ pub fn reset_start(&self) -> Result<bool> {
317317+ let existing = self.db.get(STARTED_AT_KEY)?;
318318+ if existing.is_none() {
319319+ bail!("not resetting started-at key because one wasn't set");
354320 }
355355-356356- eprintln!(
357357- "repair iterator skipped to {}th key",
358358- state.latest_repaired_i
359359- );
360360-361361- let mut maybe_done = false;
362362-363363- let mut write_fast = rocksdb::WriteOptions::default();
364364- write_fast.set_sync(false);
365365- write_fast.disable_wal(true);
366366-367367- while !stay_alive.is_cancelled() && !maybe_done {
368368- // let mut batch = WriteBatch::default();
369369-370370- let mut any_written = false;
371371-372372- for _ in 0..1000 {
373373- if state.latest_repaired_i % 1_000_000 == 0 {
374374- eprintln!("target iter at {}", state.latest_repaired_i);
375375- }
376376- state.latest_repaired_i += 1;
377377-378378- if !iter.valid() {
379379- eprintln!("invalid iter, are we done repairing?");
380380- maybe_done = true;
381381- break;
382382- };
383383-384384- // eprintln!("iterator seems to be valid! getting the key...");
385385- let raw_key = iter.key().unwrap();
386386- if raw_key.len() == 8 {
387387- // eprintln!("found an 8-byte key, skipping it since it's probably an id...");
388388- iter.next();
389389- continue;
390390- }
391391- let target: TargetKey = _kr::<TargetKey>(raw_key)?;
392392- let target_id: TargetId = _vr(iter.value().unwrap())?;
393393-394394- self.db
395395- .put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
396396- any_written = true;
397397- iter.next();
398398- }
399399-400400- if any_written {
401401- self.db
402402- .put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
403403- std::thread::sleep(breather);
404404- }
405405- }
406406-407407- eprintln!("repair iterator done.");
408408-409409- Ok(false)
321321+ self.db.put(STARTED_AT_KEY, _rv(COZY_FIRST_CURSOR))?;
322322+ println!("started-at key reset to {COZY_FIRST_CURSOR}");
323323+ Ok(true)
410324 }
411325412326 pub fn start_backup(
···503417 BackupEngine::open(&BackupEngineOptions::new(path)?, &rocksdb::Env::new()?)?;
504418 engine.purge_old_backups(num_backups_to_keep)?;
505419 Ok(())
506506- }
507507-508508- fn describe_metrics() {
509509- describe_histogram!(
510510- "storage_rocksdb_read_seconds",
511511- Unit::Seconds,
512512- "duration of the read stage of actions"
513513- );
514514- describe_histogram!(
515515- "storage_rocksdb_action_seconds",
516516- Unit::Seconds,
517517- "duration of read + write of actions"
518518- );
519519- describe_counter!(
520520- "storage_rocksdb_batch_ops_total",
521521- Unit::Count,
522522- "total batched operations from actions"
523523- );
524524- describe_histogram!(
525525- "storage_rocksdb_delete_account_ops",
526526- Unit::Count,
527527- "total batched ops for account deletions"
528528- );
529420 }
530421531422 fn merge_op_extend_did_ids(
···827718impl Drop for RocksStorage {
828719 fn drop(&mut self) {
829720 if self.is_writer {
721721+ // TODO: cloning a writer is possible and currently breaks things
722722+ // (constellation code currently doesn't/shouldn't clone the writer)
830723 println!("rocksdb writer: cleaning up for shutdown...");
831724 if let Err(e) = self.db.flush_wal(true) {
832725 eprintln!("rocks: flushing wal failed: {e:?}");
···17951688 }
1796168917971690 // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...)
16911691+16921692+ #[test]
16931693+ fn rocks_started_at_persists_across_opens() -> Result<()> {
16941694+ let dir = tempdir()?;
16951695+16961696+ let mut store = RocksStorage::new(dir.path())?;
16971697+ store.push(
16981698+ &ActionableEvent::CreateLinks {
16991699+ record_id: RecordId {
17001700+ did: "did:plc:asdf".into(),
17011701+ collection: "a.b.c".into(),
17021702+ rkey: "asdf".into(),
17031703+ },
17041704+ links: vec![CollectedLink {
17051705+ target: Link::Uri("e.com".into()),
17061706+ path: ".uri".into(),
17071707+ }],
17081708+ },
17091709+ 0,
17101710+ )?;
17111711+ let first = store.get_stats()?.started_at;
17121712+ drop(store);
17131713+17141714+ std::thread::sleep(Duration::from_millis(5));
17151715+17161716+ let store = RocksStorage::new(dir.path())?;
17171717+ let second = store.get_stats()?.started_at;
17181718+17191719+ assert_eq!(first, second, "STARTED_AT must not change across opens");
17201720+ Ok(())
17211721+ }
17981722}
+7-1
constellation/templates/hello.html.j2
···243243244244 <p>A DID like <code>did:plc:hdhoaan3xa3jiuq4fg4mefid</code>, or an AT-URI like <code>at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3lgu4lg6j2k2v</code>, or a URI like <code>https://example.com</code>.</p>
245245246246+ <h3>Source</h3>
247247+248248+ <p>A <em>link source</em>, made of a collection and path, like <code>app.bsky.feed.like:subject<code>. The <code>:</code> separates them.
249249+246250 <h3>Collection</h3>
247251248252 <p>A record NSID like <code>app.bsky.feed.like</code>.</p>
249253250254 <h3>Path</h3>
251255252252- <p>A (currently-very-very-hacky) json-path-ish representation of the source of a link in a record. Records may contain multiple links with different meanings, so this specifies which specific link is of interest. Like <code>.subject.uri</code>.</p>
256256+ <p>A (currently-hacky) json-path-ish representation of the source of a link in a record. Records may contain multiple links with different meanings, so this specifies which specific link is of interest. Like <code>.subject.uri</code>.</p>
257257+258258+ <p>A special path, <code>.</code>, represents <em>the record's <code>rkey</code></em>. Tangled's vouch system puts the vouch subject in the <code>rkey</code> instead of inside the actual record. Its link source looks like this: <code>sh.tangled.graph.vouch:.</code></p>
253259254260 <h3>Cursor</h3>
255261
+7-1
jetstream/src/error.rs
···2626 #[error("failed to construct url: {0}")]
2727 InvalidEndpointUri(#[from] tokio_tungstenite::tungstenite::http::uri::InvalidUri),
2828 #[error("failed to connect to Jetstream instance: {0}")]
2929- WebSocketFailure(#[from] tokio_tungstenite::tungstenite::Error),
2929+ WebSocketFailure(Box<tokio_tungstenite::tungstenite::Error>),
3030 #[error("the Jetstream config is invalid (this really should not happen here): {0}")]
3131 InvalidConfig(#[from] ConfigValidationError),
3232+}
3333+3434+impl From<tokio_tungstenite::tungstenite::Error> for ConnectionError {
3535+ fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
3636+ Self::WebSocketFailure(Box::new(e))
3737+ }
3238}
33393440/// Possible errors that can occur when receiving events from a Jetstream instance over WebSockets.