Rust implementation of the CVM algorithm for counting distinct elements in a stream
0

Configure Feed

Select the types of activity you want to include in your feed.

1//! An implementation of the CVM fast element counting algorithm presented in 2//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34> 3//! 4//! This implementation uses a treap data structure as the buffer, following Knuth's original design. 5 6mod treap; 7 8use crate::treap::Treap; 9use rand::rngs::StdRng; 10use rand::{Rng, SeedableRng}; 11 12/// Specification for confidence level in the CVM algorithm 13#[derive(Debug, Clone, Copy)] 14pub enum ConfidenceSpec { 15 /// Specify delta directly (probability of failure) 16 Delta(f64), 17 /// Specify confidence level (probability of success) 18 Confidence(f64), 19} 20 21impl ConfidenceSpec { 22 /// Convert to delta value for internal use 23 fn to_delta(self) -> f64 { 24 match self { 25 ConfidenceSpec::Delta(delta) => delta, 26 ConfidenceSpec::Confidence(confidence) => 1.0 - confidence, 27 } 28 } 29 30 /// Validate the confidence specification 31 fn validate(self) -> Result<Self, String> { 32 match self { 33 ConfidenceSpec::Delta(delta) => { 34 if delta <= 0.0 || delta >= 1.0 { 35 Err("Delta must be between 0.0 and 1.0 (exclusive)".to_string()) 36 } else { 37 Ok(self) 38 } 39 } 40 ConfidenceSpec::Confidence(confidence) => { 41 if confidence <= 0.0 || confidence >= 1.0 { 42 Err("Confidence must be between 0.0 and 1.0 (exclusive)".to_string()) 43 } else { 44 Ok(self) 45 } 46 } 47 } 48 } 49} 50 51/// Builder for constructing CVM instances with validation and defaults 52/// 53/// # Examples 54/// 55/// ``` 56/// use cvmcount::CVM; 57/// 58/// // Using defaults (`epsilon=0.8`, `confidence=0.9`, `size=1000`) 59/// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 60/// 61/// // Custom parameters 62/// let cvm: CVM<i32> = CVM::<i32>::builder() 63/// .epsilon(0.05) // 5 % accuracy 64/// .confidence(0.99) // 99 % confidence 65/// .estimated_size(10_000) 66/// .build() 67/// .unwrap(); 68/// 69/// // Using delta instead of confidence 70/// let cvm: CVM<String> = CVM::<String>::builder() 71/// .epsilon(0.1) 72/// .delta(0.01) // 1 % failure probability 73/// .build() 74/// .unwrap(); 75/// ``` 76#[derive(Debug, Clone, Default)] 77pub struct CVMBuilder { 78 epsilon: Option<f64>, 79 confidence_spec: Option<ConfidenceSpec>, 80 stream_size: Option<usize>, 81} 82 83impl CVMBuilder { 84 /// Create a new builder with default values 85 pub fn new() -> Self { 86 Self::default() 87 } 88 89 /// Set the epsilon parameter (accuracy requirement) 90 /// 91 /// `Epsilon` determines how close you want your estimate to be to the true number 92 /// of distinct elements. A smaller `ε` means you require a more precise estimate. 93 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the 94 /// actual value. 95 /// 96 /// Must be between 0.0 and 1.0 (exclusive). 97 pub fn epsilon(mut self, epsilon: f64) -> Self { 98 self.epsilon = Some(epsilon); 99 self 100 } 101 102 /// Set the confidence level (probability that the estimate will be accurate) 103 /// 104 /// Confidence represents how certain you want to be that the algorithm's 105 /// estimate will fall within the desired accuracy range. For example, 106 /// `confidence = 0.99` means you're 99 % sure the estimate will be accurate. 107 /// 108 /// Must be between 0.0 and 1.0 (exclusive). 109 /// Cannot be used together with [`Self::delta`] – the last one called will be used. 110 pub fn confidence(mut self, confidence: f64) -> Self { 111 self.confidence_spec = Some(ConfidenceSpec::Confidence(confidence)); 112 self 113 } 114 115 /// Set the delta parameter (probability of failure) 116 /// 117 /// Delta represents the probability that the algorithm's estimate will fall 118 /// outside the desired accuracy range. For example, `delta = 0.01` means there's 119 /// a 1 % chance the estimate will be inaccurate. 120 /// 121 /// Must be between 0.0 and 1.0 (exclusive). 122 /// Cannot be used together with [`Self::confidence()`] – the last one called will be used. 123 pub fn delta(mut self, delta: f64) -> Self { 124 self.confidence_spec = Some(ConfidenceSpec::Delta(delta)); 125 self 126 } 127 128 /// Set the estimated stream size 129 /// 130 /// This is used to determine buffer size and can be a loose approximation. 131 /// The closer it is to the actual stream size, the more accurate the results 132 /// will be. 133 pub fn estimated_size(mut self, size: usize) -> Self { 134 self.stream_size = Some(size); 135 self 136 } 137 138 /// Build the CVM instance with validation 139 /// 140 /// Uses the following defaults if not specified: 141 /// - `epsilon: 0.8` (good starting point for most applications) 142 /// - `confidence: 0.9` (90 % confidence, equivalent to delta = 0.1) 143 /// - `estimated_size: 1000` 144 /// 145 /// Returns an error if any parameters are invalid. 146 pub fn build<T: Ord>(self) -> Result<CVM<T>, String> { 147 // Validate and get epsilon 148 let epsilon = self.epsilon.unwrap_or(0.8); 149 if epsilon <= 0.0 || epsilon >= 1.0 { 150 return Err("Epsilon must be between 0.0 and 1.0 (exclusive)".to_string()); 151 } 152 153 // Validate and get delta 154 let confidence_spec = self 155 .confidence_spec 156 .unwrap_or(ConfidenceSpec::Confidence(0.9)); 157 let validated_spec = confidence_spec.validate()?; 158 let delta = validated_spec.to_delta(); 159 160 // Validate and get stream size 161 let stream_size = self.stream_size.unwrap_or(1000); 162 if stream_size == 0 { 163 return Err("Stream size must be greater than 0".to_string()); 164 } 165 166 Ok(CVM::new(epsilon, delta, stream_size)) 167 } 168} 169 170/// A counter implementing the CVM algorithm 171/// 172/// This implementation uses a treap (randomized binary search tree) as the buffer, 173/// which provides `O(log n)` operations while maintaining the probabilistic properties 174/// needed for the algorithm. 175/// 176/// Note that the CVM struct's buffer takes ownership of its elements. 177pub struct CVM<T: Ord> { 178 buf_size: usize, 179 buf: Treap<T>, 180 probability: f64, 181 rng: StdRng, 182} 183 184impl<T: Ord> CVM<T> { 185 /// Create a new builder for constructing CVM instances 186 /// 187 /// The builder provides a more ergonomic way to construct CVM instances with 188 /// validation and sensible defaults. 189 /// 190 /// # Examples 191 /// 192 /// ``` 193 /// use cvmcount::CVM; 194 /// 195 /// // Using defaults 196 /// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 197 /// 198 /// // Custom configuration 199 /// let cvm: CVM<i32> = CVM::<i32>::builder() 200 /// .epsilon(0.05) 201 /// .confidence(0.99) 202 /// .estimated_size(10_000) 203 /// .build() 204 /// .unwrap(); 205 /// ``` 206 pub fn builder() -> CVMBuilder { 207 CVMBuilder::new() 208 } 209 210 /// Initialise the algorithm 211 /// 212 /// `epsilon`: how close you want your estimate to be to the true number of distinct elements. 213 /// A smaller `ε` means you require a more precise estimate. 214 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the actual value. 215 /// An epsilon of `0.8` is a good starting point for most applications. 216 /// 217 /// `delta`: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence 218 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a 219 /// higher chance the estimate might be outside the desired range. 220 /// A `delta` of `0.1` is a good starting point for most applications. 221 /// 222 /// `stream_size`: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size, 223 /// the more accurate the result will be. 224 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self { 225 let bufsize = buffer_size(epsilon, delta, stream_size); 226 Self { 227 buf_size: bufsize, 228 buf: Treap::new(), 229 probability: 1.0, 230 rng: StdRng::from_entropy(), 231 } 232 } 233 /// Add an element, potentially updating the unique element count 234 pub fn process_element(&mut self, elem: T) { 235 // The algorithm works as follows: 236 // 1. If element exists in buffer, remove it (this ensures proper sampling) 237 // 2. Add element back with current probability 238 // 3. If buffer is full, remove ~half the elements and halve the probability 239 // This creates a geometric sampling scheme that provides an unbiased estimate 240 if self.buf.contains(&elem) { 241 self.buf.remove(&elem); 242 } 243 if self.rng.gen_bool(self.probability) { 244 self.buf.insert(elem, &mut self.rng); 245 } 246 while self.buf.len() == self.buf_size { 247 self.clear_about_half(); 248 self.probability /= 2.0; 249 } 250 } 251 // remove around half of the elements at random 252 fn clear_about_half(&mut self) { 253 // Need to capture rng reference to use in closure 254 let rng = &mut self.rng; 255 self.buf.retain(|_| rng.gen_bool(0.5)); 256 } 257 /// Calculate the current unique element count. You can continue to add elements after calling this method. 258 pub fn calculate_final_result(&self) -> f64 { 259 self.buf.len() as f64 / self.probability 260 } 261} 262 263// Calculate threshold (buf_size) value for the F0-Estimator algorithm 264fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize { 265 ((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize 266} 267 268#[cfg(test)] 269mod tests { 270 use std::{ 271 fs::File, 272 io::{BufRead, BufReader}, 273 path::Path, 274 }; 275 276 use super::{CVM, ConfidenceSpec}; 277 use regex::Regex; 278 use std::collections::HashSet; 279 280 fn open_file<P>(filename: P) -> BufReader<File> 281 where 282 P: AsRef<Path>, 283 { 284 let f = File::open(filename).expect("Couldn't read from file"); 285 BufReader::new(f) 286 } 287 288 fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) { 289 let words = line.split(' '); 290 words.for_each(|word| { 291 let clean_word = re.replace_all(word, "").to_lowercase(); 292 hs.insert(clean_word); 293 }) 294 } 295 #[test] 296 fn actual() { 297 let input_file = "benches/kiy.txt"; 298 let re = Regex::new(r"[^\w\s]").unwrap(); 299 let br = open_file(input_file); 300 let mut hs = HashSet::new(); 301 br.lines() 302 .for_each(|line| line_to_word(&re, &mut hs, &line.unwrap())); 303 assert_eq!(hs.len(), 9016) 304 } 305 306 #[test] 307 fn test_builder_defaults() { 308 let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 309 // Verify that it's properly constructed with defaults 310 assert_eq!(cvm.calculate_final_result(), 0.0); // Empty buffer 311 } 312 313 #[test] 314 fn test_builder_custom_params() { 315 let cvm: CVM<i32> = CVM::<i32>::builder() 316 .epsilon(0.05) 317 .confidence(0.99) 318 .estimated_size(5000) 319 .build() 320 .unwrap(); 321 322 // Test that it works by processing some elements 323 let mut cvm = cvm; 324 for i in 0..100 { 325 cvm.process_element(i); 326 } 327 let result = cvm.calculate_final_result(); 328 assert!(result > 0.0); 329 } 330 331 #[test] 332 fn test_builder_delta_vs_confidence() { 333 // Test that confidence and delta give equivalent results 334 let cvm1: CVM<i32> = CVM::<i32>::builder().confidence(0.9).build().unwrap(); 335 336 let cvm2: CVM<i32> = CVM::<i32>::builder().delta(0.1).build().unwrap(); 337 338 // They should have the same internal configuration 339 // (we can't directly test this without exposing internals, 340 // but we can test they both work) 341 assert_eq!(cvm1.calculate_final_result(), 0.0); 342 assert_eq!(cvm2.calculate_final_result(), 0.0); 343 } 344 345 #[test] 346 fn test_builder_last_wins() { 347 // Test that the last confidence/delta setting wins 348 let cvm: CVM<i32> = CVM::<i32>::builder() 349 .confidence(0.9) 350 .delta(0.05) // This should override confidence 351 .build() 352 .unwrap(); 353 354 assert_eq!(cvm.calculate_final_result(), 0.0); 355 } 356 357 #[test] 358 fn test_builder_validation() { 359 // Test epsilon validation 360 let result = CVM::<i32>::builder().epsilon(0.0).build::<i32>(); 361 assert!(result.is_err()); 362 363 let result = CVM::<i32>::builder().epsilon(1.0).build::<i32>(); 364 assert!(result.is_err()); 365 366 let result = CVM::<i32>::builder().epsilon(-0.5).build::<i32>(); 367 assert!(result.is_err()); 368 369 // Test confidence validation 370 let result = CVM::<i32>::builder().confidence(0.0).build::<i32>(); 371 assert!(result.is_err()); 372 373 let result = CVM::<i32>::builder().confidence(1.0).build::<i32>(); 374 assert!(result.is_err()); 375 376 // Test delta validation 377 let result = CVM::<i32>::builder().delta(0.0).build::<i32>(); 378 assert!(result.is_err()); 379 380 let result = CVM::<i32>::builder().delta(1.0).build::<i32>(); 381 assert!(result.is_err()); 382 383 // Test stream size validation 384 let result = CVM::<i32>::builder().estimated_size(0).build::<i32>(); 385 assert!(result.is_err()); 386 } 387 388 #[test] 389 fn test_builder_method_chaining() { 390 let result = CVM::<String>::builder() 391 .epsilon(0.1) 392 .confidence(0.95) 393 .estimated_size(2000) 394 .build::<String>(); 395 396 assert!(result.is_ok()); 397 } 398 399 #[test] 400 fn test_confidence_spec_conversion() { 401 // Test ConfidenceSpec::to_delta conversion 402 let confidence_spec = ConfidenceSpec::Confidence(0.9); 403 assert!((confidence_spec.to_delta() - 0.1).abs() < f64::EPSILON); 404 405 let delta_spec = ConfidenceSpec::Delta(0.05); 406 assert!((delta_spec.to_delta() - 0.05).abs() < f64::EPSILON); 407 } 408}