Rust implementation of the CVM algorithm for counting distinct elements in a stream
0

Configure Feed

Select the types of activity you want to include in your feed.

1//! An implementation of the CVM fast element counting algorithm presented in 2//! Chakraborty, S., Vinodchandran, N. V., & Meel, K. S. (2022). *Distinct Elements in Streams: An Algorithm for the (Text) Book*. 6 pages, 727571 bytes. <https://doi.org/10.4230/LIPIcs.ESA.2022.34> 3//! 4//! This implementation uses a treap data structure as the buffer, following Knuth's original design. 5 6mod treap; 7 8use crate::treap::Treap; 9use rand::rngs::StdRng; 10use rand::{Rng, SeedableRng}; 11 12/// Specification for confidence level in the CVM algorithm 13#[derive(Debug, Clone, Copy)] 14pub enum ConfidenceSpec { 15 /// Specify delta directly (probability of failure) 16 Delta(f64), 17 /// Specify confidence level (probability of success) 18 Confidence(f64), 19} 20 21impl ConfidenceSpec { 22 /// Convert to delta value for internal use 23 fn to_delta(self) -> f64 { 24 match self { 25 ConfidenceSpec::Delta(delta) => delta, 26 ConfidenceSpec::Confidence(confidence) => 1.0 - confidence, 27 } 28 } 29 30 /// Validate the confidence specification 31 fn validate(self) -> Result<Self, String> { 32 match self { 33 ConfidenceSpec::Delta(delta) => { 34 if delta <= 0.0 || delta >= 1.0 { 35 Err("Delta must be between 0.0 and 1.0 (exclusive)".to_string()) 36 } else { 37 Ok(self) 38 } 39 } 40 ConfidenceSpec::Confidence(confidence) => { 41 if confidence <= 0.0 || confidence >= 1.0 { 42 Err("Confidence must be between 0.0 and 1.0 (exclusive)".to_string()) 43 } else { 44 Ok(self) 45 } 46 } 47 } 48 } 49} 50 51/// Builder for constructing CVM instances with validation and defaults 52/// 53/// # Examples 54/// 55/// ``` 56/// use cvmcount::CVM; 57/// 58/// // Using defaults (`epsilon=0.8`, `confidence=0.9`, `size=1000`) 59/// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 60/// 61/// // Custom parameters 62/// let cvm: CVM<i32> = CVM::<i32>::builder() 63/// .epsilon(0.05) // 5 % accuracy 64/// .confidence(0.99) // 99 % confidence 65/// .estimated_size(10_000) 66/// .build() 67/// .unwrap(); 68/// 69/// // Using delta instead of confidence 70/// let cvm: CVM<String> = CVM::<String>::builder() 71/// .epsilon(0.1) 72/// .delta(0.01) // 1 % failure probability 73/// .build() 74/// .unwrap(); 75/// ``` 76#[derive(Debug, Clone, Default)] 77pub struct CVMBuilder { 78 epsilon: Option<f64>, 79 confidence_spec: Option<ConfidenceSpec>, 80 stream_size: Option<usize>, 81} 82 83impl CVMBuilder { 84 /// Create a new builder with default values 85 pub fn new() -> Self { 86 Self::default() 87 } 88 89 /// Set the epsilon parameter (accuracy requirement) 90 /// 91 /// `Epsilon` determines how close you want your estimate to be to the true number 92 /// of distinct elements. A smaller `ε` means you require a more precise estimate. 93 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the 94 /// actual value. 95 /// 96 /// Must be between 0.0 and 1.0 (exclusive). 97 pub fn epsilon(mut self, epsilon: f64) -> Self { 98 self.epsilon = Some(epsilon); 99 self 100 } 101 102 /// Set the confidence level (probability that the estimate will be accurate) 103 /// 104 /// Confidence represents how certain you want to be that the algorithm's 105 /// estimate will fall within the desired accuracy range. For example, 106 /// `confidence = 0.99` means you're 99 % sure the estimate will be accurate. 107 /// 108 /// Must be between 0.0 and 1.0 (exclusive). 109 /// Cannot be used together with [`Self::delta`] – the last one called will be used. 110 pub fn confidence(mut self, confidence: f64) -> Self { 111 self.confidence_spec = Some(ConfidenceSpec::Confidence(confidence)); 112 self 113 } 114 115 /// Set the delta parameter (probability of failure) 116 /// 117 /// Delta represents the probability that the algorithm's estimate will fall 118 /// outside the desired accuracy range. For example, `delta = 0.01` means there's 119 /// a 1 % chance the estimate will be inaccurate. 120 /// 121 /// Must be between 0.0 and 1.0 (exclusive). 122 /// Cannot be used together with [`Self::confidence()`] – the last one called will be used. 123 pub fn delta(mut self, delta: f64) -> Self { 124 self.confidence_spec = Some(ConfidenceSpec::Delta(delta)); 125 self 126 } 127 128 /// Set the estimated stream size 129 /// 130 /// This is used to determine buffer size and can be a loose approximation. 131 /// The closer it is to the actual stream size, the more accurate the results 132 /// will be. 133 pub fn estimated_size(mut self, size: usize) -> Self { 134 self.stream_size = Some(size); 135 self 136 } 137 138 /// Build the CVM instance with validation 139 /// 140 /// Uses the following defaults if not specified: 141 /// - `epsilon: 0.8` (good starting point for most applications) 142 /// - `confidence: 0.9` (90 % confidence, equivalent to delta = 0.1) 143 /// - `estimated_size: 1000` 144 /// 145 /// Returns an error if any parameters are invalid. 146 pub fn build<T: Ord>(self) -> Result<CVM<T>, String> { 147 // Validate and get epsilon 148 let epsilon = self.epsilon.unwrap_or(0.8); 149 if epsilon <= 0.0 || epsilon >= 1.0 { 150 return Err("Epsilon must be between 0.0 and 1.0 (exclusive)".to_string()); 151 } 152 153 // Validate and get delta 154 let confidence_spec = self 155 .confidence_spec 156 .unwrap_or(ConfidenceSpec::Confidence(0.9)); 157 let validated_spec = confidence_spec.validate()?; 158 let delta = validated_spec.to_delta(); 159 160 // Validate and get stream size 161 let stream_size = self.stream_size.unwrap_or(1000); 162 if stream_size == 0 { 163 return Err("Stream size must be greater than 0".to_string()); 164 } 165 166 Ok(CVM::new(epsilon, delta, stream_size)) 167 } 168} 169 170/// A counter implementing the CVM algorithm 171/// 172/// This implementation uses a treap (randomized binary search tree) as the buffer, 173/// which provides `O(log n)` operations while maintaining the probabilistic properties 174/// needed for the algorithm. 175/// 176/// Note that the CVM struct's buffer takes ownership of its elements. 177pub struct CVM<T: Ord> { 178 buf_size: usize, 179 buf: Treap<T>, 180 probability: f64, 181 rng: StdRng, 182} 183 184impl<T: Ord> CVM<T> { 185 /// Create a new builder for constructing CVM instances 186 /// 187 /// The builder provides a more ergonomic way to construct CVM instances with 188 /// validation and sensible defaults. 189 /// 190 /// # Examples 191 /// 192 /// ``` 193 /// use cvmcount::CVM; 194 /// 195 /// // Using defaults 196 /// let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 197 /// 198 /// // Custom configuration 199 /// let cvm: CVM<i32> = CVM::<i32>::builder() 200 /// .epsilon(0.05) 201 /// .confidence(0.99) 202 /// .estimated_size(10_000) 203 /// .build() 204 /// .unwrap(); 205 /// ``` 206 pub fn builder() -> CVMBuilder { 207 CVMBuilder::new() 208 } 209 210 /// Initialise the algorithm 211 /// 212 /// `epsilon`: how close you want your estimate to be to the true number of distinct elements. 213 /// A smaller `ε` means you require a more precise estimate. 214 /// For example, `ε = 0.05` means you want your estimate to be within 5 % of the actual value. 215 /// An epsilon of `0.8` is a good starting point for most applications. 216 /// 217 /// `delta`: The level of certainty that the algorithm's estimate will fall within the desired accuracy range. A higher confidence 218 /// (e.g. 99.9 %) means you're very sure the estimate will be accurate, while a lower confidence (e.g. 90 %) means there's a 219 /// higher chance the estimate might be outside the desired range. 220 /// A `delta` of `0.1` is a good starting point for most applications. 221 /// 222 /// `stream_size`: this is used to determine buffer size and can be a loose approximation. The closer it is to the stream size, 223 /// the more accurate the result will be. 224 pub fn new(epsilon: f64, delta: f64, stream_size: usize) -> Self { 225 let bufsize = buffer_size(epsilon, delta, stream_size); 226 Self { 227 buf_size: bufsize, 228 buf: Treap::new(), 229 probability: 1.0, 230 rng: StdRng::from_entropy(), 231 } 232 } 233 /// Add an element, potentially updating the unique element count 234 pub fn process_element(&mut self, elem: T) { 235 // The algorithm works as follows: 236 // 1. If element exists in buffer, remove it (this ensures proper sampling) 237 // 2. Add element back with current probability 238 // 3. If buffer is full, remove ~half the elements and halve the probability 239 // This creates a geometric sampling scheme that provides an unbiased estimate 240 if self.buf.contains(&elem) { 241 self.buf.remove(&elem); 242 } 243 if self.rng.gen_bool(self.probability) { 244 self.buf.insert(elem, &mut self.rng); 245 } 246 while self.buf.len() == self.buf_size { 247 self.clear_about_half(); 248 self.probability /= 2.0; 249 } 250 } 251 // remove around half of the elements at random 252 fn clear_about_half(&mut self) { 253 // Need to capture rng reference to use in closure 254 let rng = &mut self.rng; 255 self.buf.retain(|_| rng.gen_bool(0.5)); 256 } 257 /// Process an entire iterator of owned values and return the final estimate 258 /// 259 /// This is a convenience method that processes all elements from an iterator 260 /// and returns the final count estimate. The iterator must yield owned values 261 /// that the CVM can take ownership of. 262 /// 263 /// # Examples 264 /// 265 /// ``` 266 /// use cvmcount::CVM; 267 /// 268 /// // Process owned strings 269 /// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()]; 270 /// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 271 /// let estimate = cvm.process_stream(words); 272 /// assert!(estimate > 0.0); 273 /// 274 /// // Process numeric data 275 /// let numbers = vec![1, 2, 3, 2, 1, 4]; 276 /// let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap(); 277 /// let estimate = cvm.process_stream(numbers); 278 /// assert!(estimate > 0.0); 279 /// 280 /// // When you have borrowed data, clone explicitly 281 /// let borrowed_words = vec!["hello", "world", "hello"]; 282 /// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 283 /// let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string())); 284 /// assert!(estimate > 0.0); 285 /// ``` 286 pub fn process_stream<I>(&mut self, iter: I) -> f64 287 where 288 I: IntoIterator<Item = T>, 289 { 290 for item in iter { 291 self.process_element(item); 292 } 293 self.calculate_final_result() 294 } 295 296 /// Calculate the current unique element count. You can continue to add elements after calling this method. 297 pub fn calculate_final_result(&self) -> f64 { 298 self.buf.len() as f64 / self.probability 299 } 300} 301 302/// Extension trait for iterators to estimate distinct count directly 303/// 304/// This trait provides convenient methods to estimate distinct counts from iterators 305/// without manually creating and managing a CVM instance. 306/// 307/// # Examples 308/// 309/// ``` 310/// use cvmcount::{CVM, EstimateDistinct}; 311/// 312/// // Simple usage with default parameters 313/// let numbers = vec![1, 2, 3, 2, 1, 4, 5]; 314/// let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000); 315/// assert!(estimate > 0.0); 316/// 317/// // Using builder pattern for more control 318/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()]; 319/// let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99); 320/// let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap(); 321/// assert!(estimate > 0.0); 322/// ``` 323pub trait EstimateDistinct<T: Ord>: Iterator<Item = T> + Sized { 324 /// Estimate distinct count using the CVM algorithm with specified parameters 325 /// 326 /// # Arguments 327 /// 328 /// * `epsilon` - Accuracy requirement (smaller = more accurate) 329 /// * `delta` - Failure probability (smaller = more confident) 330 /// * `estimated_size` - Rough estimate of total stream size 331 /// 332 /// # Returns 333 /// 334 /// The estimated number of distinct elements 335 fn estimate_distinct_count(self, epsilon: f64, delta: f64, estimated_size: usize) -> f64 { 336 let mut cvm = CVM::new(epsilon, delta, estimated_size); 337 cvm.process_stream(self) 338 } 339 340 /// Estimate distinct count using a builder for more ergonomic configuration 341 /// 342 /// # Arguments 343 /// 344 /// * `builder` - A configured CVMBuilder instance 345 /// 346 /// # Returns 347 /// 348 /// Result containing the estimated number of distinct elements or an error message 349 /// 350 /// # Examples 351 /// 352 /// ``` 353 /// use cvmcount::{CVM, EstimateDistinct}; 354 /// 355 /// let data = vec![1, 2, 3, 2, 1]; 356 /// let builder = CVM::<i32>::builder().epsilon(0.05).confidence(0.99); 357 /// let estimate = data.into_iter().estimate_distinct_with_builder(builder).unwrap(); 358 /// assert!(estimate > 0.0); 359 /// ``` 360 fn estimate_distinct_with_builder(self, builder: CVMBuilder) -> Result<f64, String> { 361 let mut cvm: CVM<T> = builder.build()?; 362 Ok(cvm.process_stream(self)) 363 } 364} 365 366/// Implement EstimateDistinct for all iterators that yield Ord types 367impl<T: Ord, I: Iterator<Item = T>> EstimateDistinct<T> for I {} 368 369// Calculate threshold (buf_size) value for the F0-Estimator algorithm 370fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize { 371 ((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize 372} 373 374#[cfg(test)] 375mod tests { 376 use std::{ 377 fs::File, 378 io::{BufRead, BufReader}, 379 path::Path, 380 }; 381 382 use super::{CVM, ConfidenceSpec, EstimateDistinct}; 383 use regex::Regex; 384 use std::collections::HashSet; 385 386 fn open_file<P>(filename: P) -> BufReader<File> 387 where 388 P: AsRef<Path>, 389 { 390 let f = File::open(filename).expect("Couldn't read from file"); 391 BufReader::new(f) 392 } 393 394 fn line_to_word(re: &Regex, hs: &mut HashSet<String>, line: &str) { 395 let words = line.split(' '); 396 words.for_each(|word| { 397 let clean_word = re.replace_all(word, "").to_lowercase(); 398 hs.insert(clean_word); 399 }) 400 } 401 #[test] 402 fn actual() { 403 let input_file = "benches/kiy.txt"; 404 let re = Regex::new(r"[^\w\s]").unwrap(); 405 let br = open_file(input_file); 406 let mut hs = HashSet::new(); 407 br.lines() 408 .for_each(|line| line_to_word(&re, &mut hs, &line.unwrap())); 409 assert_eq!(hs.len(), 9016) 410 } 411 412 #[test] 413 fn test_builder_defaults() { 414 let cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 415 // Verify that it's properly constructed with defaults 416 assert_eq!(cvm.calculate_final_result(), 0.0); // Empty buffer 417 } 418 419 #[test] 420 fn test_builder_custom_params() { 421 let cvm: CVM<i32> = CVM::<i32>::builder() 422 .epsilon(0.05) 423 .confidence(0.99) 424 .estimated_size(5000) 425 .build() 426 .unwrap(); 427 428 // Test that it works by processing some elements 429 let mut cvm = cvm; 430 for i in 0..100 { 431 cvm.process_element(i); 432 } 433 let result = cvm.calculate_final_result(); 434 assert!(result > 0.0); 435 } 436 437 #[test] 438 fn test_builder_delta_vs_confidence() { 439 // Test that confidence and delta give equivalent results 440 let cvm1: CVM<i32> = CVM::<i32>::builder().confidence(0.9).build().unwrap(); 441 442 let cvm2: CVM<i32> = CVM::<i32>::builder().delta(0.1).build().unwrap(); 443 444 // They should have the same internal configuration 445 // (we can't directly test this without exposing internals, 446 // but we can test they both work) 447 assert_eq!(cvm1.calculate_final_result(), 0.0); 448 assert_eq!(cvm2.calculate_final_result(), 0.0); 449 } 450 451 #[test] 452 fn test_builder_last_wins() { 453 // Test that the last confidence/delta setting wins 454 let cvm: CVM<i32> = CVM::<i32>::builder() 455 .confidence(0.9) 456 .delta(0.05) // This should override confidence 457 .build() 458 .unwrap(); 459 460 assert_eq!(cvm.calculate_final_result(), 0.0); 461 } 462 463 #[test] 464 fn test_builder_validation() { 465 // Test epsilon validation 466 let result = CVM::<i32>::builder().epsilon(0.0).build::<i32>(); 467 assert!(result.is_err()); 468 469 let result = CVM::<i32>::builder().epsilon(1.0).build::<i32>(); 470 assert!(result.is_err()); 471 472 let result = CVM::<i32>::builder().epsilon(-0.5).build::<i32>(); 473 assert!(result.is_err()); 474 475 // Test confidence validation 476 let result = CVM::<i32>::builder().confidence(0.0).build::<i32>(); 477 assert!(result.is_err()); 478 479 let result = CVM::<i32>::builder().confidence(1.0).build::<i32>(); 480 assert!(result.is_err()); 481 482 // Test delta validation 483 let result = CVM::<i32>::builder().delta(0.0).build::<i32>(); 484 assert!(result.is_err()); 485 486 let result = CVM::<i32>::builder().delta(1.0).build::<i32>(); 487 assert!(result.is_err()); 488 489 // Test stream size validation 490 let result = CVM::<i32>::builder().estimated_size(0).build::<i32>(); 491 assert!(result.is_err()); 492 } 493 494 #[test] 495 fn test_builder_method_chaining() { 496 let result = CVM::<String>::builder() 497 .epsilon(0.1) 498 .confidence(0.95) 499 .estimated_size(2000) 500 .build::<String>(); 501 502 assert!(result.is_ok()); 503 } 504 505 #[test] 506 fn test_confidence_spec_conversion() { 507 // Test ConfidenceSpec::to_delta conversion 508 let confidence_spec = ConfidenceSpec::Confidence(0.9); 509 assert!((confidence_spec.to_delta() - 0.1).abs() < f64::EPSILON); 510 511 let delta_spec = ConfidenceSpec::Delta(0.05); 512 assert!((delta_spec.to_delta() - 0.05).abs() < f64::EPSILON); 513 } 514 515 #[test] 516 fn test_process_stream() { 517 let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap(); 518 519 // Test with vector 520 let numbers = vec![1, 2, 3, 2, 1, 4, 5, 3]; 521 let estimate = cvm.process_stream(numbers); 522 assert!(estimate > 0.0); 523 524 // Test with range 525 let mut cvm2: CVM<i32> = CVM::<i32>::builder().build().unwrap(); 526 let estimate2 = cvm2.process_stream(1..=100); 527 assert!(estimate2 > 0.0); 528 } 529 530 #[test] 531 fn test_process_stream_strings() { 532 let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 533 534 // Test with owned strings 535 let words = vec![ 536 "hello".to_string(), 537 "world".to_string(), 538 "hello".to_string(), 539 "rust".to_string(), 540 ]; 541 let estimate = cvm.process_stream(words); 542 assert!(estimate > 0.0); 543 } 544 545 #[test] 546 fn test_process_stream_with_map() { 547 let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap(); 548 549 // Test with borrowed data mapped to owned 550 let borrowed_words = ["hello", "world", "hello", "rust"]; 551 let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string())); 552 assert!(estimate > 0.0); 553 } 554 555 #[test] 556 fn test_estimate_distinct_trait() { 557 // Test simple usage 558 let numbers = vec![1, 2, 3, 2, 1, 4, 5]; 559 let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000); 560 assert!(estimate > 0.0); 561 562 // Test with builder 563 let words = vec![ 564 "hello".to_string(), 565 "world".to_string(), 566 "hello".to_string(), 567 ]; 568 let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99); 569 let estimate = words 570 .into_iter() 571 .estimate_distinct_with_builder(builder) 572 .unwrap(); 573 assert!(estimate > 0.0); 574 } 575 576 #[test] 577 fn test_estimate_distinct_with_cloning() { 578 // Test that explicit cloning works as expected 579 let borrowed_numbers = [1, 2, 3, 2, 1, 4]; 580 let estimate = borrowed_numbers 581 .iter() 582 .cloned() 583 .estimate_distinct_count(0.1, 0.1, 100); 584 assert!(estimate > 0.0); 585 } 586 587 #[test] 588 fn test_streaming_integration_with_file_processing() { 589 // Simulate file processing pattern 590 let lines = vec![ 591 "hello world".to_string(), 592 "world peace".to_string(), 593 "hello rust".to_string(), 594 ]; 595 596 let mut cvm: CVM<String> = CVM::<String>::builder() 597 .epsilon(0.1) 598 .confidence(0.9) 599 .build() 600 .unwrap(); 601 602 // Process words from all lines 603 let words: Vec<String> = lines 604 .into_iter() 605 .flat_map(|line| { 606 line.split_whitespace() 607 .map(|s| s.to_string()) 608 .collect::<Vec<_>>() 609 }) 610 .collect(); 611 let estimate = cvm.process_stream(words); 612 613 assert!(estimate > 0.0); 614 } 615 616 #[test] 617 fn test_streaming_large_dataset() { 618 // Test with a larger dataset to verify the algorithm works 619 let mut cvm: CVM<i32> = CVM::<i32>::builder() 620 .epsilon(0.1) 621 .confidence(0.9) 622 .estimated_size(10_000) 623 .build() 624 .unwrap(); 625 626 // Create data with known distinct count (1000 unique values, repeated) 627 let data: Vec<i32> = (0..1000).cycle().take(10_000).collect(); 628 let estimate = cvm.process_stream(data); 629 630 // The estimate should be reasonably close to 1000 631 // With epsilon=0.1, we expect within 10 % accuracy most of the time 632 assert!(estimate > 500.0 && estimate < 2000.0); 633 } 634}