Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

bugs: days-since-start reset, metrics idle clear

+153 -179
+98 -76
Cargo.lock
··· 18 18 checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" 19 19 20 20 [[package]] 21 - name = "ahash" 22 - version = "0.8.11" 23 - source = "registry+https://github.com/rust-lang/crates.io-index" 24 - checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" 25 - dependencies = [ 26 - "cfg-if", 27 - "once_cell", 28 - "version_check", 29 - "zerocopy 0.7.35", 30 - ] 31 - 32 - [[package]] 33 21 name = "aho-corasick" 34 22 version = "1.1.3" 35 23 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 959 947 "links", 960 948 "mediatype", 961 949 "metrics", 962 - "metrics-exporter-prometheus 0.16.2", 950 + "metrics-exporter-prometheus 0.18.3", 963 951 "metrics-process", 952 + "metrics-util", 964 953 "num-format", 965 954 "ratelimit", 966 955 "rocksdb", ··· 1431 1420 ] 1432 1421 1433 1422 [[package]] 1423 + name = "endian-type" 1424 + version = "0.1.2" 1425 + source = "registry+https://github.com/rust-lang/crates.io-index" 1426 + checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" 1427 + 1428 + [[package]] 1434 1429 name = "enum-as-inner" 1435 1430 version = "0.6.1" 1436 1431 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1515 1510 ] 1516 1511 1517 1512 [[package]] 1513 + name = "evmap" 1514 + version = "11.0.0" 1515 + source = "registry+https://github.com/rust-lang/crates.io-index" 1516 + checksum = "1b8874945f036109c72242964c1174cf99434e30cfa45bf45fedc983f50046f8" 1517 + dependencies = [ 1518 + "hashbag", 1519 + "left-right", 1520 + "smallvec", 1521 + ] 1522 + 1523 + [[package]] 1518 1524 name = "fallible-iterator" 1519 1525 version = "0.3.0" 1520 1526 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1995 2001 ] 1996 2002 1997 2003 [[package]] 2004 + name = "hashbag" 2005 + version = "0.1.13" 2006 + source = "registry+https://github.com/rust-lang/crates.io-index" 2007 + checksum = "7040a10f52cba493ddb09926e15d10a9d8a28043708a405931fe4c6f19fac064" 2008 + 2009 + [[package]] 1998 2010 name = "hashbrown" 1999 2011 version = "0.12.3" 2000 2012 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2242 2254 2243 2255 [[package]] 2244 2256 name = "hyper" 2245 - version = "1.6.0" 2257 + version = "1.9.0" 2246 2258 source = "registry+https://github.com/rust-lang/crates.io-index" 2247 - checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" 2259 + checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" 2248 2260 dependencies = [ 2261 + "atomic-waker", 2249 2262 "bytes", 2250 2263 "futures-channel", 2251 - "futures-util", 2264 + "futures-core", 2252 2265 "h2", 2253 2266 "http", 2254 2267 "http-body", ··· 2766 2779 checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" 2767 2780 2768 2781 [[package]] 2782 + name = "left-right" 2783 + version = "0.11.7" 2784 + source = "registry+https://github.com/rust-lang/crates.io-index" 2785 + checksum = "0f0c21e4c8ff95f487fb34e6f9182875f42c84cef966d29216bf115d9bba835a" 2786 + dependencies = [ 2787 + "crossbeam-utils", 2788 + "loom", 2789 + "slab", 2790 + ] 2791 + 2792 + [[package]] 2769 2793 name = "libc" 2770 2794 version = "0.2.174" 2771 2795 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3036 3060 3037 3061 [[package]] 3038 3062 name = "metrics" 3039 - version = "0.24.2" 3063 + version = "0.24.5" 3040 3064 source = "registry+https://github.com/rust-lang/crates.io-index" 3041 - checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5" 3065 + checksum = "ff56c2e7dce6bd462e3b8919986a617027481b1dcc703175b58cf9dd98a2f071" 3042 3066 dependencies = [ 3043 - "ahash", 3044 3067 "portable-atomic", 3068 + "rapidhash", 3045 3069 ] 3046 3070 3047 3071 [[package]] 3048 3072 name = "metrics-exporter-prometheus" 3049 - version = "0.16.2" 3073 + version = "0.17.2" 3050 3074 source = "registry+https://github.com/rust-lang/crates.io-index" 3051 - checksum = "dd7399781913e5393588a8d8c6a2867bf85fb38eaf2502fdce465aad2dc6f034" 3075 + checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" 3052 3076 dependencies = [ 3053 3077 "base64 0.22.1", 3054 3078 "http-body-util", 3055 3079 "hyper", 3080 + "hyper-rustls", 3056 3081 "hyper-util", 3057 3082 "indexmap 2.11.4", 3058 3083 "ipnet", 3059 3084 "metrics", 3060 - "metrics-util 0.19.0", 3085 + "metrics-util", 3061 3086 "quanta", 3062 - "thiserror 1.0.69", 3087 + "thiserror 2.0.18", 3063 3088 "tokio", 3064 3089 "tracing", 3065 3090 ] 3066 3091 3067 3092 [[package]] 3068 3093 name = "metrics-exporter-prometheus" 3069 - version = "0.17.2" 3094 + version = "0.18.3" 3070 3095 source = "registry+https://github.com/rust-lang/crates.io-index" 3071 - checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" 3096 + checksum = "1db0d8f1fc9e62caebd0319e11eaec5822b0186c171568f0480b46a0137f9108" 3072 3097 dependencies = [ 3073 3098 "base64 0.22.1", 3099 + "evmap", 3074 3100 "http-body-util", 3075 3101 "hyper", 3076 - "hyper-rustls", 3077 3102 "hyper-util", 3078 3103 "indexmap 2.11.4", 3079 3104 "ipnet", 3080 3105 "metrics", 3081 - "metrics-util 0.20.0", 3106 + "metrics-util", 3082 3107 "quanta", 3083 3108 "thiserror 2.0.18", 3084 3109 "tokio", ··· 3103 3128 3104 3129 [[package]] 3105 3130 name = "metrics-util" 3106 - version = "0.19.0" 3107 - source = "registry+https://github.com/rust-lang/crates.io-index" 3108 - checksum = "dbd4884b1dd24f7d6628274a2f5ae22465c337c5ba065ec9b6edccddf8acc673" 3109 - dependencies = [ 3110 - "crossbeam-epoch", 3111 - "crossbeam-utils", 3112 - "hashbrown 0.15.2", 3113 - "metrics", 3114 - "quanta", 3115 - "rand 0.8.5", 3116 - "rand_xoshiro 0.6.0", 3117 - "sketches-ddsketch", 3118 - ] 3119 - 3120 - [[package]] 3121 - name = "metrics-util" 3122 - version = "0.20.0" 3131 + version = "0.20.3" 3123 3132 source = "registry+https://github.com/rust-lang/crates.io-index" 3124 - checksum = "fe8db7a05415d0f919ffb905afa37784f71901c9a773188876984b4f769ab986" 3133 + checksum = "9e56997f084e57b045edf17c3ed8ba7f9f779c670df8206dfd1c736f4c02dc4a" 3125 3134 dependencies = [ 3135 + "aho-corasick", 3126 3136 "crossbeam-epoch", 3127 3137 "crossbeam-utils", 3128 - "hashbrown 0.15.2", 3138 + "hashbrown 0.16.1", 3139 + "indexmap 2.11.4", 3129 3140 "metrics", 3141 + "ordered-float", 3130 3142 "quanta", 3143 + "radix_trie", 3131 3144 "rand 0.9.3", 3132 - "rand_xoshiro 0.7.0", 3145 + "rand_xoshiro", 3146 + "rapidhash", 3133 3147 "sketches-ddsketch", 3134 3148 ] 3135 3149 ··· 3262 3276 "security-framework 2.11.1", 3263 3277 "security-framework-sys", 3264 3278 "tempfile", 3279 + ] 3280 + 3281 + [[package]] 3282 + name = "nibble_vec" 3283 + version = "0.1.0" 3284 + source = "registry+https://github.com/rust-lang/crates.io-index" 3285 + checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" 3286 + dependencies = [ 3287 + "smallvec", 3265 3288 ] 3266 3289 3267 3290 [[package]] ··· 3452 3475 ] 3453 3476 3454 3477 [[package]] 3478 + name = "ordered-float" 3479 + version = "5.3.0" 3480 + source = "registry+https://github.com/rust-lang/crates.io-index" 3481 + checksum = "b7d950ca161dc355eaf28f82b11345ed76c6e1f6eb1f4f4479e0323b9e2fbd0e" 3482 + dependencies = [ 3483 + "num-traits", 3484 + ] 3485 + 3486 + [[package]] 3455 3487 name = "p256" 3456 3488 version = "0.13.2" 3457 3489 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3730 3762 source = "registry+https://github.com/rust-lang/crates.io-index" 3731 3763 checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" 3732 3764 dependencies = [ 3733 - "zerocopy 0.8.24", 3765 + "zerocopy", 3734 3766 ] 3735 3767 3736 3768 [[package]] ··· 3906 3938 checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" 3907 3939 3908 3940 [[package]] 3941 + name = "radix_trie" 3942 + version = "0.2.1" 3943 + source = "registry+https://github.com/rust-lang/crates.io-index" 3944 + checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" 3945 + dependencies = [ 3946 + "endian-type", 3947 + "nibble_vec", 3948 + ] 3949 + 3950 + [[package]] 3909 3951 name = "rand" 3910 3952 version = "0.8.5" 3911 3953 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3966 4008 3967 4009 [[package]] 3968 4010 name = "rand_xoshiro" 3969 - version = "0.6.0" 4011 + version = "0.7.0" 3970 4012 source = "registry+https://github.com/rust-lang/crates.io-index" 3971 - checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" 4013 + checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" 3972 4014 dependencies = [ 3973 - "rand_core 0.6.4", 4015 + "rand_core 0.9.3", 3974 4016 ] 3975 4017 3976 4018 [[package]] 3977 - name = "rand_xoshiro" 3978 - version = "0.7.0" 4019 + name = "rapidhash" 4020 + version = "4.4.1" 3979 4021 source = "registry+https://github.com/rust-lang/crates.io-index" 3980 - checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" 4022 + checksum = "b5e48930979c155e2f33aa36ab3119b5ee81332beb6482199a8ecd6029b80b59" 3981 4023 dependencies = [ 3982 - "rand_core 0.9.3", 4024 + "rustversion", 3983 4025 ] 3984 4026 3985 4027 [[package]] ··· 6203 6245 6204 6246 [[package]] 6205 6247 name = "zerocopy" 6206 - version = "0.7.35" 6207 - source = "registry+https://github.com/rust-lang/crates.io-index" 6208 - checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" 6209 - dependencies = [ 6210 - "zerocopy-derive 0.7.35", 6211 - ] 6212 - 6213 - [[package]] 6214 - name = "zerocopy" 6215 6248 version = "0.8.24" 6216 6249 source = "registry+https://github.com/rust-lang/crates.io-index" 6217 6250 checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" 6218 6251 dependencies = [ 6219 - "zerocopy-derive 0.8.24", 6220 - ] 6221 - 6222 - [[package]] 6223 - name = "zerocopy-derive" 6224 - version = "0.7.35" 6225 - source = "registry+https://github.com/rust-lang/crates.io-index" 6226 - checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" 6227 - dependencies = [ 6228 - "proc-macro2", 6229 - "quote", 6230 - "syn", 6252 + "zerocopy-derive", 6231 6253 ] 6232 6254 6233 6255 [[package]]
+3
Cargo.toml
··· 14 14 15 15 [workspace.dependencies] 16 16 clap = { version = "4.5.56", features = ["derive", "env"] } 17 + metrics = "0.24.5" 18 + metrics-exporter-prometheus = { version = "0.18.3", default-features = false, features = ["http-listener"] } 19 + metrics-util = "0.20.3" 17 20 thiserror = "2.0.18"
+3 -2
constellation/Cargo.toml
··· 18 18 headers-accept = "0.1.4" 19 19 links = { path = "../links" } 20 20 mediatype = "0.19.18" 21 - metrics = "0.24.1" 22 - metrics-exporter-prometheus = { version = "0.16.1", default-features = false, features = ["http-listener"] } 21 + metrics = { workspace = true } 22 + metrics-exporter-prometheus = { workspace = true } 23 23 metrics-process = "2.4.0" 24 + metrics-util = { workspace = true } 24 25 num-format = "0.4.4" 25 26 ratelimit = "0.10.0" 26 27 rocksdb = { version = "0.23.0", optional = true }
+10 -8
constellation/src/bin/main.rs
··· 63 63 /// Don't change the database jetstream cursor when using a fixture 64 64 #[arg(long, requires("fixture"))] 65 65 fixture_preserve_cursor: bool, 66 - /// run a scan across the target id table and write all key -> ids to id -> keys 66 + /// fix the constellation start date (funny previous bug oops) 67 67 #[arg(long, action)] 68 - repair_target_ids: bool, 68 + reset_db_start: bool, 69 69 } 70 70 71 71 #[derive(Debug, Clone, ValueEnum)] ··· 133 133 } 134 134 println!("rocks ready."); 135 135 std::thread::scope(|s| { 136 - if args.repair_target_ids { 136 + if args.reset_db_start { 137 137 let rocks = rocks.clone(); 138 - let stay_alive = stay_alive.clone(); 139 138 s.spawn(move || { 140 - let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive); 141 - eprintln!("repair finished: {rep:?}"); 142 - rep 139 + let res = rocks.reset_start(); 140 + eprintln!("reset start finished: {res:?}"); 143 141 }); 144 142 } 145 143 s.spawn(|| { ··· 304 302 fn install_metrics_server(metrics_bind: SocketAddr) -> Result<()> { 305 303 println!("installing metrics server..."); 306 304 PrometheusBuilder::new() 305 + .idle_timeout( 306 + metrics_util::MetricKindMask::ALL, 307 + Some(time::Duration::from_secs(900)), // 15 min 308 + ) 307 309 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 308 310 .set_bucket_duration(time::Duration::from_secs(30))? 309 311 .set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here. 310 - .set_enable_unit_suffix(true) 312 + .with_recommended_naming(true) 311 313 .with_http_listener(metrics_bind) 312 314 .install()?; 313 315 println!("metrics server installed! listening at {metrics_bind:?}");
+39 -93
constellation/src/storage/rocks_store.rs
··· 308 308 } 309 309 310 310 fn global_init(&self) -> Result<()> { 311 - let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some(); 312 - if first_run { 311 + if self.db.get(STARTED_AT_KEY)?.is_none() { 313 312 self.db.put(STARTED_AT_KEY, _rv(now()))?; 314 - 315 - // hack / temporary: if we're a new db, put in a completed repair 316 - // state so we don't run repairs (repairs are for old-code dbs) 317 - let completed = TargetIdRepairState { 318 - id_when_started: 0, 319 - current_us_started_at: 0, 320 - latest_repaired_i: 0, 321 - }; 322 - self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?; 323 313 } 324 314 Ok(()) 325 315 } 326 316 327 - pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> { 328 - let mut state = match self 329 - .db 330 - .get(TARGET_ID_REPAIR_STATE_KEY)? 331 - .map(|s| _vr(&s)) 332 - .transpose()? 333 - { 334 - Some(s) => s, 335 - None => TargetIdRepairState { 336 - id_when_started: self.did_id_table.priv_id_seq, 337 - current_us_started_at: now(), 338 - latest_repaired_i: 0, 339 - }, 340 - }; 341 - 342 - eprintln!("initial repair state: {state:?}"); 343 - 344 - let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap(); 345 - 346 - let mut iter = self.db.raw_iterator_cf(&cf); 347 - iter.seek_to_first(); 348 - 349 - eprintln!("repair iterator sent to first key"); 350 - 351 - // skip ahead if we're done some, or take a single first step 352 - for _ in 0..state.latest_repaired_i { 353 - iter.next(); 317 + pub fn reset_start(&self) -> Result<bool> { 318 + let existing = self.db.get(STARTED_AT_KEY)?; 319 + if existing.is_none() { 320 + bail!("not resetting started-at key because one wasn't set"); 354 321 } 355 - 356 - eprintln!( 357 - "repair iterator skipped to {}th key", 358 - state.latest_repaired_i 359 - ); 360 - 361 - let mut maybe_done = false; 362 - 363 - let mut write_fast = rocksdb::WriteOptions::default(); 364 - write_fast.set_sync(false); 365 - write_fast.disable_wal(true); 366 - 367 - while !stay_alive.is_cancelled() && !maybe_done { 368 - // let mut batch = WriteBatch::default(); 369 - 370 - let mut any_written = false; 371 - 372 - for _ in 0..1000 { 373 - if state.latest_repaired_i % 1_000_000 == 0 { 374 - eprintln!("target iter at {}", state.latest_repaired_i); 375 - } 376 - state.latest_repaired_i += 1; 377 - 378 - if !iter.valid() { 379 - eprintln!("invalid iter, are we done repairing?"); 380 - maybe_done = true; 381 - break; 382 - }; 383 - 384 - // eprintln!("iterator seems to be valid! getting the key..."); 385 - let raw_key = iter.key().unwrap(); 386 - if raw_key.len() == 8 { 387 - // eprintln!("found an 8-byte key, skipping it since it's probably an id..."); 388 - iter.next(); 389 - continue; 390 - } 391 - let target: TargetKey = _kr::<TargetKey>(raw_key)?; 392 - let target_id: TargetId = _vr(iter.value().unwrap())?; 393 - 394 - self.db 395 - .put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?; 396 - any_written = true; 397 - iter.next(); 398 - } 399 - 400 - if any_written { 401 - self.db 402 - .put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?; 403 - std::thread::sleep(breather); 404 - } 405 - } 406 - 407 - eprintln!("repair iterator done."); 408 - 409 - Ok(false) 322 + self.db.put(STARTED_AT_KEY, _rv(COZY_FIRST_CURSOR))?; 323 + println!("started-at key reset to {COZY_FIRST_CURSOR}"); 324 + Ok(true) 410 325 } 411 326 412 327 pub fn start_backup( ··· 1795 1710 } 1796 1711 1797 1712 // TODO: add tests for key prefixes actually prefixing (bincode encoding _should_...) 1713 + 1714 + #[test] 1715 + fn rocks_started_at_persists_across_opens() -> Result<()> { 1716 + let dir = tempdir()?; 1717 + 1718 + let mut store = RocksStorage::new(dir.path())?; 1719 + store.push( 1720 + &ActionableEvent::CreateLinks { 1721 + record_id: RecordId { 1722 + did: "did:plc:asdf".into(), 1723 + collection: "a.b.c".into(), 1724 + rkey: "asdf".into(), 1725 + }, 1726 + links: vec![CollectedLink { 1727 + target: Link::Uri("e.com".into()), 1728 + path: ".uri".into(), 1729 + }], 1730 + }, 1731 + 0, 1732 + )?; 1733 + let first = store.get_stats()?.started_at; 1734 + drop(store); 1735 + 1736 + std::thread::sleep(Duration::from_millis(5)); 1737 + 1738 + let store = RocksStorage::new(dir.path())?; 1739 + let second = store.get_stats()?.started_at; 1740 + 1741 + assert_eq!(first, second, "STARTED_AT must not change across opens"); 1742 + Ok(()) 1743 + } 1798 1744 }